如何保证消息在99.99%的情况下不丢失

发布时间 2023-03-24 14:01:39作者: edda_huang
  1. 简介
    MQ虽然帮我们解决了很多问题,但是也带来了很多问题,其中最麻烦的就是,如何保证消息的可靠性传输。

我们在聊如何保证消息的可靠性传输之前,先考虑下哪些情况下会出现消息丢失的情况。

首先,上图中完整的展示了消息从生产到被消费的完整链路,我们通过图列举下各种情况。

Producer在把Message发送到Broker的过程中,因为网络不可靠的原因,可能会出现Message还未发送到Broker就丢失,或者Message发送到了Broker,但是由于某种原因,消息未保存到Broker。
Broker接收到Message数据存储在内存,Consumer还没消费,Broker宕机了。
Consumer接收到了Message,Message相关业务还没来得及处理,程序报错或者宕机了,Broker会认为Consunmer消息正常消费了,就把当前消息从队列中移除了。这种情况也算是消息丢失。
从上述的问题中我们可以总结出想要消息被正常消费,就得保证:

消息成功被Broker接收到。
消息可以被Broker持久化。
消息成功被Consumer接收并且当消费失败时,消息可以重回队列。
要有相应的补偿机制。(当任何一个环节出错时,可以进行消息 补偿)。
2. 消息的可靠投递
我们在使用MQ的时候,为了避免消息丢失或者投递失败。RabbitMQ为我们提供了两种方式来控制消息的投递可靠性。

confirm 确认模式
return 退回模式

如图所示:

消息从 producer 到 exchange 则会返回一个confirmCallback 。
消息从 exchange 到 queue 投递失败则会返回一个 ReturnsCallback 信息,其内容为ReturnedMessage实例信息。
我们将利用这两个 callback 控制消息的可靠性投递。

2.1 confirm
2.1.1 引入所需依赖

点击查看代码
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>

2.1.2 application.yaml
spring:

点击查看代码
spring:
  rabbitmq:
    host: localhost
    port: 5672
    # rabbit 默认的虚拟主机
    virtual-host: /
    # rabbit 用户名密码
    username: admin
    password: admin123
    # 开启消息发送确认功能
    publisher-confirm-type: correlated
    # 高版本已弃用
#    publisher-confirms: true

2.1.3 ConfirmCallBack
package com.ldx.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**

  • 生产者消息确认回调方法

  • @author ludangxin

  • @date 2021/9/11
    */
    @Slf4j
    public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {

    /**
    *

    • @param correlationData 相关配置信息
    • @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
    • @param cause 失败原因
      */
      @Override
      public void confirm(CorrelationData correlationData, boolean ack, String cause) {
      log.info("MsgSendConfirmCallBack , 回调id: {}", correlationData);
      if(ack) {
      log.info("消息发送成功");
      }else {
      log.info("消息发送失败: {}", cause);
      }
      }
      }

2.1.3 RabbitConfig
package com.ldx.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig implements InitializingBean {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
 * 设置一个简单的队列
  */
@Bean(name = "durableQueue")
public Queue queue() {
    /*
     * 参数1:队列名称
     * 参数2:是否定义持久化队列
     * 参数3:是否独占本次连接
     * 参数4:是否在不使用的时候自动删除队列
     * 参数5:队列其它参数
     */
    return new Queue("helloRabbitMQ", true, false, false, null);
}

/**
 * bean 初始化后执行
 */
@Override
public void afterPropertiesSet() {
    // 设置消息确认回调类
    rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack());
}

}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
31.
32.
33.
34.
35.
36.
37.
38.
39.
2.1.4 测试方法
这里两个测试方法,sentMsg()使用默认的Exchange,而sentMsg2()设置一个不存在的Exchange测试失败情况。

package com.ldx.rabbitmq;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.UUID;

