如何保证RabbitMQ消息不重复消费

发布时间 2023-04-15 10:30:48作者: 都是朕的江山

如何保证RabbitMQ消息不重复消费

消息中间件是无法保证消息重复消费,所以只能从业务上来保证消费不重复消费,在消费端保证接口的幂等性

什么是幂等性

幂等性原本是数学上的概念,用在接口上就可以理解为:同一个接口,多次发出同一个请求,必须保证操作只执行一次。
调用接口发生异常并且重复尝试时,总是会造成系统所无法承受的损失,所以必须阻止这种现象的发生。
比如下面这些情况,如果没有实现接口幂等性会有很严重的后果:
支付接口,重复支付会导致多次扣钱
订单接口,同一个订单可能会多次创建。

接口幂等性,一般是用在订单处理或者消费处理的场景的。使用接口幂等性是为了防止同一请求多次处理的状况。

重复消费的原因

  1. 消费方的业务项目从MQ队列中接收数据;
  2. 接着处理业务;
  3. 业务处理成功后,消费方项目给MQ返回ack进行手动确认;
  4. 返回回调执行结果的过程中,因为网络抖动等原因,回调数据时,MQ没有返回成功。所以MQ队列中的数据会再次发给业务项目,造成重复消费。

如何解决

RabbitMQ提供了消息确认机制和幂等性处理来保证不重复消费:

  • 消息确认机制:消费者在消费消息后向RabbitMQ服务器发送确认消息(ACK),RabbitMQ收到确认消息后才会将该消息从队列中删除,如果由于某种原因消费者未能发送确认消息,RabbitMQ会认为该消息未被正常处理,然后重新将该消息发送给其他消费者或者当前消费者进行重试。
  • 幂等性处理:通过在系统设计中引入唯一标识符,使得同一个消息可以被多次接收并处理,但只有第一次处理会对系统状态产生影响。例如,消费端可以在处理完一个消息后,在数据库中记录下该消息的ID,以便之后查询是否已经处理过该消息,如果已经处理过则直接忽略该消息。

消息确认机制

RabbitMQ的消息确认机制基于消费者向队列发送确认消息来告诉队列已经成功接收并处理了消息。当确认消息被队列接收后,队列就会将该消息从队列中删除,确保下次不会再次投递给同一个消费者。

如果消费者没有确认消息,RabbitMQ会将该消息重新投递给其他可用的消费者或者重新投递给当前消费者。这种方式可以确保消息不会丢失,并且能够在消费者异常退出或宕机时重新分发消息。

因此,通过按照上述机制进行消息确认,RabbitMQ可以避免消息重复消费的情况。

实现:

1.在这里,我们选择手动确认模式,使用acknowledgeMode参数指定消息确认模式为手动:

@Component
@RabbitListener(queues = RabbitMqConfig.ORDER_QUEUE, ackMode = "MANUAL")
public class OrderConsumer {

    @Autowired
    private OrderService orderService;

    @RabbitHandler
    public void handleMessage(OrderMessage orderMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        try {
            orderService.handleOrder(orderMessage);
            // 手动确认消息已被消费
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 出现异常,拒绝消息并将其返回到队列中重新处理
            channel.basicReject(tag, true);
            e.printStackTrace();
        }
    }

}

在上面的代码中,我们使用了RabbitMQ的@RabbitListener注解来声明消息消费者,并使用@RabbitHandler注解来指定消息处理方法。在handleMessage方法中,我们处理了订单消息,并在处理成功后手动确认该消息已被消费,使用basicAck方法实现确认。如果在处理过程中出现异常,我们将拒绝该消息并将其返回到队列中,使用basicReject方法实现拒绝操作。

2.最后,我们需要编写消息生产者代码。在这里,我们构造一个订单消息对象,使用RabbitTemplate将消息发送到指定的交换机和队列:

@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void placeOrder(Order order) {
        // 构造订单消息
        OrderMessage orderMessage = new OrderMessage();
        orderMessage.setOrderId(order.getId());
        orderMessage.setOrderNo(order.getOrderNo());
        orderMessage.setOrderAmount(order.getOrderAmount());
        // 发送订单消息
        rabbitTemplate.convertAndSend(RabbitMqConfig.ORDER_EXCHANGE, RabbitMqConfig.ORDER_ROUTING_KEY, orderMessage);
    }

    public void handleOrder(OrderMessage orderMessage) {
        // 订单处理逻辑
    }

}

幂等性处理

给每一个消息携带一个全局唯一的id

image-20230414162745786

可以在消息生产者服务中设置一个消息id,然后在消费者监听到消息后获取该id,再去查询这个id是否存在。如果不存在,则正常消费消息,并将消息的id存入数据库或者Redis中。如果存在,则丢弃此消息。例如:

// 消息生产者服务
public void sendMessage() {
    String messageId = UUID.randomUUID().toString();
    // 将消息id和消息体一起发送
    rabbitTemplate.convertAndSend("exchange", "routingKey", message, new CorrelationData(messageId));
}

// 消息消费者服务
@RabbitListener(queues = "queue")
public void handleMessage(Message message, Channel channel) throws Exception {
    String messageId = message.getMessageProperties().getCorrelationId();
    // 先去查询这个id是否存在
    if (!redisTemplate.opsForValue().setIfAbsent(messageId, "")) {
        // 如果存在则丢弃此消息
        return;
    }
    // 正常消费消息
    // ...
}