RabbitMq的死信队列

发布时间 2023-08-16 15:52:03作者: 我隔壁是老王

参考博客:
https://blog.csdn.net/weixin_59074080/article/details/130673121
https://blog.csdn.net/m0_46979453/article/details/127229005
https://zhuanlan.zhihu.com/p/582787597?utm_id=0

什么是死信队列

正常情况下,一条消息自生产者发布到broke,然后转发到队列中,最后被订阅了(一般是绑定的路由键和发布消息时的路由键相同)该队列的消费者所消费。


消费者在消费生产者生产的消息时发生了某些特殊情况,导致消息无法被正常消费,这些消息会被重新发送到另一个交换机中,这个交换机就是DLX(死信交换机),绑定DLX的队列就称之为死信队列。


DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定

image

产生死信消息的原因

1、消息在队列的存活时间超过设置的生存时间(TTL)时间。

2、队列达到最大的长度 (队列容器已经满了。)

3、消费者消费多次消息失败,就会转移存放到死信队列中。

死信消息需要进行特殊处理,不然可能导致消息中间件不可用,或者业务没法正常提供服务。

如果我们配置了死信队列,那么死信消息将会被丢进死信队列中;如果没有配置死信队列,则该消息将会被丢弃。

如何配置死信交换机和死信队列

需要三步:

1、配置业务队列,绑定到业务交换机上。

这一步和死信队列没有关系,但是没有业务队列,也就没有所谓的死信队列,因此,将这一步排在第一。

2、为业务队列配置死信交换机和路由键。

没有死信交换机,也就没有所谓的死信队列,两者还要通过路由键绑定在一起。

3、为死信交换机配置死信队列。

死信消息最终是要传递到死信队列中的,因此必须配置一个死信队列。

所谓死信交换机(DLX),其实也是一个普通的交换机,它可以是任何类型的交换机,direct、topic、fanout、headers都可以。

配置死信交换机有两种方式:

1、通过rabbitmqctl命令配置

linux系统:

rabbitmqctl set_policy DLX ".*"  '{"dead-letter-exchange":"my-dlx", "dead-letter-routing-key":"rk001"}'

window系统:

rabbitmqctl set_policy DLX ".*"  "{""dead-letter-exchange"":""my-dlx"",""dead-letter-routing-key"":""rk001""}"

注意单引号和双引号的不同!

2、在声明队列时,通过队列参数x-dead-letter-exchange配置

// 声明交换机
channel.exchangeDeclare("some.exchange.name", "direct");
Map<String, Object> args = new HashMap<String, Object>();
// 设置死信交换机
args.put("x-dead-letter-exchange", "some.exchange.name");
// 设置死信路由键
args.put("x-dead-letter-routing-key", "some-routing-key");
// 声明队列
channel.queueDeclare("myqueue", false, false, false, args);

需要注意的是,当我们同时通过以上两种方式配置了私信交换机,通过队列参数配置的优先级更高,它会覆盖通过rabbitmqctl命令配置的信息。

但是,更推荐通过rabbitmqctl命令配置,因为可以在不重新部署应用程序就可以配置。


参数dead-letter-exchange:指定死信交换机。

参数dead-letter-routing-key:指定死信路由键,用于绑定死信交换机和死信队列。

所谓死信队列(DLQ),也是一个普通的队列,只不过它是和死信交换机绑定的而已,在声明队列的时候,通过x-dead-letter-exchange参数和x-dead-letter-routing-key指定死信交换机以及死信路由键即可。

死信队列的场景应用场景

消息本身存在错误的场景中,如参数校验异常、关键字段缺失等

对于这类消息,无论重新入队消费多少次,都不可能被消费成功的,而且频繁入队会导致后续的正常的消息没法被消费,甚至造成消息积压,影响服务的整体可用性。

所以,我们设置一个合理的重试次数N,在消息消费失败时,先不将消息放入死信队列,而是在重新入队N次之后,如果消息依然没有被成功消费,这时候再将该消息放入死信队列中。

