RabbitMQ 05 直连模式-Spring Boot操作

发布时间 2023-03-26 15:27:07作者: 程序航

Spring Boot操作

Spring Boot集成RabbitMQ是现在主流的操作RabbitMQ的方式。

官方文档:https://docs.spring.io/spring-amqp/docs/current/reference/html/

  1. 引入依赖。

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  2. 添加配置。

    spring:
      rabbitmq:
        addresses: 127.0.0.1
        username: admin
        password: admin
        virtual-host: /test
    
  3. 配置类。

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.ExchangeBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMQ配置类
     */
    @Configuration
    public class RabbitMqConfig {
    
        /**
         * 定义交换机,可以很多个
         * @return 交换机对象
         */
        @Bean("directExchange")
        public Exchange exchange(){
            return ExchangeBuilder.directExchange("amq.direct").build();
        }
    
        /**
         * 定义消息队列
         * @return 消息队列对象
         */
        @Bean("testQueue")
        public Queue queue(){
            return QueueBuilder
                    // 非持久化类型
                    .nonDurable("test_springboot")
                    .build();
        }
    
        /**
         * 定义绑定关系
         * @return 绑定关系
         */
        @Bean
        public Binding binding(@Qualifier("directExchange") Exchange exchange,
                               @Qualifier("testQueue") Queue queue){
            // 将定义的交换机和队列进行绑定
            return BindingBuilder
                    // 绑定队列
                    .bind(queue)
                    // 到交换机
                    .to(exchange)
                    // 使用自定义的routingKey
                    .with("test_springboot_key")
                    // 不设置参数
                    .noargs();
        }
    }
    

普通消费

  1. 实现生产者。

    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class RabbitMqSpringBootTests {
    
        /**
         * RabbitTemplate封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        /**
         * 生产者
         */
        @Test
        void producer() {
            
            /*
            发送消息
            参数 1:指定交换机。
            参数 2:指定路由标识。
            参数 3:消息内容。
             */
            rabbitTemplate.convertAndSend("amq.direct", "test_springboot_key", "Hello World!");
        }
        
    }
    
    

    运行代码后,查看可视化界面,可以看到创建了一个新的队列:

    绑定关系也已经建立:

  2. 实现消费者。

    消费者实际上就是一直等待消息然后进行处理的角色,这里只需要创建一个监听器就行了,它会一直等待消息到来然后再进行处理:

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 直连队列监听器
     * @author CodeSail
     */
    @Component
    public class DirectListener {
    
        /**
         * 定义此方法为队列test_springboot的监听器,一旦监听到新的消息,就会接受并处理
         * @param message 消息内容
         */
        @RabbitListener(queues = "test_springboot")
        public void customer(Message message){
            System.out.println(new String(message.getBody()));
        }
    }
    
  3. 启动服务。

    可以看到,成功消费了消息。

消费并反馈

如果需要确保消息能够被消费者接受并处理,然后得到消费者的反馈,也是可以的。

  1. 定义生产者。

    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class RabbitMqSpringBootTests {
    
        /**
         * RabbitTemplate封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 生产者
         */
        @Test
        void producer() {
            
            // 会等待消费者消费然后返回响应结果
            Object res = rabbitTemplate.convertSendAndReceive("amq.direct", "test_springboot_key", "Hello World!");
            System.out.println("收到消费者响应:" + res);
        }
    
    }
    
  2. 定义生产者。

    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 直连队列监听器
     * @author CodeSail
     */
    @Component
    public class DirectListener {
    
        /**
         * 定义此方法为队列test_springboot的监听器,一旦监听到新的消息,就会接受并处理
         * @param message 消息内容
         */
        @RabbitListener(queues = "test_springboot")
        public String customer(String message){
            System.out.println("1号消息队列监听器:" + message);
            return "收到!";
        }
    }
    
  3. 启动生产者发送消息。

    可以看到,消费完成后接收到了反馈消息。

Json消息

  1. 引入依赖。

    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.14.2</version>
    </dependency>
    
  2. 定义对象。

    import lombok.Data;
    
    /**
     * 用户
     */
    @Data
    public class User {
        
        /**
         * 姓名
         */
        private String name;
    
        /**
         * 年龄
         */
        private Integer age;
        
    }
    
  3. 定义Bean。

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.ExchangeBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.QueueBuilder;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMQ配置类
     */
    @Configuration
    public class RabbitMqConfig {
    
        ...
    
        /**
         * 构建Json转换器
         * @return Json转换器
         */
        @Bean
        public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    }
    
  4. 定义消费者。

    import cn.codesail.rabbitmq.entity.User;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 直连队列监听器
     */
    @Component
    public class DirectListener {
    
        /**
         * 指定messageConverter为创建的Bean名称
         * @param user 用户
         */
        @RabbitListener(queues = "test_springboot", messageConverter = "jackson2JsonMessageConverter")
        public void receiver(User user) {
            System.out.println(user);
        }
    }
    
  5. 定义生产者。

    import cn.codesail.rabbitmq.entity.User;
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    class RabbitMqSpringBootTests {
    
        /**
         * RabbitTemplate封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 生产者
         */
        @Test
        void producer() {
            
            // 发送Json消息
            User user = new User();
            user.setName("张三");
            user.setAge(18);
            rabbitTemplate.convertAndSend("amq.direct", "test_springboot_key", user);
        }
    
    }
    
  6. 启动生产者发送消息。

    可以看到,对象转成了Json,消费者接收到Json转为的对象。

Spring Boot操作RabbitMQ是十分方便的,也是现在的主流,后续都用这种方式演示。


  • 环境
    • JDK 17.0.6
    • Maven 3.6.3
    • SpringBoot 3.0.4
    • spring-boot-starter-amqp 3.0.4
    • jackson-databind 2.14.2