@SpringBootTest
public class ProducerTest {

@Autowired
RabbitTemplate rabbitTemplate;

@Test
public void sentMsg(){
    String uuid = UUID.randomUUID().toString();
    CorrelationData correlationId = new CorrelationData(uuid);
    rabbitTemplate.convertAndSend("", "helloRabbitMQ","Hello RabbitMQ ~ ", correlationId);
}

@Test
public void sentMsg2(){
    String uuid = UUID.randomUUID().toString();
    CorrelationData correlationId = new CorrelationData(uuid);
    // 设置一个不存在的exchange 测试失败情况
    rabbitTemplate.convertAndSend("abc", "helloRabbitMQ","Hello RabbitMQ ~ ", correlationId);
}

}
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
20.
21.
22.
23.
24.
25.
26.
27.
28.
29.
30.
2.1.5 启动测试
sendMsg()方法日志如下:

2021-09-11 21:30:38.336 INFO 63112 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=8e9fc4b8-aa32-4e1b-a165-8a83457636ed]
2021-09-11 21:30:38.339 INFO 63112 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息发送成功
1.
2.
sendMsg2()方法日志如下:

2021-09-11 21:32:27.377 INFO 63139 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=399c8d85-f010-433f-946c-419d9b9396c2]
2021-09-11 21:32:27.379 INFO 63139 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息发送失败: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'abc' in vhost '/', class-id=60, method-id=40)
1.
2.
2.1.6 小结
Confirm 确认模式 是从Producer到Exchange。
Producer发送的消息正常或失败时都会进入Confirm Callback方法。
Producer发送消息的Exchange不存在时,Confirm Callback中的 Ack为false且Cause为发送失败原因。
2.2 return
2.2.1 application.yaml
spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 默认的虚拟主机
virtual-host: /
# rabbit 用户名密码
username: admin
password: admin123
# 开启消息发送确认功能
publisher-confirm-type: correlated
# 高版本已弃用

publisher-confirms: true

# 开启失败退回功能
publisher-returns: true

2.2.2 ReturnCallback
这里注意下,网上很多提到的ReturnCallback(少了个s)接口已经弃用,注释中也提到了,弃用是为了更好的使用ReturnedMessage类,因为对象的方式可以更好的支持lambda表达式。

package com.ldx.rabbitmq.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**

  • 发生异常时的消息返回提醒

  • @author ludangxin

  • @date 2021/9/11
    */
    @Slf4j
    public class RabbitReturnCallback implements RabbitTemplate.ReturnsCallback {

    /**

    • Returned message callback.
    • @param returned the returned message and metadata.
      */
      @Override
      public void returnedMessage(ReturnedMessage returned) {
      log.info("消息主体: {}", returned.getMessage());
      log.info("回复编码: {}", returned.getReplyCode());
      log.info("回复内容: {}", returned.getReplyText());
      log.info("交换器: {}", returned.getExchange());
      log.info("路由键: {}", returned.getRoutingKey());
      }
      }

2.2.3 RabbitConfig
将RabbitReturnCallback设置到RabbitTemplate中。

/**

  • bean 初始化后执行
    */
    @Override
    public void afterPropertiesSet() {
    // 设置消息确认回调类
    rabbitTemplate.setConfirmCallback(new MsgSendConfirmCallBack());
    // 设置消息回退回调类
    rabbitTemplate.setReturnsCallback(new RabbitReturnCallback());
    }

2.2.4 测试方法
@Test
public void sentMsg3(){
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
// 设置一个不存在的routingkey 测试失败情况
rabbitTemplate.convertAndSend("", "helloRabbitMQ1", "Hello RabbitMQ ~ ", correlationId);
}
1.
2.
3.
4.
5.
6.
7.
2.2.5 启动测试

sentMsg()

2021-09-11 22:12:24.079 INFO 63803 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=fb471c69-6c7b-48bc-89aa-ae70ac1ed6f8]
2021-09-11 22:12:24.081 INFO 63803 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息发送成功

sentMsg2()