合理的重试次数可以避免由于网络波动导致的短暂不可用,错误地将正常的消费直接放入死信队列的问题。对于这种情况,重试之后,消息一般都可以重新消费成功。

死信对消息的影响

假设,一条消息投递到了交换机exchange_a,并指定routing key 为 rk001。

我们通过dead-letter-exchange为其设置了死信交换机exchange_dl,通过dead-letter-routing-key 为其设置了死信路由键 rk001_dl。

交换机的名称会改变

当这条消息由于消费失败,被投递到死信交换机之后,其交换机名称会变为exchange_dl,而不是原来的exchange_a。

路由键会改变

原来的routing key 是 rk001,消息投递到死信交换机之后,routing key变为 rk001_dl了。

路由键变化的前提是:我们通过参数 x-dead-letter-routing-key 指定死信路由键,死信路由键将会被替换成该参数对应的值。

如果没有通过dead-letter-routing-key 设置死信路由键,那么,该消息的路由键依然是原来的rk001。

消息头变化

一条消息变为死信消息之后,其Header中会有一些额外的参数。

  • x-first-death-exchange:第一次成为死信之前的交换机的名称。

  • x-first-death-reason:第一次成为死信的原因。

rejected:由于default-requeue-rejected 参数被设置为false,消息在重新进入队列时被拒绝。

expired :消息的存活时间超过了设置的过期时间。

maxlen : 队列内消息数量超过队列最大容量。

delivery_limit:消息返回的次数超过了限制(通过仲裁队列的策略参数delivery-limit设置)。
  • x-first-death-queue: 第一次成为死信之前的队列的名称

  • x-death: 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新。

[
    {
      "reason": "rejected",
      "count": 1,
      "exchange": "business.exchange",
      "time": 1669478090000,
      "routing-keys": [
        "rk.001"
      ],
      "queue": "business.queue"
    }
]

如上所示,x-death是一个json串,其有以下几个属性:

reason:该消息变为死信消息的原因

count:该消息投递到死信队列中的次数

exchange:该消息在投递到死信队列之前的交换机

time:该消息被投递到死信队列的时间戳

routing-keys:该消息在投递到死信队列之前的路由键

queue:该消息在投递到死信队列之前所在的队列

original-expiration:消息的原始过期属性。如果一个消息是因为超过存活时间而过期,会展示这个属性。另外,过期属性将从死信消息中删除,以防止其再次过期。

模拟死信队列

1、nack响应,且requeue = false

application.yml配置文件信息

spring:
  rabbitmq:
    host: 116.114.21.15
    port: 5672
    username: guest
    password: guest
    dynamic: true
    listener:
      simple:
        acknowledge-mode: manual
        default-requeue-rejected: false
    virtual-host: /

通过acknowledge-mode启用手动应答模式。

通过default-requeue-rejected设置消息被拒绝后,不再重新入队。

RabbitConfig

package com.panda.rabbitmq.config;

import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@Data
public class RabbitConfig {
    public static final String DEAD_LETTER_ROUTING_KEY = "rk.dead.letter.001";
    public static final String BUSINESS_ROUTING_KEY = "rk.001";
    public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
    public static final String BUSINESS_QUEUE = "business.queue";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public static final String BUSINESS_EXCHANGE = "business.exchange";

    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /**
     * 声明业务队列的交换机
     */
    @Bean("businessExchange")
    public DirectExchange businessExchange() {
        return new DirectExchange(BUSINESS_EXCHANGE);
    }

    /**
     * 声明死信交换机
     */
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明业务队列
     */
    @Bean("businessQueue")
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(16);
        // 设置当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 设置当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
    }

    /**
     * 声明死信队列
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    /**
     * 声明业务队列和业务交换机的绑定关系
     */
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                   @Qualifier("businessExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_ROUTING_KEY);
    }

    /**
     * 声明死信队列和死信交换机的绑定关系
     */
    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
                                     @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);
    }
}

配置业务交换机、业务队列、死信交换机、死信队列,并通过路由键将业务交换机和业务队列绑定在一起,将死信交换机和死信队列绑定在一起。

