rabbitmq死信队列

发布时间 2023-09-02 20:22:38作者: 自学Java笔记本

死信的概念

死信队列(Dead Letter Queue)是指当消息无法被消费者正常消费时,将这些无法消费的消息发送到专门的死信队列中,以便进行进一步的处理。这种处理方式通常被称为“死信处理”。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息 消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时 间未支付时自动失效

在RabbitMQ中,死信队列通常用于以下几种情况:

  • 消息被拒绝:消费者在消费消息时,如果无法处理该消息,可以将该消息拒绝并返回到RabbitMQ。此时,RabbitMQ可以将这些被拒绝的消息发送到死信队列中,以便进行处理。

  • 消息过期:在发送消息时,可以设置消息的过期时间。如果消息在过期时间内没有被消费者消费,那么RabbitMQ会将这些过期的消息发送到死信队列中。

  • 队列达到最大长度:当队列达到最大长度时,新的消息将无法被发送到队列中。此时,RabbitMQ可以选择将这些无法发送的消息发送到死信队列中,以便进行进一步处理。

实战

image
说明:这里的交换机类型可以自己定义,不一定要求就是直连,根据业务场景选择不同的类型。
正常情况下,生产者发送消息到交换机上,然后到普通的队列,由消费者消费,但是当达到图中三个条件中的其中一个,此时消息会转为死信消息。由于消息不能丢失,要保持正常的消费,所以此时需要定义死信交换机和队列,由死信消费者去消费。

生产者

我将有关于队列的声明,交换机的声明,以及队列与交换机的绑定关系都写在生产者中。
在此之前还记得构建队列时,我们的有一个形参需要传递一个map嘛,这个map可以定义相关的参数
channel.queueDeclare(NORMAL_QUEUE,true,false,false,arguments);
例如普通队列与死信交换机的绑定关系

 // 2.1 构建参数:指定某种条件下达成的死信消息
        Map<String, Object> arguments = new HashMap<>();
        // 过期时间  10s = 10000ms  可以由生产者中设定----一般由生产者发送
        // arguments.put("x-message-ttl",10000);
        //正常队列设置过期之后的死信交换机,也就是如果消息成为了死信,将由队列发送到死信交换机上
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);  // 固定格式
        // 设置死信routingkey
        arguments.put("x-dead-letter-routing-key","del-queue");
        channel.queueDeclare(NORMAL_QUEUE,true,false,false,arguments);

完整代码如下:

/**
 * 死信生产者
 * 定义相关队列、交换机等
 */
public class DieProducer {
    // 定义普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    // 定义普通队列名称
    public static final String NORMAL_QUEUE = "normal_queue";
    // 定义 死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";
    // 定义 死信队列名称
    public static final String DEAD_QUEUE = "dead_queue";

    // 初始化队列
    public static void init( Channel channel) throws Exception{
        // 1.声明普通交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // 2.声明普通队列
        // 2.1 构建参数:指定某种条件下达成的死信消息
        Map<String, Object> arguments = new HashMap<>();
        // 过期时间  10s = 10000ms  可以由生产者中设定----一般由生产者发送
        arguments.put("x-message-ttl",10000);
        //正常队列设置过期之后的死信交换机,也就是如果消息成为了死信,将由队列发送到死信交换机上
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);  // 固定格式
        // 设置死信routingkey
        arguments.put("x-dead-letter-routing-key","del-queue");
        channel.queueDeclare(NORMAL_QUEUE,true,false,false,arguments);
        // 3.声明死信交换机
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
        // 4.声明死信队列
        channel.queueDeclare(DEAD_QUEUE,true,false,false,null);

        //5.绑定队列与交换机的关系
        // 5.1 绑定普通队列与普通交换机的关系
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"h1");
        // 5.2 绑定死信队列与死信交换机的关系
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"del-queue");

    }

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        // 初始化操作
        init(channel);

        //发送消息
        for (int i = 1; i < 11 ; i++) {
            String message = "info "+i;
            channel.basicPublish(NORMAL_EXCHANGE,"h1",null,message.getBytes(StandardCharsets.UTF_8));
        }

    }
}

