RabbitMQ消息应答

发布时间 2023-09-01 14:04:35作者: 自学Java笔记本

MQ中的消息应答

前言

在消息队列当中,生产者发送消息给消费者,其中生产者通过中间件也就是rabbitmq将消息存储到此处,由消费者从mq中获取消息,并处理之后的逻辑,由于mq默认是采用自动应答机制,消费者在获取消息后就会通知mq,mq此时就会将消息内容删除,但是此时处理业务逻辑时发生了某种错误需要重试的时候,那么这个消息已经被删除了,就找不到我们要的数据了,因此在某些场合情况下我们需要对消息应答机制做一个修改。

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,
会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下
突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,
消息应答就是:
消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用

-----相对来说不是很靠谱
例如这段代码:

System.out.println(Thread.currentThread().getName()+"等待接收消息。。。。。");
// 接收消息
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

// 接收完消息之后就自动应答给MQ了,那么MQ就会删除消息,可是,在我们实际程序中,接收消息之后还有后续的代码
//  。。。。。例如在执行后续代码的时候,某个逻辑错误了,那么之后再去获取,就获取不到了,此数据就消失了。

image

手动应答

// 处理成功----false 表示不批量处理,批量处理容易丢失信息
// 例如队列中有 :5、6、7、8 四条数据,此时 8先出队列,应答了,后面的 5、6、7 也跟着应答了,如果
// 8 出错了,那么 5、6、7 就会丢失了
channel.basicAck(envelope.getDeliveryTag(), false);

// 处理失败 
channel.basicNack(envelope.getDeliveryTag(), false, true);
// 放弃消息,不重新入队
channel.basicNack(tag, false, false);
// 重新入队
channel.basicNack(tag, false, true);

默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改 为手动应答,消费者在上面代码的基础上增加下面画红色部分代码。

代码修改的逻辑如下:两行,自动应答改为false,在接收消息的回调函数上 手动应答。通过channel.basicAck()
image

在手动应答中,多个消费者情况下,其中一个消费者在处理任务时出现宕机,此时的消息会重新入队,让其正常运行的消费者去消费。