RabbitMQ 07 发布订阅模式

发布时间 2023-04-02 20:51:43作者: 程序航

发布订阅模式

发布订阅模式结构图:

比如信用卡还款日临近了,那么就会给手机、邮箱发送消息,提示需要去还款了,但是手机短信和邮件发送并不一定是同一个业务提供的,但是现在又希望能够都去执行,就可以用到发布订阅模式,简而言之就是,发布一次,消费多个

实现这种模式需要用到另一种类型的交换机,叫做fanout(扇出)类型,这是一种广播类型,消息会被广播到所有与此交换机绑定的消息队列中。

这里使用默认的扇出交换机:

  1. 定义配置类。

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Exchange;
    import org.springframework.amqp.core.ExchangeBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * RabbitMQ配置类
     */
    @Configuration
    public class RabbitMqConfig {
        
        /**
         * 定义交换机,可以很多个
         * @return 交换机对象
         */
        @Bean
        public Exchange fanoutExchange(){
            return ExchangeBuilder.fanoutExchange("amq.fanout").build();
        }
    
        /**
         * 定义消息队列
         * @return 消息队列对象
         */
        @Bean
        public Queue fanoutQueue1(){
            return new Queue("fanoutQueue1");
        }
    
        /**
         * 定义绑定关系
         * @return 绑定关系
         */
        @Bean
        public Binding binding1(@Qualifier("fanoutExchange") Exchange exchange,
                                @Qualifier("fanoutQueue1") Queue queue){
            // 将定义的交换机和队列进行绑定
            return BindingBuilder
                    // 绑定队列
                    .bind(queue)
                    // 到交换机
                    .to(exchange)
                    // 使用自定义的routingKey
                    .with("")
                    // 不设置参数
                    .noargs();
        }
    
        /**
         * 定义消息队列
         * @return 消息队列对象
         */
        @Bean
        public Queue fanoutQueue2(){
            return new Queue("fanoutQueue2");
        }
    
        /**
         * 定义绑定关系
         * @return 绑定关系
         */
        @Bean
        public Binding binding(@Qualifier("fanoutExchange") Exchange exchange,
                               @Qualifier("fanoutQueue2") Queue queue){
            // 将定义的交换机和队列进行绑定
            return BindingBuilder
                    // 绑定队列
                    .bind(queue)
                    // 到交换机
                    .to(exchange)
                    // 使用自定义的routingKey
                    .with("")
                    // 不设置参数
                    .noargs();
        }
    }
    
  2. 定义消费者。

    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 发布订阅监听器
     */
    @Component
    public class FanoutListener {
    
        @RabbitListener(queuesToDeclare = {@Queue("fanoutQueue1")})
        public void receiver1(String message) {
            System.out.println("1号监听器:" + message);
        }
    
        @RabbitListener(queuesToDeclare = {@Queue("fanoutQueue2")})
        public void receiver2(String message) {
            System.out.println("2号监听器:" + message);
        }
    }
    
    

    为了避免监听时没有该队列而报错,可以采用queuesToDeclare = {@Queue("队列名称")}的形式,这样如果没有该队列会自动创建该队列。

  3. 定义生产者。

    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import java.util.concurrent.TimeUnit;
    
    @SpringBootTest
    class RabbitMqSpringBootTests {
    
        /**
         * RabbitTemplate封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 生产者
         */
        @Test
        void producer()  {
    
            rabbitTemplate.convertAndSend("amq.fanout", "", "Hello World");
        }
    
    }
    
    
  4. 启动生产者,发送消息。

    可以看到,发送一条消息,两个消费者都收到了消息,这就是发布订阅模式。