RabbitMQ 消费者可靠性——失败重试机制

发布时间 2023-12-03 22:46:16作者: 嘎嘎鸭2

 

效果:

消费者抛异常后,会本地重试,如果本地重试次数达到最大重试次数之后,直接给队列返回reject,队列收到后就会丢弃该消息,也就是策略的第一种

但就这样把删了不太好,所以有了失败消息处理策略

 

 第二种 ImmediateRequeueMessageRecoverer:消费者抛异常后,会本地重试,如果本地重试次数达到最大重试次数之后,再给MQ返回nack,消息重新入队

 

第三种方法 RepublishMessageRecoverer:

 error.direct:接收失败消息的交换机

error:接收失败消息的交换机 与 队列绑定时的 RoutingKey

代码示例:

package com.itheima.config_RabbitMQ;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfiguration {
@Bean
public DirectExchange errorExchange() {//交换机
return new DirectExchange("error.direct");
}

@Bean
public Queue errorQueue() {//队列
return new Queue("error.queue");
}

@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) {
return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
}

@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}