2021-09-11 22:13:42.910 INFO 63825 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=0e3211ee-a1ba-45e4-90f6-296be79def07]
2021-09-11 22:13:42.912 INFO 63825 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息发送失败: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'abc' in vhost '/', class-id=60, method-id=40)

sentMsg3()

2021-09-11 22:14:23.600 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 消息主体: (Body:'Hello RabbitMQ ~ ' MessageProperties [headers={spring_returned_message_correlation=0a8db922-ff7c-4b13-86a3-04957a7359bc}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
2021-09-11 22:14:23.602 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 回复编码: 312
2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 回复内容: NO_ROUTE
2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 交换器:
2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory1] c.l.r.config.RabbitReturnCallback : 路由键: helloRabbitMQ1
2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=0a8db922-ff7c-4b13-86a3-04957a7359bc]
2021-09-11 22:14:23.603 INFO 63841 --- [nectionFactory2] c.l.r.config.MsgSendConfirmCallBack : 消息发送成功
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
2.2.6 小节
Return 退回模式是从Exchange到Queue。
Return 给了 Producer。
Producer发送的消息即使Routing Key不正确,当Exchange接收失败后直接触发Confirm Callback,不会进入到Return Callback,因为还没到Exchange。
当Exchange正确接收消息,但是Routing Key设置错误, 触发Return Callback方法。
3. 消息的可靠消费
上文中我们提到了一种消息丢失的情况,即 Consumer接收到了Message,Message相关业务还没来得及处理,程序报错或者宕机了,Broker会认为Consunmer消息正常消费了,就把当前消息从队列中移除了。这种情况也算是消息丢失。

那能不能消息消费成功后再将消息从queue中移除呢?

答案肯定是可以的。

3.1 ACK确认机制

ACK指Acknowledge,确认。 表示消费端收到消息后的确认方式。

作用:
确认消息是否被消费者消费,消息通过ACK机制确认是否被正确接收,每个消息都要被确认。
默认情况下,一个消息被消费者正确消费就会从队列中移除
ACK确认模式
AcknowledgeMode.NONE :不确认
默认所有消息消费成功,会不断的向消费者推送消息。
因为RabbitMQ认为所有推送的消息已被成功消费,所以推送出去的消息不会暂存在broker,消息存在丢失的危险。
AcknowledgeMode.AUTO:自动确认
由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到broker。
使用自动确认模式时,需要考虑的另一件事是消费者过载,因为broker会暂存没有收到ack的消息,等消费端ack后才会丢掉;如果收到消费端的nack(消费失败的标识)或connection断开没收到反馈,会将消息放回到原队列头部,导致消费者反复的在消费这条消息。
AcknowledgeMode.MANUAL:手动确认
手动确认则当消费者调用 ack、nack、reject 几种方法进行确认,手动确认可以在业务失败后进行一些操作,如果消息未被 ACK 则会发送到下一个消费者。
手动确认模式可以使用 prefetch,限制通道上未完成的(“正在进行中的”)发送的数量。也就是Consumer一次可以从Broker取几条消息。
如果忘记进行ACK确认
忘记通过basicAck返回确认信息是常见的错误。这个错误非常严重,将导致消费者客户端退出或者关闭后,消息会被退回RabbitMQ服务器,这会使RabbitMQ服务器内存爆满,而且RabbitMQ也不会主动删除这些被退回的消息。只要程序还在运行,没确认的消息就一直是 Unacked 状态,无法被 RabbitMQ 重新投递。更厉害的是,RabbitMQ 消息消费并没有超时机制,也就是说,程序不重启,消息就永远是 Unacked 状态。处理运维事件时不要忘了这些 Unacked 状态的消息。当程序关闭时(实际只要 消费者 关闭就行),消息会恢复为 Ready 状态。
3.2 配置application.yaml
spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 默认的虚拟主机
virtual-host: /
# rabbit 用户名密码
username: admin
password: admin123
listener:
simple:
# manual 手动确认
acknowledge-mode: manual
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
3.3 Consumer
package com.ldx.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

