RabbitMQ 死信交换机、延迟队列、惰性队列

发布时间 2023-10-05 10:46:46作者: 乔京飞

如果一个队列设置了死信交换机,该队列的消息就有了极大的可靠性保障,当出现以下情况时,消息就会投递到死信交换机中:

  • 队列中的消息在被消费者处理后,抛出异常,返回了 nack 或者 reject
  • 如果队列设置了 ttl 或者消息本身设置了 ttl ,消息因为超时而未消费
  • 队列容量已经满了,后续发来的消息无法接收,直接被丢弃

有了死信交换机后,在保障消息的可靠性同时,也极大的简化了代码编写。死信交换机结合 ttl 队列也能够实现消息的延迟发送。由于延迟发送消息的使用场景非常广泛,因此 RabbitMQ 专门提供了延迟插件,进一步简化了延迟发送消息的代码。

队列满了,主要是因为服务器内存不足,这种情况其实是比较危险的,说明生产消息的速度超过了消费速度,已经造成了大量消息的累积。虽然 Spring AMQP 发送消息默认设置为持久化,但是当服务器内存不足时,会集中将一批消息持久化到硬盘中,此时会消耗一定的时间,造成 RabbitMQ 的不稳定,有可能会造成新生产的消息无法及时入队而丢失。当然增加消费者程序来提高消费速度是最佳解决方案,但是如果短期内无法增加消费者程序,我们可以将队列迅速变为惰性队列,将消息持续稳定的持久化到硬盘上,就可以确保消息可靠性,由于硬盘容量很大,将在足够长的时间内不会出现队列爆满的问题。

本篇博客主要通过代码的方式演示 RabbitMQ 以上消息可靠性保障措施,在博客的最后会提供源代码下载。


一、搭建工程

继续采用上篇博客通过 docker-compose 部署的 RabbitMQ 进行演示,搭建 SpringBoot 工程如下:

image

两个子工程没有引入任何依赖,主要使用父工程 pom 文件的依赖,父工程 pom 文件细节如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jobs</groupId>
    <artifactId>rmq_dead_delay</artifactId>
    <packaging>pom</packaging>
    <version>1.0</version>
    <modules>
        <module>publish_msg</module>
        <module>consumer_msg</module>
    </modules>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.5</version>
    </parent>

    <dependencies>
        <!--在此主要使用 lombok 自带的 log 方法-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--Spring AMQP 消息队列依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--引入单元测试依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

二、死信交换机

死信交换机是在队列上进行设置,只需要在队列上增加 2 个参数设置即可:

  • x-dead-letter-exchange 参数设置死信交换机的名称
  • x-dead-letter-routing-key 参数设置私信交换机的路由 key

在上篇博客中,消费者本地重试次数耗尽后,我们设置的策略是投递到新的交换机中。有了死信交换机后,我们就不需要这么做了,本地重试次数耗尽后,仍然使用默认的丢弃策略,此时被丢弃的消息会通过死信交换机投递到其它队列中进行处理。

//用于死信交换机投递消息,当别的队列中消息处理错误被丢弃,或者超时未消费,则在该队列中进行处理
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "error.queue"),
    exchange = @Exchange(name = "test.exchange", type = ExchangeTypes.DIRECT),
    key = "error"
))
public void listenerDeadQueue(String msg) {
    log.info("接收到 dead.queue 消息:" + msg);
    //在这里可以记录日志,或将消息存储到数据库中,后续由人工进行干预处理
}

//接收到消息后,处理程序会报异常
//在 application.yml 中配置了本地重试 3 次,如果都是失败,默认会丢弃消息,返回 basic.reject
//由于已经为该队列设置了死信交换机和相应的路由 key,因此最终失败的消息会投递到 error.queue 中
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "data.queue", arguments = {
        //在队列上设置【死信交换机】和【路由key】
        @Argument(name = "x-dead-letter-exchange", value = "test.exchange"),
        @Argument(name = "x-dead-letter-routing-key", value = "error")}),
    exchange = @Exchange(name = "test.exchange", type = ExchangeTypes.DIRECT),
    key = "data"
))
public void listenerDataQueue(String msg) {
    log.info("接收到 data.queue 消息:" + msg);
    //以下代码会抛出异常
    Integer result = 1 / 0;
}

发送消息进行测试的代码如下:

//发送到 dead.queue
//消费者接收到消息后,处理过程中出现异常,本地重试 3 次,一共处理 4 次都抛异常,
//最终由死信交换机投递到 error.queue
@Test
void publishDeadTest() {
    String message = "dead message test";
    String exchange = "test.exchange";
    String rootingkey = "data";
    //发送消息
    rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}

三、TTL 队列

一个队列可以设置 ttl 时间,表示消息最多可以在队列中的存放时间,如果过期未消费就会被丢弃。对于一个消息来说,也可以设置 ttl 时间,表示该消息的存活时间,过期未消费也会被丢弃。如果队列和消息同时设置了 ttl 时间,则以最短的 ttl 时间为准。

package com.jobs.listener;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TTLQueueConfig {

    //在 SpringAmqpListener 中通过注解绑定交换机和队列时,必须要有消息处理程序。
    //为了测试发送到队列中的消息超时后,自动由死信交换机投递到指定队列中,我们不需要消息处理程序
    //因此只能采用 @Bean 注解去声明交换机和队列并进行绑定。
    //如果在 SpringAmqpListener 设置 ttl 队列的话,只需要给队列设置 x-message-ttl 参数即可

    @Bean
    public DirectExchange ttlExchange(){
        return new DirectExchange("ttl.exchange");
    }

    @Bean
    public Queue ttlQueue(){
        return QueueBuilder
                .durable("ttl.queue")
                //设置队列中每条消息的存活时间只有 10 秒钟
                .ttl(10000)
                //给队列设置【死信交换机】和【路由key】
                .deadLetterExchange("test.exchange")
                .deadLetterRoutingKey("error")
                .build();
    }

    @Bean
    public Binding ttlBinding(){
        return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
    }
}

发送消息测试队列的 ttl 和消息的 ttl

//发送普通消息到 ttl.queue
//由于我们创建的 ttl.queue 具有以下 3 个特征:
//1 每条消息的存活时间只有 10 秒
//2 没有消息处理程序
//3 设置了死信交换机和路由 key
//因此 10 秒之后,消息就会被死信交换机投递到 error.queue
@Test
void publish_Normal_To_TTL_Test() {
    String message = "normal message to ttl queue test";
    String exchange = "ttl.exchange";
    String rootingkey = "ttl";
    //发送消息
    rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}

//消息本身也可以设置 ttl 过期时间
//如果【消息】和【队列】都设置了 ttl 过期时间,以最短的 ttl 过期时间为准
//这里我们将消息的 ttl 过期时间设置为 5 秒,短于队列的 ttl 过期时间
//因此 5 秒之后,会被死信交换机投递到 error.queue 中
@Test
void publish_ttlmsg_To_TTL_Test() {
    String exchange = "ttl.exchange";
    String rootingkey = "ttl";
    Message message = MessageBuilder
        .withBody("ttl messsage to ttl queue test".getBytes(StandardCharsets.UTF_8))
        //设置消息的 ttl 过期时间为 5 秒
        .setExpiration("5000").build();
    //发送消息
    rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}

通过上面的代码和实际执行效果可以发现:死信交换机和 ttl 队列相结合可以实现延迟发送消息的功能。


四、延迟队列

通过死信交换机和 ttl 队列配合实现延迟发送消息的方案,太过于繁琐。由于延迟发送消息的场景比较广泛,RabbitMQ 提供了延迟插件,可以将任意交换机设置为延迟交换机。当发送给延迟交换机的消息包含 x-delay 头时,会先将消息持久化到硬盘中,时间到后再投递到相应的队列中,实现延迟发送消息的功能,大大简化了代码的实现方案。

首先我们需要到官网上下载与 RabbitMQ 版本相同的延迟插件,由于我们使用的 RabbitMQ 的版本是 3.12,因此我们需要下载 3.12 版本的延迟插件,由于插件的下载地址是 github,属于国外网站,偶尔才能访问,可以在不同的时间尝试访问。

RabbitMQ 延迟插件的下载地址为:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

安装插件非常简单,只需要将插件文件放到 RabbitMQ 的插件目录中,再运行启用插件的命令即可。由于我使用 docker-compose 部署的 RabbitMQ,容器中插件目录是 /plugins ,因此首先我们需要将插件文件拷贝到该目录中。

