RabbitMQ的机制

发布时间 2023-09-06 14:01:52作者: 九极致之术

1.RabbitMQ的保证消息的稳定性

1.1 消息的可靠投递

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。于是,我们开始思考,如何才能进行 RabbitMQ 的消息可靠投递呢?特别是在这样比较极端的情况,RabbitMQ 集群不可用的时候,无法投递的消息该如何处理呢?

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式

  • confirm 确认模式

  • return 退回模式

  • 消息从 producer 到 exchange 则会返回一个 confirmCallback 

  • 消息从 exchange-->queue 投递失败则会返回一个 returnCallback 

 

 

1.1.1 confirm确认模式实现

【注:confirm属于生产者一方】

【注:延续使用上一篇文章的项目工程模板】

首先要在ProduceSpringBoot项目工程中application配置文件中开启confirm,因为默认confirm是不开启的

【注:confirm开启对性能上会有影响,所以默认不开启,需要手动开启】

准备 :

  这里使用发布订阅模式

  1.需要准备一个交换机,这里就用 --> ConfirmTestExchange

  2.需要准备两个个队列,这里就用 --> ConfirmTestQueue01、ConfirmTestQueue02

  3.将交换机与两个队列进行绑定

#RabbitMQ confirm确认模式开启
spring.rabbitmq.publisher-confirm-type=correlated

在测试类中进行测试:

//confirm确认模式
@Test
public void testConfirm(){
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String s) {
            if(ack){
                System.out.println("生产者发送消息到交换机成功");
            }else {
                System.out.println("发送消息到交换机失败");
            }
        }
    });
    String msg = "Hello ConfirmTestExchange,This is ConfirmTestExchange Message";
    rabbitTemplate.convertAndSend("ConfirmTestExchange","",msg.getBytes());
}

测试运行:

随后故意将交换机名写错模拟异常情况:测试运行:

 

1.1.2 return退回模式实现

【注:return属于生产者一方】

【注:延续使用上一篇文章的项目工程模板】

首先要在ProduceSpringBoot项目工程中application配置文件中开启return,因为默认confirm是不开启的

准备:

这里使用路由模式

一台路由 这里用 --> ConfirmTestRouting

两个队列 这里用 --> ConfirmTestQueue01、ConfirmTestQueue02

将队列与路由之间进行绑定

 

#RabbitMQ return退回机制开启
spring.rabbitmq.publisher-returns=true
@Test
public void testReturning(){
    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(Message message, int i, String replyText, String s1, String s2) {
            System.out.println("返回的内容:"+message);
            System.out.println("失败的原因:"+replyText);
        }
    });
    String msg = "Hello ConfirmTestExchange,This is ConfirmTestExchange Message";
    rabbitTemplate.convertAndSend("ConfirmTestRouting","warning",msg.getBytes());
}

【注:这里是故意将路由key写错,他将会找不到队列而报异常,这时就会触发return退回机制】

 

1.2 ACK消息确认机制

多个消费者同时收取消息,收取消息到一半,突然某个消费者挂掉,要保证此条消息不丢失,就需要acknowledgement机制,就是消费者消费完要通知服务端,服务端才将数据删除

这样就解决了

即使一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不丢的case

ACK实现:

【注:ACK消息确认属于消费一方】

准备:

需要准备一个队列用来消费消息,这里用 --> ConfirmTestQueue02

在ConmusptionSpringBoot项目中创建Config包下MyRabbitMQListener

需要在application中进行配置ACK信息

#RabbitMQ ACK确认信息
spring.rabbitmq.listener.simple.acknowledge-mode=manual

ack指Acknowledge,确认

表示消费端收到消息后的确认方式。 有三种确认方式:

自动确认:acknowledge="none"

手动确认:acknowledge="manual"

根据异常情况确认:acknowledge="auto" 【注:这种方式使用麻烦,并且不常用,不作讲解】

@Component
public class MyRabbitMQListener {
   @RabbitListener(queues = {"ConfirmTestQueue02"})
    public void myListener(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
       try {
           byte[] body = message.getBody();
           String msg = new String(body);
           System.out.println("收到的消息是:"+msg);
       }catch (Exception e){
           System.out.println("程序出现意外情况");
       }
    }
}