/**

  • 消费者

  • @author ludangxin

  • @date 2021/9/12
    */
    @Slf4j
    @Component
    public class RabbitMQListener {

    @RabbitListener(queues = "helloRabbitMQ")
    public void helloRabbitMq(Message message, Channel channel) throws IOException {
    MessageProperties messageProperties = message.getMessageProperties();
    log.info(messageProperties.toString());
    try {
    log.info(message.toString());
    log.info(new String(message.getBody()));
    int a = 1/0;
    channel.basicAck(messageProperties.getDeliveryTag(), false);
    } catch (Exception e) {
    // 当前的消息是否重新投递的消息,也就是该消息是重新回到队列里的消息
    if (messageProperties.getRedelivered()) {
    log.info("消息已重复处理失败,拒绝再次接收...");
    // 拒绝消息
    channel.basicReject(messageProperties.getDeliveryTag(), false);
    } else {
    log.info("消息即将再次返回队列处理...");
    channel.basicNack(messageProperties.getDeliveryTag(), false, true);
    }
    }
    }
    }

消费消息有三种回执方法,接下来先看下每个方法参数的含义。

3.3.1 basicAck
/**

  • Acknowledge one or several received
  • messages. Supply the deliveryTag from the
  • or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
  • containing the received message being acknowledged.
  • @see com.rabbitmq.client.AMQP.Basic.Ack
  • @param deliveryTag the tag from the received
  • @param multiple true to acknowledge all messages up to and
  • including the supplied delivery tag; false to acknowledge just
  • the supplied delivery tag.
  • @throws java.io.IOException if an error is encountered
    */
    void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag:消息投递的标签号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行ack、nack、reject等操作。

multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

3.3.2 basicNack
/**

  • Reject one or several received messages.
  • Supply the deliveryTag from the
  • or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
  • @see com.rabbitmq.client.AMQP.Basic.Nack
  • @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or
  • @param multiple true to reject all messages up to and including
  • the supplied delivery tag; false to reject just the supplied
  • delivery tag.
  • @param requeue true if the rejected message(s) should be requeued rather
  • than discarded/dead-lettered
  • @throws java.io.IOException if an error is encountered
    */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
    throws IOException;

deliveryTag:表示消息投递序号。

multiple:是否批量确认。

requeue:值为 true 消息将重新入队列。

3.3.3 basicReject
basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

/**

  • Reject a message. Supply the deliveryTag from the
  • or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
  • containing the received message being rejected.
  • @see com.rabbitmq.client.AMQP.Basic.Reject
  • @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or
  • @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
  • @throws java.io.IOException if an error is encountered
    */
    void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag:表示消息投递序号。

requeue:值为 true 消息将重新入队列。

3.4 启动测试
@Test
public void sentMsg() throws IOException {
String uuid = UUID.randomUUID().toString();
CorrelationData correlationId = new CorrelationData(uuid);
rabbitTemplate.convertAndSend("","helloRabbitMQ","Hello RabbitMQ111 ~ ", correlationId);
// 为了使进程阻塞
System.in.read();
}
1.
2.
3.
4.
5.
6.
7.
8.
在这里我们执行sentMsg()方法,输出日志如下:

从日志信息中我们可以看出,消息已成功被消费,并且当第一次消费失败后消息被重新放回了队列,并进行了再此消费,当再次失败后则放弃该条消息。