# 可以通过 xftp 将文件上传到 /app/rabbitmq 目录中
# 然后运行以下命令将插件文件拷贝到容器的 /plugins 目录中
# 备注:我启动的 RabbitMQ 的容器名称是 rabbitmq
docker cp /app/rabbitmq/rabbitmq_delayed_message_exchange-3.12.0.ez rabbitmq:/plugins

然后进入容器中,使用 RabbitMQ 自带的命令启用插件即可:

# 进入容器中
docker exec -it rabbitmq bash
# 启用延迟插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在接收消息的处理方法上,我们声明一个新的交换机,通过 delayed = "true" 设置为延迟交换机即可:

//接收 header 设置了 x-delay 的延迟消息
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "test.queue"),
    //给 rabbitmq 安装了 delay message 插件后,给交换机设置 delayed 为 true,就变成了延迟交换机
    //当带有 x-delay 头的消息发送到该交换机后,消息先持久化到硬盘中,然后返回给生产者没有找到路由的错误异常,
    //因此生产者如果监听了消息是否由交换机发送到队列的话,就会收到 routing not found 的错误消息
    //此时生产者的回调方法中,需要判断返回的错误消息是否设置了 delay 以及 delay 值是否大于 0
    exchange = @Exchange(name = "delay.exchange", delayed = "true"),
    key = "delay"
))
public void listenDelayExchange(String msg) {
    log.info("接收到了 test.queue 的延迟消息:" + msg);
}

通过以下发送消息的代码进行测试验证:

//发送延迟消息测试
@Test
void publishDelayMessageTest() {
    String exchange = "delay.exchange";
    String rootingkey = "delay";
    Message message = MessageBuilder
        .withBody("delay messsage test".getBytes(StandardCharsets.UTF_8))
        //延迟消息,必须要包含 x-delay 头,设置延迟时间,此处设置为延迟 5 秒发送
        .setHeader("x-delay", 5000).build();
    //如果发送者监听消息是否由交换机发送到队列时,必须给消息设置全局唯一 id
    //CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    //rabbitTemplate.convertAndSend(exchange, rootingkey, message, correlationData);

    //将消息发送给延迟交换机
    rabbitTemplate.convertAndSend(exchange, rootingkey, message);
}

五、惰性队列

从 RabbitMQ 的 3.6.0 版本开始,就增加了惰性队列的功能,具有如下特征:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持海量的消息存储,存取性能取决于硬盘性能

可以通过 RabbitMQ 自带的命令添加策略,将已存在的队列设置为惰性队列,比如:

# 通过正则表达式,将以 lazy 开头的队列,设置为惰性队列
rabbitmqctl set_policy Lazy "^lazy" '{"queue-mode":"lazy"}' --apply-to queues

在代码中,只需要给队列设置 x-queue-mode 参数值为 lazy 即可变为惰性队列:

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "lazy.queue",
                   //将队列设置为惰性队列,消息会持续稳定的存储到磁盘中,用于解决消息大量累积的问题
                   arguments = @Argument(name = "x-queue-mode", value = "lazy")),
    exchange = @Exchange(name = "test.exchange"),
    key = "lazy"
))
public void listenLazyQueue(String msg) throws InterruptedException {
    log.info("接收到了 lazy.queue 的消息:" + msg);
    //模拟处理消息比较慢,造成消息无法及时消费处理掉,在队列中进行累积
    //如果采用惰性队列,则能够持续稳定的存储到硬盘中,防止消息丢失
    Thread.sleep(5000);
}

发送消息给惰性队列进行测试,这里发送 100 万条消息进行测试:

//短时间内,发送 100 万条消息到惰性队列中,可通过 RabbitMQ 的 web 控制台观察持久化的稳定性
@Test
void publishLazyMessageTest() {
    String exchange = "test.exchange";
    String rootingkey = "lazy";
    String message = "test lazy message %d";
    for (int i = 0; i < 1000000; i++) {
        //发送消息
        rabbitTemplate.convertAndSend(exchange, rootingkey, String.format(message, i));
    }
}

到此为止,已经介绍完毕,所有代码都经过测试无误,这里就不截图展示具体执行效果了,可自行下载源代码测试验证。

本篇博客的源代码下载地址为:https://files.cnblogs.com/files/blogs/699532/rmq_dead_delay.zip