测试运行:

会发现,虽然消费了消息,但是并没有确认消息,保证了消息不丢失性

此时模拟异常且开启手动确认机制:

@RabbitListener(queues = {"ConfirmTestQueue02"})
public void myListener(Message message, Channel channel) throws IOException {
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
   try {
       byte[] body = message.getBody();
       String msg = new String(body);
       System.out.println("收到的消息是:"+msg);
       int num = 10/0;
       channel.basicAck(deliveryTag,true);//手动告诉rabbitmq对列
   }catch (Exception e){
       System.out.println("程序出现意外情况");
       //BOOlean requeue 是否要求对列继续发送该消息,设置它继续发送
       channel.basicNack(deliveryTag,true,true);
   }
}

【注:会发现,它不会消费这条消息,并且一直重复发出这一条消息】

保证消息的可靠性方式:

  1. 消息持久化: RabbitMQ的消息默认存在内存中的,一旦服务器意外挂掉,消息就会丢失 .

消息持久化需做到三点

 Exchange设置持久化
 Queue设置持久化
 Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息
  1. 生产方确认Confirm和Return机制

  2. ACK确认机制

  3. 设置集群镜像模式

 

2.RabbitMQ延迟队列

2.1 TTL

TTL 全称 Time To Live(存活时间/过期时间)

当消息到达存活时间后,还没有被消费,会被自动清除

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间

为对列设置过期时间

//创建队列且为对列设置过期时间,20秒,【注:设置的过期时间单位是毫秒】
@Bean
public Queue queue01(){
    Queue queue = QueueBuilder.durable(aaa_queue01).withArgument("x-message-ttl",2000).build();
    return queue;
}

为消息设置过期时间

@Test
void aaa_queue02(){
    Map<String, Object> stringObjectHashMap = new HashMap<>();
    stringObjectHashMap.put("uid", UUID.randomUUID());
    stringObjectHashMap.put("Pid",225);
    stringObjectHashMap.put("num",1);
    stringObjectHashMap.put("monery",56.6);
    stringObjectHashMap.put("time",new Date());
    //为消息设置过期时间
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setExpiration("3000");
            return message;
        }
    };
    rabbitTemplate.convertAndSend(MyRabbitMQConfig.aaa_Exchange,"*.orange.*",JSON.toJSONString(stringObjectHashMap),messagePostProcessor);//记得要把时间消息传过去
}

总概括:

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。

设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

如果两者都进行了设置,以时间短的为准。

3. RabbitMQ死信队列

死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX

图解:

死信队列的形成:

  1. 队列消息长度到达限制;

  2. 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

  3. 原队列存在消息过期设置,消息到达超时时间未被消费;

4. 延迟队列 

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费

需求:

  1. 下单后,30分钟未支付,取消订单,回滚库存。

  2. 新用户注册成功7天后,发送短信问候。

实现方式:

  1. 定时器

  2. 延迟队列

通过消息队列完成延迟队列的功能

很可惜,在RabbitMQ中并未提供延迟队列功能。

但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

 

5.RabbitMQ保证幂等性

在编程中一个幂等操作的特点是其任意多次执行所产生的结果与一次执行的产生的结果相同,在mq中由于网络故障或客户端延迟消费mq自动重试过程中可能会导致消息的重复消费,那我们如何保证消息的幂等问题呢?也可以理解为如何保证消息不被重复消费呢,不重复消费也就解决了幂等问题

解决方式:

1、生成全局id,存入redis或者数据库,在消费者消费消息之前,查询一下该消息是否有消费过

2、如果该消息已经消费过,则告诉mq消息已经消费,将该消息丢弃(手动ack)

3、如果没有消费过,将该消息进行消费并将消费记录写进redis或者数据库中

 

【注:如果订单完成之后,需要为用户累加积分,又需要保证积分不会重复累加。那么再mq消费消息之前,先去数据库查询该消息是否已经消费,如果已经消费那么直接丢弃消息】


【注:以上存在视频讲解,这里将链接放入了百度云盘中】

RabbitMQ机制视频讲解百度云盘链接  提取码:6666


以上便是RabbitMQ的机制中的内容,如有漏缺请在下方留言告知,我会及时补充