2021-09-12 00:47:03.451 INFO 66160 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : MsgSendConfirmCallBack , 回调id: CorrelationData [id=eb06a986-0e51-464a-8b8c-d2a8271c0008]
2021-09-12 00:47:03.452 INFO 66160 --- [nectionFactory1] c.l.r.config.MsgSendConfirmCallBack : 消息发送成功
2021-09-12 00:47:04.142 INFO 66160 --- [ntContainer#3-1] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@75181b50: tags=[[amq.ctag-C1o5ZRm1g0fxX-Q53CCZcw]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@127.0.0.1:5672/,4), conn: Proxy@52f57666 Shared Rabbit Connection: SimpleConnection@3d96fa9e [delegate=amqp://admin@127.0.0.1:5672/, localPort= 58094], acknowledgeMode=AUTO local queue size=0
2021-09-12 00:47:04.157 INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-GMJHJuVr22w1so4vhSp-dQ, consumerQueue=helloRabbitMQ]
2021-09-12 00:47:04.157 INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : (Body:'Hello RabbitMQ111 ~ ' MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-GMJHJuVr22w1so4vhSp-dQ, consumerQueue=helloRabbitMQ])
2021-09-12 00:47:04.158 INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : Hello RabbitMQ111 ~
2021-09-12 00:47:04.158 INFO 66160 --- [ntContainer#3-2] c.l.rabbitmq.consumer.RabbitMQListener : 消息即将再次返回队列处理...
2021-09-12 00:47:04.162 ERROR 66160 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
2021-09-12 00:47:05.163 INFO 66160 --- [ntContainer#3-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@37695b29: tags=[[amq.ctag-GMJHJuVr22w1so4vhSp-dQ]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@127.0.0.1:5672/,8), conn: Proxy@52f57666 Shared Rabbit Connection: SimpleConnection@3d96fa9e [delegate=amqp://admin@127.0.0.1:5672/, localPort= 58094], acknowledgeMode=AUTO local queue size=0
2021-09-12 00:47:05.186 INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-0XT90qJ0AYEzyDr-cztV8g, consumerQueue=helloRabbitMQ]
2021-09-12 00:47:05.186 INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : (Body:'Hello RabbitMQ111 ~ ' MessageProperties [headers={spring_listener_return_correlation=7252a3e3-77d5-4985-a93c-0ee7a977d1a8, spring_returned_message_correlation=eb06a986-0e51-464a-8b8c-d2a8271c0008}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=true, receivedExchange=, receivedRoutingKey=helloRabbitMQ, deliveryTag=1, consumerTag=amq.ctag-0XT90qJ0AYEzyDr-cztV8g, consumerQueue=helloRabbitMQ])
2021-09-12 00:47:05.186 INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : Hello RabbitMQ111 ~
2021-09-12 00:47:05.186 INFO 66160 --- [ntContainer#3-3] c.l.rabbitmq.consumer.RabbitMQListener : 消息已重复处理失败,拒绝再次接收...
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
3.5 小节
消费方的ACK机制可以有效的解决消息从Broker到Consumer丢失的问题。但也要注意一点:消息的无限消费。

3.6 消息无限消费
如果消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0 发生异常后将消息重新投入队列。

@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {

    try {
        log.info("消费者 2 号收到:{}", msg);
        int a = 1 / 0;
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    } catch (Exception e) {
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    }
}

但是有个问题是,业务代码一旦出现 bug 99.9%的情况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,导致了死循环,CPU被瞬间打满了,而且rabbitmq management 只有一条未被确认的消息。

经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。

消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行,那该怎么处理呢?

第一种方法:是根据异常类型来选择是否重新放入队列。

第二种方法: 先将消息进行应答,此时消息队列会删除该条消息,然后通过channel.basicPublish()重新发布这个消息,异常消息就放在了消息队列尾部,,进而不会影响已经进入队列的消息处理。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 重新发送消息到队尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
JSON.toJSONBytes(msg));
1.
2.
3.
4.
5.
但这种方法并没有解决根本问题,错误消息还是会时不时报错,后面优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入MySQL并推送报警,进行人工处理和定时任务做补偿。

  1. 总结
    4.1 持久化
    Exchange 要持久化 通过durable属性控制,true:持久化, 缺省:true。
    queue 要持久化 通过durable属性控制,true:持久化, 缺省:true。
    message 要持久化
    在springboot环境下,message模式也是持久化。

4.2 生产方确认Confirm
4.3 消费方确认Ack
4.4 Broker 高可用