rabbitmq延迟队列

发布时间 2023-09-03 13:54:08作者: 自学Java笔记本

概念

所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费

使用场景

1、订单在十分钟之内未支付则自动取消
2、预定会议后,需要在预定时间点前十分钟通知各个与会人员参加会议。
3、淘宝七天自动确认收货,自动评价功能等

TTL(消息存活时间)

TTL 是 RabbitMQ 中一个消息或者队列的属性
表示一条消息或是该队列中的所有消息的最大存活时间,单位是毫秒;目前有两种方法可以设置消息的 TTL。
第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
第二种方法是对消息本身进行单独设置,每条消息的 TTL 可以不同。
如果两种方法一起使用,则消息的过期时间以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的 TTL 值时,就会变成“死信”。

当设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列则会被丢到死信队列中)
当设置了消息的 TTL 属性,那么消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需要注意一点是,如果不设置 TTL,表示消息永远不会过期

RabbitMQ 中的 TTL

TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有 消息的最大存活时间,

单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这 条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

  • 消息TTL
  • 队列TTL

场景

对于消息TTL来说,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者 之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间
场景:用户自定义发布文章,用户自定义闹钟提示

对于队列TTL来说,场景:需要固定的时间,例如订单支付,每个用户下单30分钟必须支付,否则取消。

如何使用,在整合springboot篇章讲解

解释

其实跟死信队列一样,大致流程就是 生产者发送消息到普通交换机,交换机与普通队列绑定,设置一个过期时间,当这个时间到了,通过参数使得普通队列的消息转发到死信队列,然后由死信消费者去消费。

image

整合springboot实现延迟队列

案例引入

创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

image

引入依赖

<dependencies>

        <!--RabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--swagger-->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!--RabbitMQ 测试依赖-->
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

application.yml文件

spring:
  rabbitmq:
    port: 5672
    host: xxxxx
    username: xxxx
    password: xxxx

配置文件类

用于声明队列,交换机,路由key ,绑定关系等,这样就不需要消费者或者生产者端去声明了。

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * TTL队列 配置文件类
 */

@Configuration
public class TtlQueueConfig {
    // 普通交换机名称
    public static final String X_EXCHANGE = "x";
    // 普通队列名称
    public static final String QA_QUEUE = "QA";
    public static final String QB_QUEUE = "QB";

    // 死信交换机名称
    public static final String Y_DEAD_EXCHANGE = "Y";
    // 死信交换机名称
    public static final String QD_DEAD_QUEUE = "QD";

    // 声明普通交换机 x
    @Bean("xExchange")
    public DirectExchange xExchange(){
        return new DirectExchange(X_EXCHANGE);
    }

    // 声明死信交换机 y
    @Bean("yExchange")
    public DirectExchange yExchange(){
        return new DirectExchange(Y_DEAD_EXCHANGE);
    }

    // 声明普通队列 QA 需要携带TTL参数过期时间,同时要绑定死信交换机
    // QA 10s中过期
    @Bean("queueA")
    public Queue queueA(){
        Map<String,Object> arguments = new HashMap<>();
        // 绑定死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE);
        // 设置死信routingkey
        arguments.put("x-dead-letter-routing-key","YD");
        // 设置过期时间ttl
        arguments.put("x-message-ttl",10000);
        return QueueBuilder.durable(QA_QUEUE).withArguments(arguments).build();
    }

    // 声明普通队列QB ttl过期时间40s
    @Bean("queueB")
    public Queue queueB(){
        Map<String,Object> arguments = new HashMap<>();
        // 绑定死信交换机
        arguments.put("x-dead-letter-exchange",Y_DEAD_EXCHANGE);
        // 设置死信routingkey
        arguments.put("x-dead-letter-routing-key","YD");
        // 设置过期时间ttl
        arguments.put("x-message-ttl",40000);
        return QueueBuilder.durable(QB_QUEUE).withArguments(arguments).build();
    }

