Spring Boot中使用RabbitMQ完成延迟功能

发布时间 2023-11-09 18:02:59作者: 东方来客

MQ-消息队列简单来说就是将“消息”放到“队列”中,然后慢慢处理队列中的消息。
完成延迟功能总体的思路是将消息放到队列中,为消息设置过期时间,不直接处理这个队列中的消息,
等到消息过期,将它转到另一个队列进行处理,从而完成延迟功能。

基本概念

1. 队列

队列是RabbitMQ的内部对象,用来存储消息。多个消费者可以订阅同一个队列。

2. 死信队列

消息变成死信的几种情况:
1. 消息被拒绝,并且requeue为false。
2. 消息过期
3. 队列达到最大长度
这里采取第二种方式,设置过期时间。

3. 交换器

交换器常用类型: direct、topic、fanout、headers。
路由键和队列之间的匹配规则取决于交换机的类型。

  1. 对于 direct 交换机:

    • 当一个队列通过绑定键(binding key)与 direct 交换机绑定时,只有消息的路由键与绑定键完全匹配时,消息才会被路由到该队列。
  2. 对于 fanout 交换机:

    • fanout 交换机会将消息广播给所有与之绑定的队列,忽略消息的路由键。
  3. 对于 topic 交换机:

    • topic 交换机使用通配符匹配路由键和绑定键之间的关系。
    • 路由键可以包含一个或多个单词(以点分隔),例如 "stock.usd.nyse"。
    • 绑定键可以使用以下通配符进行匹配:
      • *:匹配一个单词。
      • #:匹配零个或多个单词。
    • 例如,绑定键 "stock.*.nyse" 可以匹配 "stock.usd.nyse",但不匹配 "stock.eur.nyse"。
  4. 对于 headers 交换机:

    • headers 交换机使用消息的头部信息来匹配队列。
    • 消息的头部信息是一组键值对。
    • 当消息的头部信息与队列绑定时指定的键值对完全匹配时,消息才会被路由到该队列。

根据交换机类型和绑定规则,RabbitMQ 可以灵活地将消息路由到与之匹配的队列中。根据实际需求,我们可以选择合适的交换机类型和绑定规则来实现灵活的消息路由。

绑定

RabbitMQ通过绑定将消息路由到指定队列,Binding的作用是将交换器和队列关联起来。

代码

添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置

@Configuration
public class RabbitMQConfig {
    /**
     * orderQueue是正常的队列,用于接收实时信息
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        return new Queue("orderQueue");
    }

    /**
     * orderDelayQueue是延迟队列,用于接收延迟信息
     * @return Queue
     */
    @Bean
    public Queue orderDelayQueue() {
        HashMap<String, Object> args = new HashMap<>();
        // 通过x-dead-letter-exchange设置为死信队列
        args.put("x-dead-letter-exchange", "");
        // 设置死信路由键,死信会被路由到orderQueue中
        args.put("x-dead-letter-routing-key", "orderQueue");
        // 设置消息过期时间,单位毫秒
        args.put("x-message-ttl", 60000);
        return new Queue("orderDelayQueue", true, false, false, args);
    }

    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("orderExchange");
    }

    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("orderRoutingKey");
    }

    @Bean
    public Binding orderDelayBinding() {
        return BindingBuilder.bind(orderDelayQueue()).to(orderExchange()).with("orderDelayRoutingKey");
    }
}

连接配置

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

发送消息

@Service
public class OrderService {
    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public OrderService(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendOrder(String  params) {
        var id = "1";
        rabbitTemplate.convertAndSend("orderExchange", "orderRoutingKey", params, message -> {
            message.getMessageProperties().setMessageId(id);
            return message;
        });
    }

    public void sendDelayedOrder(Date pickUpTime, String param) {
        var id = "2";
        // 发送消息到orderDelayQueue队列中
        rabbitTemplate.convertAndSend("orderExchange", "orderDelayRoutingKey", param, message -> {
            // 设置消息过期时间,单位毫秒
            message.getMessageProperties().setExpiration(String.valueOf(getMilSecond(pickUpTime)));
            message.getMessageProperties().setMessageId(id);
            return message;
        });
    }

    public long getMilSecond(Date pickupTime) {
        long deliveryTime = pickupTime.getTime();
        // 计算过期时间的毫秒数
        return deliveryTime - System.currentTimeMillis();
    }
}

消费信息

@Service
public class RabbitMQService {
    @RabbitListener(queues = "orderQueue")
    public void deal1(String params, Channel channel, Message message) {
        System.out.println("orderQueue" + params);
    }

    @RabbitListener(queues = "orderQueue")
    public void deal2(String params, Channel channel, Message message) {
        System.out.println("orderQueue" + params);
    }