运行:
image

可以看到普通队列的消息都转发到了死信队列中了,这是因为设置了消息过期时间,从而满足了成为死信队列的条件之一

// 设置过期时间,当消息时间10秒还未消费,那么就成为死信队列
arguments.put("x-message-ttl",10000);
//正常队列设置过期之后的死信交换机,也就是如果消息成为了死信,将由队列发送到死信交换机上
 arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);  // 固定格式
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","del-queue");

普通消费者

代码很简单,只需要接收消息就好了

/**
 * 普通消费者,用于消费普通消息队列
 */
public class DieNormalConsumer {

    // 定义普通队列名称
    public static final String NORMAL_QUEUE = "normal_queue";


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();

        // 定义消息成功的回调
        DeliverCallback nackCallback = ( consumerTag,  message)->{
            //接收消息,打印到控制台
            System.out.println("接收到普通队列的消息:"+new String(message.getBody()));
        };

        channel.basicConsume(NORMAL_QUEUE,true,nackCallback,(consumerTag)->{});
    }
}

死信消费者

同样的,只需要接收消息就好了


/**
 * 死信消费者,用于消费死信消息队列
 */
public class DieDeadConsumer {
    // 定义 死信队列名称
    public static final String DEAD_QUEUE = "dead_queue";


    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();

        // 定义消息成功的回调
        DeliverCallback nackCallback = ( consumerTag,  message)->{
            //接收消息,打印到控制台
            System.out.println("接收到死信队列的消息:"+new String(message.getBody()));
        };

        channel.basicConsume(DEAD_QUEUE,true,nackCallback,(consumerTag)->{});
    }
}

以上代码实现是基于TTL消息过期时间导致普通队列转换为死信队列的场景
我们知道,成为死信只需要满足三个条件中的其中一个即可

  • 消息被拒绝:消费者在消费消息时,如果无法处理该消息,可以将该消息拒绝并返回到RabbitMQ。此时,RabbitMQ可以将这些被拒绝的消息发送到死信队列中,以便进行处理。

  • 消息过期:在发送消息时,可以设置消息的过期时间。如果消息在过期时间内没有被消费者消费,那么RabbitMQ会将这些过期的消息发送到死信队列中。

  • 队列达到最大长度:当队列达到最大长度时,新的消息将无法被发送到队列中。此时,RabbitMQ可以选择将这些无法发送的消息发送到死信队列中,以便进行进一步处理。

队列达到最大长度成为死信

只需要在上面的案例中修改声明时的参数列表即可

//3.声明普通队列:因为如果消息称为死信,那么是由队列发送给死信交换机的。所以需要用到 arguments
Map<String, Object> arguments = new HashMap<>();
//正常队列设置过期之后的死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);  // 固定格式
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","lisi");
arguments.put("x-max-length",6);  // 表示 队列最大长度为6,超过即为死信队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);

例如我循环发送10条信息,但是队列最大长度为6,在未消费的情况下,剩余的4个消息就会成为死信

消息被拒成为死信

例如在消费者方,指定某个消息我不接收,那么该消息就会转到死信队列中

// 接收消息成功的回调函数
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    // 拒收 消息 info5,同时将他送会死信队列中
      if(message.equals("info5")){
             System.out.println("Consumer01 接收到消息" + message + "并拒绝签收该消息");
             //requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
             channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
      }else {
             System.out.println("Consumer01 接收到消息"+message);
             // 开启逐一应答
             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
         }
 };

总结参数

以上三个案例用到的参数:

 arguments.put("x-message-ttl",10000);
 //正常队列设置过期之后的死信交换机,也就是如果消息成为了死信,将由队列发送到死信交换机上
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);  // 固定格式
// 设置死信routingkey
arguments.put("x-dead-letter-routing-key","del-queue");
arguments.put("x-max-length",6);  // 表示 队列最大长度为6,超过即为死信队列