    //声明死信队列 QD
    @Bean("queueD")
    public Queue queueD(){
        return QueueBuilder.durable(QD_DEAD_QUEUE).build();
    }


    //绑定关系  X交换机绑定 QA routking 为 XA。
    // @Qualifier 注解,可以根据声明的Bean名称进行自动注入
    @Bean
    public Binding queueABindX(@Qualifier("queueA") Queue queueA,
                               @Qualifier("xExchange") DirectExchange eXchange){

        return BindingBuilder.bind(queueA).to(eXchange).with("XA");
    }

    // 绑定关系, X交换机绑定队列QB,routking 为 XB
    @Bean
    public Binding queueABindB(@Qualifier("queueB") Queue queueB,
                               @Qualifier("xExchange") DirectExchange eXchange){

        return BindingBuilder.bind(queueB).to(eXchange).with("XB");
    }

    // 绑定关系, X交换机绑定队列QB,routking 为 XB
    @Bean
    public Binding queueABindY(@Qualifier("queueD") Queue queueD,
                               @Qualifier("yExchange") DirectExchange yXchange){

        return BindingBuilder.bind(queueD).to(yXchange).with("YD");
    }

}

生产者

/**
 * 发送延迟消息
 *  http://localhost:8080/ttl/sendMsg/嘻嘻嘻
 *
 *  发送的消息 通过url 去体现, 如 嘻嘻嘻 既是消息
 */

@RestController
@Slf4j
@RequestMapping("/ttl")
public class SendMsgController {

    @Autowired
    // 这是spring 提供的 操作MQ的模板引擎
    private RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("sendMsg/{message}")
    public void sendMSg(@PathVariable String message){
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);
        rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10秒的队列:"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40秒的队列:"+message);
    }

}

消费者

/**
 * 队列TTL  消费者
 */

@Slf4j
@Component
public class DeadLetterQueueConsumer {

    /**
    	为什么延迟队列,这是根据死信队列来的,死信队列形成的条件无非就是三种
    	1、消息延迟
    	2、拒收消息
    	3、队列长度
    	满足其中之一即可

    	当我们生产者发送消息给QA时,由于QA 没有消费此信息,10秒到期之后,自动发送到死信队列,由QD死信队列接收并且消费。这就是一种延迟消息。
    */

    //接收消息
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws Exception{
        String mes = new String(message.getBody(),"utf-8");
        log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),mes);
    }
}

测试

浏览器输入http://localhost:8080/ttl/sendMsg/你好啊
image

经过测试,我们可以指定队列的TTL过期时间,但是业务场景不一定都是固定的时间,例如如果用户需要自定义发布文章,自定义闹钟等场景,这个队列就不能使用了,所以我们可以使用消息TTL过期

消息TTL过期

在上诉代码架构的模型上在添加一个队列QC,绑定关系如下,该队列不设置TTL时间:
image

配置文件类代码

在原来的代码的基础中,在配置类中 声明一个QC ,同时进行绑定关系。即可

这个QC 队列 不用设置延迟消息时间,而是通过生产者来指定延迟消息的时间。这样就可以完成自定义延迟队列了。

// 优化代码-----创建一个普通队列QC
public static final String QUEUE_C = "QC";

//声明QC队列----->这里不用定义TTL时间,由生产者去定义
@Bean("queueC")
public Queue queueC(){
    Map<String,Object> arguments = new HashMap<>(3);
    // 设置 死信交换机
    arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
    // 设置死信routingkey
    arguments.put("x-dead-letter-routing-key","YD");
    return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}

//绑定关系
@Bean
public Binding xExchangeBindQC(@Qualifier("xExchange") DirectExchange eExchange,
                               @Qualifier("queueC") Queue queueC){
    return BindingBuilder.bind(queueC).to(eExchange).with("XC");
}

生产者指定发送时间