注意,我们在声明业务队列时,指定了两个参数x-dead-letter-exchange和x-dead-letter-routing-key,这个是关键,如果不配置这两个参数,那么在业务消息消费失败之后,是不会投递到死信交换机的。

BusinessProducer

业务生产者代码,发送业务消息。

import com.alibaba.fastjson.JSON;
import com.panda.rabbitmq.config.RabbitConfig;
import com.panda.rabbitmq.entity.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
@Slf4j
public class BusinessProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(Order order) {
        rabbitTemplate.convertAndSend(RabbitConfig.BUSINESS_EXCHANGE, RabbitConfig.BUSINESS_ROUTING_KEY, 
            JSON.toJSONString(order));
    }
}

BusinessConsumer

import com.alibaba.fastjson.JSON;
import com.panda.rabbitmq.config.RabbitConfig;
import com.panda.rabbitmq.entity.Order;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class BusinessConsumer {
    @RabbitListener(queues = RabbitConfig.BUSINESS_QUEUE)
    public void receive(Message message, Channel channel) throws IOException {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        log.info("收到业务消息:{}", order);
        log.info("业务消息附带的头信息: {}", JSON.toJSONString(message.getMessageProperties().getHeaders()));
        try {
            if (StringUtils.isBlank(order.getStatus())) {
                throw new IllegalArgumentException("order's status can not be null!");
            }
        } catch (Exception e) {
            log.error("业务消息消费失败:{}", e.getMessage());
            // 消息消费异常后,返回一个nack响应
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

业务消费者代码。

通过@RabbitListener注解,标注一个方法为消费者。

在这个方法中,我们首先将消息打印出来,然后故意抛出一个IllegalArgumentException(其实是发送消息时,故意没有指定status),模拟消费失败的情况。

最后捕获这个异常,通过channel.basicNack给一个nack响应。

DeadLetterConsumer

import com.alibaba.fastjson.JSON;
import com.panda.rabbitmq.config.RabbitConfig;
import com.panda.rabbitmq.entity.Order;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class DeadLetterConsumer {
    @RabbitListener(queues = RabbitConfig.DEAD_LETTER_QUEUE)
    public void receiveA(Message message, Channel channel) throws IOException {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        log.info("收到死信消息:{}", order);
        log.info("死信消息附带的头信息: {}", JSON.toJSONString(message.getMessageProperties().getHeaders()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

死信消费者。

简单的打印出进入到死信队列的消息,然后手动给一个ack应答(在实际业务中,肯定不能这么简单的处理,但这里不是我们关注的重点)。

TestController

import com.panda.rabbitmq.deadletter.BusinessProducer;
import com.panda.rabbitmq.entity.Order;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("test")
public class TestController {

    @Resource
    private BusinessProducer businessProducer;

    @RequestMapping(value = "sendMsg")
    public void sendMsg() {
        Order order = new Order();
        order.setId("20221126000001");
        order.setType("1");
        businessProducer.send(order);
    }
}

简单的测试接口,发送一条消息,故意不设置status属性,为了模拟业务消费者消费失败。

image

测试结果

2022-11-26 23:54:50.061  INFO 32848 --- [ntContainer#0-1] c.p.r.deadletter.BusinessConsumer        : 
收到业务消息:Order(id=20221126000001, type=1, status=null)
2022-11-26 23:54:50.067  INFO 32848 --- [ntContainer#0-1] c.p.r.deadletter.BusinessConsumer        : 
业务消息附带的头信息: {}
2022-11-26 23:54:50.071 ERROR 32848 --- [ntContainer#0-1] c.p.r.deadletter.BusinessConsumer        : 
业务消息消费失败:order's status can not be null!
2022-11-26 23:54:50.084  INFO 32848 --- [ntContainer#1-1] c.p.r.deadletter.DeadLetterConsumer      : 
收到死信消息:Order(id=20221126000001, type=1, status=null)
2022-11-26 23:54:50.086  INFO 32848 --- [ntContainer#1-1] c.p.r.deadletter.DeadLetterConsumer      : 
死信消息附带的头信息: 
{
  "x-first-death-exchange": "business.exchange",
  "x-death": [
    {
      "reason": "rejected",
      "count": 1,
      "exchange": "business.exchange",
      "time": 1669478090000,
      "routing-keys": [
        "rk.001"
      ],
      "queue": "business.queue"
    }
  ],
  "x-first-death-reason": "rejected",
  "x-first-death-queue": "business.queue"
}

从上面显示的结果可以看出,业务消息在消费时失败了,而后被投递到死信交换机,因此,该死信消息才能被死信消费者所消费。

业务消息并没有附带任何头信息!而死信消息却附带了很多头信息。这些都头信息就是上面的【消息头变化】一节讲述的

代码示例——消息的存活时间超过设置的生存时间

application.yml

配置同上面的例子相同,不再赘述。

RabbitConfig

import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@Data
public class RabbitConfig {
    public static final String DEAD_LETTER_ROUTING_KEY = "rk.dead.letter.001";
    public static final String BUSINESS_ROUTING_KEY = "rk.001";
    public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
    public static final String BUSINESS_QUEUE = "business.queue";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public static final String BUSINESS_EXCHANGE = "business.exchange";

    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /**
     * 声明业务队列的交换机
     */
    @Bean("businessExchange")
    public DirectExchange businessExchange() {
        return new DirectExchange(BUSINESS_EXCHANGE);
    }

    /**
     * 声明死信交换机
     */
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明业务队列
     */
    @Bean("businessQueue")
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(16);
        // 设置当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 设置当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        // 设置消息的过期时间 单位:ms(毫秒)
//        args.put("x-message-ttl", 5000);
        return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
    }

    /**
     * 声明死信队列
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    /**
     * 声明业务队列和业务交换机的绑定关系
     */
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                   @Qualifier("businessExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_ROUTING_KEY);
    }

    /**
     * 声明死信队列和死信交换机的绑定关系
     */
    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
                                     @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);
    }
}

在声明业务队列时,指定了两个参数x-dead-letter-exchange和x-dead-letter-routing-key,这个是关键,如果不配置这两个参数,那么在业务消息消费失败之后,是不会投递到死信交换机的。

另外,我们其实还配置了x-message-ttl参数,该参数指定队列中消息的过期时间,单位是毫秒。不过我们先把它注释掉。

BusinessProducer

业务生产者代码,发送业务消息。

在发送消息的同时,指定该消息的过期时间(5秒)。

@Component
@Slf4j
public class BusinessProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(Order order) {
        MessagePostProcessor messagePostProcessor = message -> {
            // 设置消息过期时间为5秒
            message.getMessageProperties().setExpiration("5000");
            return message;
        };
        log.info("开始发送业务消息");
        rabbitTemplate.convertAndSend(RabbitConfig.BUSINESS_EXCHANGE, RabbitConfig.BUSINESS_ROUTING_KEY,
                JSON.toJSONString(order), messagePostProcessor);
    }
}

BusinessConsumer

将@Component注解和 @RabbitListener注解全都注释掉,这样就没有消费者消费业务队列中的消息,在超过我们设置的超时时间之后,消息会进入到死信队列中。

//@Component
@Slf4j
public class BusinessConsumer {
//    @RabbitListener(queues = RabbitConfig.BUSINESS_QUEUE)
    public void receive(Message message, Channel channel) throws IOException {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        log.info("收到业务消息:{}", order);
        log.info("业务消息附带的头信息: {}", JSON.toJSONString(message.getMessageProperties().getHeaders()));
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("业务消息消费失败:{}", e.getMessage());
            // 消息消费异常后,返回一个nack响应
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

DeadLetterConsumer

配置同上面的例子相同,不再赘述。

TestController

配置同上面的例子相同,不再赘述。

测试结果

2023-08-1611: 11: 19.553INFO17776---[nio-8081-exec-1]e.d.c.r.deadLetter2.BusinessProducer: 开始发送业务消息2023-08-1611: 11: 24.711INFO17776---[ntContainer#2-1]e.d.c.r.deadLetter2.DeadLetterConsumer: 收到死信消息:Order(id=20221126000001,
type=1,
status=null)2023-08-1611: 11: 24.713INFO17776---[ntContainer#2-1]e.d.c.r.deadLetter2.DeadLetterConsumer: 死信消息附带的头信息:{
	"x-first-death-exchange": "business.exchange",
	"x-death": [{
		"reason": "expired",
		"original-expiration": "5000",
		"count": 1,
		"exchange": "business.exchange",
		"time": 1692155485000,
		"routing-keys": ["rk.001"],
		"queue": "business.queue"
	}],
	"x-first-death-reason": "expired",
	"x-first-death-queue": "business.queue"
}

如上所示。发送消息的时间是2022-11-27 00:37:56.910,由于没有消费者(我们注释掉了),该消息在超时之后,应该进入到死信队列中。

在上面的结果中,我们可以清楚地看到,收到死信消息的时间是2022-11-27 00:38:01.994,发送消息的时间和死信消费者消费死信消息的时间,比我们设置的超时时间(5秒)多一点点。

而且,从"reason":"expired"也可以看出,消息进入死信交换机的原因是超时(expired)了。

另外,由于该消息是由于超时进入死信队列的,所以x-death属性中有original-expiration属性,这一点和我们在上面的【消息头变化】一节的分析也是一致的。

注意点

在配置消息的过期时间时,有两种配置方式,一种是上面我们在发送消息时配置,如下所示:

public void send(Order order) {
        MessagePostProcessor messagePostProcessor = message -> {
            // 设置消息过期时间为5秒
            message.getMessageProperties().setExpiration("5000");
            return message;
        };
        log.info("开始发送业务消息");
        rabbitTemplate.convertAndSend(RabbitConfig.BUSINESS_EXCHANGE, RabbitConfig.BUSINESS_ROUTING_KEY,
                JSON.toJSONString(order), messagePostProcessor);
    }

一种是在声明队列时,通过x-message-ttl参数指定消息的超时时间,如下所示:

   /**
     * 声明业务队列
     */
    @Bean("businessQueue")
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(16);
        // 设置当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 设置当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        // 设置消息的过期时间 单位:ms(毫秒)
        args.put("x-message-ttl", 5000);
        return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
    }

这两种方式的区别是:

前者是指定单个消息的过期时间。而后者是指定整个队列中所有消息的超时时间。

代码示例——超出队列的最大长度限制

application.yml

配置同上面的例子相同,不再赘述。

RabbitConfig

import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
@Data
public class RabbitConfig {
    public static final String DEAD_LETTER_ROUTING_KEY = "rk.dead.letter.001";
    public static final String BUSINESS_ROUTING_KEY = "rk.001";
    public static final String DEAD_LETTER_QUEUE = "dead.letter.queue";
    public static final String BUSINESS_QUEUE = "business.queue";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.exchange";
    public static final String BUSINESS_EXCHANGE = "business.exchange";

    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    /**
     * 声明业务队列的交换机
     */
    @Bean("businessExchange")
    public DirectExchange businessExchange() {
        return new DirectExchange(BUSINESS_EXCHANGE);
    }

    /**
     * 声明死信交换机
     */
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    /**
     * 声明业务队列
     */
    @Bean("businessQueue")
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(16);
        // 设置当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 设置当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        // 设置消息的过期时间 单位:ms(毫秒)
//        args.put("x-message-ttl", 5000);
        // 设置队列中最大的消息容量
        args.put("x-max-length", 5);
        // 指定队列中消息达到最大限制之后的行为
        // 可选参数有:
        // drop-head(删除队列头部的消息)、
        // reject-publish(最近发来的消息将被丢弃)、
        // reject-publish-dlx(拒绝发送消息到死信交换器)
        // 注意,类型为quorum的队列只支持drop-head
        // x-overflow属性默认的处理策略是丢掉队列的头部的消息,或者将队列头部的消息投递到死信交换机
        // args.put("x-overflow", "reject-publish");
        return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
    }

    /**
     * 声明死信队列
     */
    @Bean("deadLetterQueue")
    public Queue deadLetterQueue() {
        return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
    }

    /**
     * 声明业务队列和业务交换机的绑定关系
     */
    @Bean
    public Binding businessBinding(@Qualifier("businessQueue") Queue queue,
                                   @Qualifier("businessExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(BUSINESS_ROUTING_KEY);
    }

    /**
     * 声明死信队列和死信交换机的绑定关系
     */
    @Bean
    public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
                                     @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);
    }
}

注意,我们在声明业务队列时,指定了两个参数x-dead-letter-exchange和x-dead-letter-routing-key,这个是关键,如果不配置这两个参数,那么在业务消息消费失败之后,是不会投递到死信交换机的。

我们还配置了x-max-length参数,该参数指定队列中消息的最大容量(消息的条数,不是消息的字节大小)为5,即该队列中同时可以最多容纳5条消息。

另外,我们还配置了x-overflow参数,该参数指定了队列中的消息达到最大容量之后,再接收到消息时的处理策略,暂时先注释掉。

BusinessProduer

@Component
@Slf4j
public class BusinessProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void send(Order order) {
        log.info("开始发送业务消息");
        rabbitTemplate.convertAndSend(RabbitConfig.BUSINESS_EXCHANGE, RabbitConfig.BUSINESS_ROUTING_KEY,
                JSON.toJSONString(order));
    }
}

生产者,发送一条消息。

BusinessConsumer

//@Component
@Slf4j
public class BusinessConsumer {
    //    @RabbitListener(queues = RabbitConfig.BUSINESS_QUEUE)
    public void receive(Message message, Channel channel) throws IOException {
        Order order = JSON.parseObject(message.getBody(), Order.class);
        log.info("收到业务消息:{}", order);
        log.info("业务消息附带的头信息: {}", JSON.toJSONString(message.getMessageProperties().getHeaders()));
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("业务消息消费失败:{}", e.getMessage());
            // 消息消费异常后,返回一个nack响应
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }
}

将@Component注解和 @RabbitListener注解全都注释掉,这样就没有消费者消费业务队列中的消息,当我们发送多条消息,超过指定的最大容量之后的消息就会进入到死信队列中。

DeadLetterConsumer

配置同上面的例子相同,不再赘述。

TestController

@RestController
@RequestMapping("test")
public class TestController {

    @Resource
    private BusinessProducer businessProducer;

    @RequestMapping(value = "sendMsg")
    public void sendMsg() {
        for (int i = 1; i <= 6; i++) {
            Order order = new Order();
            order.setId("2022112600000" + i);
            order.setType(i + "");
            order.setStatus(i + "");
            businessProducer.send(order);
        }
    }
}

我们在sendMsg方法中,循环发送6条消息,由于我们设置了消息的最大容量是5,且没有消费者,所以有一条消息会进入到死信队列中。

测试结果

2023-08-16 15:36:31.546  INFO 18504 --- [nio-8081-exec-2] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:36:31.634  INFO 18504 --- [nio-8081-exec-2] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:36:31.667  INFO 18504 --- [nio-8081-exec-2] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:36:31.667  INFO 18504 --- [nio-8081-exec-2] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:36:31.699  INFO 18504 --- [nio-8081-exec-2] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:36:31.700  INFO 18504 --- [nio-8081-exec-2] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:36:31.728  INFO 18504 --- [ntContainer#2-1] e.d.c.r.deadLetter3.DeadLetterConsumer   : 收到死信消息:Order(id=20221126000001, type=1, status=1)
2023-08-16 15:36:31.731  INFO 18504 --- [ntContainer#2-1] e.d.c.r.deadLetter3.DeadLetterConsumer   : 死信消息附带的头信息: {"x-first-death-exchange":"business.exchange","x-death":[{"reason":"maxlen","count":1,"exchange":"business.exchange","time":1692171392000,"routing-keys":["rk.001"],"queue":"business.queue"}],"x-first-death-reason":"maxlen","x-first-death-queue":"business.queue"}

日志打印了6次“开始发送业务消息”,说明我们发送了6条消息,而死信消费者消费了一条消息,而且消费的是最先发送到队列中的消息,也就是队列头部的消息。

也就是说当队列中消息容量达到最大值之后,如果依然有消息投递到该队列中,那么队列头部的消息会被投递到死信交换机中!

而且,从"reason":"maxlen",也可以看出,消息进入死信交换机的原因是超出最大限制(maxlen)了。

如果我们把RabbitConfig中配置改成如下:

/**
     * 声明业务队列
     */
    @Bean("businessQueue")
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(16);
        // 设置当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 设置当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        // 设置消息的过期时间 单位:ms(毫秒)
        // args.put("x-message-ttl", 5000);
        // 设置队列中最大的消息容量
        args.put("x-max-length", 5);
        // 指定队列中消息达到最大限制之后的行为
        // 可选参数有:
        // drop-head(删除队列头部的消息)、
        // reject-publish(最近发来的消息将被丢弃)、
        // reject-publish-dlx(拒绝发送消息到死信交换器)
        // 注意,类型为quorum的队列只支持drop-head
        // x-overflow属性默认的处理策略是drop-head,即丢掉队列的头部的消息,或者将队列头部的消息投递到死信交换机
        args.put("x-overflow", "reject-publish");
        return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
    }

即,给队列设置参数x-overflow,其属性值为reject-publish,结果如何呢?

2023-08-16 15:41:26.136  INFO 15936 --- [nio-8081-exec-1] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:41:26.230  INFO 15936 --- [nio-8081-exec-1] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:41:26.280  INFO 15936 --- [nio-8081-exec-1] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:41:26.280  INFO 15936 --- [nio-8081-exec-1] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:41:26.366  INFO 15936 --- [nio-8081-exec-1] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息
2023-08-16 15:41:26.367  INFO 15936 --- [nio-8081-exec-1] e.d.c.r.deadLetter3.BusinessProducer     : 开始发送业务消息

IDEA控制台信息如上,可以看出6条消息确实是发送出去了,但是没有死信消费者的消费信息,说明多余那一条消息没有被投递到死信交换机之中。

我们再看RabbitMQ控制台,如下图所示,可以看出第6条消息被抛弃了(不是队头,是队尾的那条消息)。

image

踩坑

Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-max-length' for queue 'business.queue' in vhost '/': received the value '5' of type 'longstr' but current is none, class-id=50, method-id=10)

错误原因:

  RabbitMQ中已存在这个队列,但在启动的项目中对这个队列的属性进行了修改。RabbitMQ中的队列一经声明,其属性不可修改。

解决方法:

  删除该队列并重新声明

注意点

设置队列容量的最大限制有两种方式:

一,通过rabbitmqct命令,如下所说:

rabbitmqctl set_policy ploicy_name "queue_name" '{"max-length-bytes":1048576}'

二、通过x-max-length参数,如下所示。

    @Bean("businessQueue")
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>(16);
        // 设置当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // 设置当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
        // 设置消息的过期时间 单位:ms(毫秒)
        // args.put("x-message-ttl", 5000);
        // 设置队列中最大的消息容量
        args.put("x-max-length", 5);
        // 指定队列中消息达到最大限制之后的行为
        // 可选参数有:
        // drop-head(删除队列头部的消息)、
        // reject-publish(最近发来的消息将被丢弃)、
        // reject-publish-dlx(拒绝发送消息到死信交换器)
        // 注意,类型为quorum的队列只支持drop-head
        // x-overflow属性默认的处理策略是drop-head,即丢掉队列的头部的消息,或者将队列头部的消息投递到死信交换机
        args.put("x-overflow", "reject-publish");
        return QueueBuilder.durable(BUSINESS_QUEUE).withArguments(args).build();
    }

注意,如果我们同时使用这两种方式设置了队列的最大长度,那么较小的值将被使用!另外,只有处于ready状态(在RabbitMQ中,消息有2种状态:ready 和 unacked)的消息被计数,未被确认的消息不会被计数受到limit的限制。