ack机制中取值为手动ack的作用是什么?

发布时间 2023-09-15 21:58:18作者: qd372502
ack机制中取值为手动ack的作用是什么?
 
手动 ACK(Acknowledgement)机制在消息消费中有着重要的作用。它允许消费者自主决定何时确认(ACK)已经处理完一条消息。
 
当消息被消费者接收后,默认情况下,消息代理(如RabbitMQ、Kafka)会认为消息已经被成功处理,并将其从队列中删除。这种自动确认(Auto ACK)的方式适用于那些无需保证消息处理可靠性的场景,比如日志记录等。
 
而手动 ACK 机制则提供了更高的可靠性,它允许消费者在消息被正确处理后,显式地发送 ACK 消息给消息代理。这种方式的使用需要消费者显示调用消息代理提供的ACK接口。
 
手动 ACK 的主要作用是:
1. 确保消息处理的可靠性:当消费者从队列中接收到一条消息后,只有在消息处理完成且处理结果符合预期时,消费者发送ACK消息,告知消息代理可以删除该消息。如果消息处理失败,消费者可以选择不发送ACK消息,使消息重新进入队列等待重新处理。
2. 控制消息消费速率:通过手动 ACK,消费者可以根据自身业务处理能力,调整消息的消费速率。消费者可以在处理完一条消息后再次请求下一条消息,从而实现流控和限制消费的速度,保证系统的稳定性和性能。
 
手动 ACK 机制帮助我们提高了消息处理的可靠性和控制消费速率的能力,确保消息被正确处理和消费者适应不同场景的需求。在使用手动 ACK 时,消费者需要注意消息处理的正确性和及时发送 ACK 消息,以确保消息顺利被删除。
 
一个通过手动ack和分布式锁保证消息不重复消费的例子(场景:可能是消费者消费失败后消息重回消息队列)
    public void handleMessage2(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException {
String msgId = message.getMessageProperties().getHeader("msgId");
String lockKey = "lock:" + msgId;
String ifConsumeKey = "ifConsume:" + msgId;
// 尝试获取锁,设置过期时间,避免死锁
boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(lockKey, "0",300, TimeUnit.SECONDS);
redisTemplate.opsForValue().setIfAbsent(ifConsumeKey, "0");
String ifConsume = (String) redisTemplate.opsForValue().get(ifConsumeKey);
if (lockAcquired) {
if("0".equals(ifConsume)){
try {
// 处理消息逻辑
System.out.println("消费者2处理消息:" + new String(message.getBody()));
//处理完消息后,设置状态为1
redisTemplate.opsForValue().set(ifConsumeKey, "1");
int i=1/0;
//手动ack
channel.basicAck(tag,false);
}
catch(Exception e){
e.printStackTrace();
channel.basicNack(tag,false,true);
}
finally {
// 释放锁
System.out.println("消费者2释放锁");
redisTemplate.delete(lockKey);
}
}else{
//手动ack
//channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
channel.basicAck(tag,false);
System.out.println("消费者2发现:消息已经消费完成");
}
} else {
// 其他消费者正在处理该消息,直接返回
System.out.println("其他消费者正在处理该消息,直接返回:" + new String(message.getBody()));
}
}