//开始发送消息----》带有消息 和 TTL时间
@GetMapping("sendExporationMsg/{message}/{ttlTime}")
public void sendMSg(@PathVariable String message,@PathVariable String ttlTime){
    log.info("当前时间:{},发送一条时长{}毫秒TTL信息给QC队列:{}",new Date().toString(),ttlTime,message);
    
    rabbitTemplate.convertAndSend("X","XC","消息来自TTL为10秒的队列:"+message, msg->{
        
        // 设置发送消息时候的延迟时长
        msg.getMessageProperties().setExpiration(ttlTime);
        return msg;
    });
}

由于队列的特性是先进先出,那么例如我发起一个20秒的ttl消息过期的信息 在发送一个2秒消息过期的信息,但是在接收的时候,是先等到20秒的消息,在接收到2秒的消息,就不能实现根据消息ttl的粒度去执行了。

Rabbitmq 插件实现延迟队列

上文中提到的问题,确实是一个问题,如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间 及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。

安装延时队列插件

在官网上下载 https://www.rabbitmq.com/community-plugins.html,

下载 rabbitmq_delayed_message_exchange 插件,然后解压放置到 RabbitMQ 的插件目录。

插件目录:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins

进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启 RabbitMQ
执行下列命令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange 安装延迟插件

重启:systemctl restart rabbitmq-server

image

代码架构图

在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
image

可以看到,按照原来的方法,我们需要定义普通交换机,普通队列,死信交换机,死信队列
但是使用插件后,我们仅仅只需要定义交换机延迟,和一个队列就实现了延迟队列的功能了。

案例引入

image

在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并 不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才 投递到目标队列中。
声明交换机类型的时候,我们要注意,类型一定是 x-delayed-message
在springboot 中,由于这个类型是没有的,所以我们要自定义,通过CustomExchange 可以自定。
image

参数的说明:

  • name:交换机的名称
  • type:交换机的类型
  • durable:是否持久化
  • autoDelete:是否自动删除
  • arguments:参数----》如TTL、死信交换机等。x-dead-letter-routing-key" 这样的。
  • x-delayed-message---基于插件的交换机
@Configuration
public class DelayedQueueConfig {
    // 定义 交换机延迟名称
    public static final String DELAYED_EXCHANGE = "delayed.exchange";
    // 定义routingkey
    public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";
    // 定义消息队列
    public static final String DELAYED_QUEUE = "delayed.queue";

    //声明交换机----交换机类型是:x-delayed-message---基于插件的交换机
    @Bean
    public CustomExchange delayedExchange(){
        //自定义交换机
        /**
         * 1、交换机的名称
         * 2、交换机的类型
         * 3、是否需要持久化
         * 4、是否需要自动删除
         * 5、参数
         */
        Map<String,Object> arguments = new HashMap<>();
        // 设置 死信交换机
        arguments.put("x-delayed-type","direct") ;
        return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);
    }

    //声明队列
    @Bean
    public Queue delayedQueue(){
        return QueueBuilder.durable(DELAYED_QUEUE).build();
    }

    //绑定
    @Bean
    public Binding delayedQueueBindCustomExchange(@Qualifier("delayedQueue") Queue queue,
                                                  @Qualifier("delayedExchange") CustomExchange customExchange){
        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生产者代码

// 开始发送消息,基于插件的消息 以及 延迟的时间
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime){
    log.info("当前时间:{},发送一条时长{}毫秒TTL信息给延迟队列delayed.queue:{}",new Date().toString(),delayTime,message);
    rabbitTemplate.convertAndSend("delayed.exchange","delayed.routing.key",message,msg->{
        // 发送消息的时候, 延迟时长   单位 ms
        msg.getMessageProperties().setDelay(delayTime);
        return msg;
    });
}

消费者代码


/**
 * 基于插件的 延迟消息
 */
@Slf4j
@Component
public class DelayQueueConsumer {

    //监听消息
    @RabbitListener(queues = "delayed.queue")
    public void receiveDelayQueue(Message message){
        byte[] body = message.getBody();
        String meg = new String(body);
        log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),meg);
    }
}

发起请求:

http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000

http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
image
第二个消息被先消费掉了,符合预期

总结

延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。