RabbitMQ如果保证消息可靠性

发布时间 2023-09-15 11:36:15作者: 程长新


这是RabbitMQ消息从生产者到消费者的流程。
从图中可以看出消息可能在以下几个地方丢失

  1. 生产者处丢失:消息没有正确到达RabbitMQ的交换机。 解决策略:confirm机制
  2. RabbitMQ本身将消息丢失:因为一些原因导致RabbitMQ重启,导致内存中的消息丢失。 解决策略:消息持久化
  3. 消费者处丢失:消费者拿到消息之后还没有消费完就宕机了,导致消息丢失。 解决策略:手动ack
    今天先说一个confirm机制和手动ack,持久化下次再说
    消息会经过三次传输
  4. 从生产者到交换机
  5. 从交换机到队列
  6. 从队列到消费者
    RabbitMQ也提供了对应的机制来保证这三次消息传输的可靠性,如下图
  7. 开启confirm机制后为每条消息分配一个唯一id,当生产者发送消息后RabbitMQ回调一个confirm方法,返回消息的id和是否成功发送的标识以及失败原因,来让我们消息是否成功发送,如果失败进行相应的处理
  8. 当消息从交换机没有正确到达队列时会回调一个return方法,返回消息、应答编码、失败原因、交换机以及路由键。回调后我们进行相应的处理。如果成功到达队列则不会回调。
  9. 默认是自动ack的,当消息到达消费者rabbitMQ就把消息删除了,这样是最高效的,但如果消费者因为某些原因没有正确消费消息,那么这条消息就丢失了。手动ack是指消费者正确处理完消息之后返回给rabbitMQ一个ack状态,告诉RabbitMQ可以把这条消息删除了,这样就保证了消息不会在消费者处丢失。

话不多说,上代码

创建工程,配置文件添加rabbitmq的配置

server:
  port: 8082

spring:
  rabbitmq:
    username: guest
    password: guest
    port: 5672
    host: localhost
    publisher-confirm-type: correlated  #设置发送者确认confirm机制
    publisher-returns: true #消息未从交换机正确到达队列时发送回调
    listener:
      simple:
        acknowledge-mode: manual  #指定消息确认模式为手动确认

RabbitMQ配置类

@Component
public class RabbitMQConfig {
    public static final String EXCHANGE_NAME = "message_confirm_exchange";
    public static final String QUEUE_NAME = "message_confirm_queue";
    private static final String ROUTING_KEY = "user.#";

    @Bean
    private TopicExchange topicExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    private Queue queue() {
        return new Queue(QUEUE_NAME);
    }

    @Bean
    private Binding bindingTopic(){
        return BindingBuilder.bind(queue()).to(topicExchange()).with(ROUTING_KEY);
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //设置mandatory为true才能让消息没有到达队列时回调return方法,如果不设置的话默认为false,当消息没有到达队列就直接丢弃了
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }
}

注册confirm和return回调

@Component
@Slf4j
public class CustomConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Resource
    private RabbitTemplate rabbitTemplate;

    //依赖注入完成后执行此方法
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 生产者确认confirm机制。消息无论是否到达交换机都会回调这个方法
     * 20年12月之前的CorrelationData对象中只有一个id属性,我们需要自己绑定id和具体消息的关系来根据id找到消息
     * 但20年12月之后的CorrelationData对象中多了一个returnedMessage属性,生产者在设置CorrelationData对象的消息id时可以把同时把发送的消息设置进去,这样在confirm回调中可以直接获取消息了,不用自己做绑定
     * @param correlationData
     * @param ack
     * @param s
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String s) {
        //将字节数组转化为消息对象
        MessageDTO messageDTO = ByteObjectConvert.byte2Object(correlationData.getReturnedMessage().getBody());
        if (ack){
            log.info("confirm,发送成功-----消息:{},id:{}",messageDTO,correlationData.getId());
        }else{
            log.info("confirm,发送失败-----消息:{},id:{},失败原因:{}",messageDTO,correlationData.getId(),s);
            //发送失败,将失败消息存入数据库。入库后可以使用定时任务去扫描发送失败的消息进行重发
            //获取要发送的交换机,路由键。在生产者发送消息时我自己定义了一个map用于存储消息要发送到的交换机和路由键,用于在confirm失败时获取然后进行相应处理
            String exchangeAndRoutingKey = DataContainer.getExchangeAndRoutingKey(correlationData.getId());
            String[] split = exchangeAndRoutingKey.split(",");
            String exchange = split[0];
            String routingKey = split[1];
            log.info("\n 交换机:{},路由键:{}",exchange,routingKey);
            //入库后删除map中的数据
            DataContainer.del(correlationData.getId());
        }
    }

    /**
     * 如果消息未从交换机正确到达队列会回调这个方法,正确到达不会回调这个方法
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        byte[] body = message.getBody();
        //将消息的字节数组转为对象
        MessageDTO o = ByteObjectConvert.byte2Object(body);
        //存入数据库
        log.info("returns-----:{}\n replyCode:{},replyText:{},exchange:{},routingKey:{}",o,replyCode,replyText,exchange,routingKey);
        //入库后删除map中的数据
        String msgID = message.getMessageProperties().getCorrelationId();
        DataContainer.del(msgID);
    }
}

定义一个用于存储消息要发送到的交换机和路由键的map

/*
 * @Description TODO (保存发送消息的交换机和路由键,为了在confirm为false时获取然后入库)
 * 创建人: 程长新
 * 创建时间:2023/9/14 15:08
 **/
@Slf4j
@NoArgsConstructor
public class DataContainer {
    private static ConcurrentHashMap<String,String> map = new ConcurrentHashMap<>();

    public static void saveExchangeAndRoutingKey(String key, String value){
        String put = map.put(key, value);
    }

    public static void del(String key){
        String remove = map.remove(key);
    }

    public static String getExchangeAndRoutingKey(String key){
        return map.get(key);
    }
}

随便定义一个发送消息的实体

/**
 * 因为要作为消息进行发送,所以这个实体必须是可序列化的。所以实现Serializable接口
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MessageDTO implements Serializable {
    private int id;
    private String msg;
}

自己定义一个byte数组与Object转换的工具类

/**
 * byte数组与对象转换的工具类
 */
public class ByteObjectConvert {

    public static byte[] object2Bytes(MessageDTO messageDTO) {
        byte[] messageDTOBytes;
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = null;
        try {
            objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
            objectOutputStream.writeObject(messageDTO);
            messageDTOBytes = byteArrayOutputStream.toByteArray();
        } catch (IOException e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        } finally {
            try {
                byteArrayOutputStream.close();
                objectOutputStream.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return messageDTOBytes;
    }

    public static MessageDTO byte2Object(byte [] body) {
//        byte[] body = correlationData.getReturnedMessage().getBody();
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
        ObjectInputStream objectInputStream = null;
        MessageDTO messageDTO = null;
        try {
            objectInputStream = new ObjectInputStream(byteArrayInputStream);
            messageDTO = (MessageDTO) objectInputStream.readObject();

        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                byteArrayInputStream.close();
                objectInputStream.close();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return messageDTO;
    }
}

生产者发送消息


@Slf4j
@RestController
public class Producer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/send")
//    @ResponseBody
    public String sendMessage(){
        for (int i = 0; i < 3; i++) {
            String correlationId = UUID.randomUUID().toString();
            CorrelationData correlationData = new CorrelationData(correlationId);
            String s = "message" + i;
            MessageDTO messageDTO = new MessageDTO(i, s);
            log.info("消息{}的id{}",s,correlationData.getId());
            //将要发送的消息转为字节数组用于构建Message对象
            byte[] messageDTOBytes = ByteObjectConvert.object2Bytes(messageDTO);
            //构建Message对象时同时将唯一id correlationId设置进去
            Message build = MessageBuilder.withBody(messageDTOBytes).setCorrelationId(correlationId).build();
            //将发送的消息设置到CorrelationData对象中,可以直接在confirm回调方法中获取Message消息
            correlationData.setReturnedMessage(build);
            if (i == 1){
                //为了保证消息confirm为false时还能拿到发送到的交换机和路由键,将消息id作为键,交换机和路由键作为值存入map
                DataContainer.saveExchangeAndRoutingKey(correlationData.getId(),"abc" + "," + "user.add");
//                rabbitTemplate.convertAndSend("abc","user.add",messageDTOBytes,correlationData);//如果设置了MessageProperties,那么发送的消息就要构造成Message对象,否则设置的MessageProperties不起作用,因为如果不是Message对象,底层发送的时候还是会将对象构造为Message对象并创建一个空的MessageProperties对象,所以自己设置的会不起作用
                rabbitTemplate.convertAndSend("abc","user.add",build,correlationData);//错误的交换机名称,用于模拟消息没有正确到达交换机的情况
            }else if (i == 2){
                DataContainer.saveExchangeAndRoutingKey(correlationData.getId(),RabbitMQConfig.EXCHANGE_NAME + "," + "abc");
                rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "abc", build, correlationData);//写一个匹配不到的路由键,用于模拟消息没有从交换机到达队列的情况
            }else {
                DataContainer.saveExchangeAndRoutingKey(correlationData.getId(),RabbitMQConfig.EXCHANGE_NAME + "," + "user.add");
                rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "user.add", build, correlationData);
            }
            log.info("生产者发送消息:{}",build);
        }
        return "发送成功";
    }
}

消费者监听队列处理消息--开启手动ack


@Component
@Slf4j
public class Consumer01 {

    @RabbitListener(queues = {RabbitMQConfig.QUEUE_NAME})
    public void receiveMessage01(MessageDTO msg, Message message, Channel channel){
//    public void receiveMessage01(Message message, Channel channel){
//        MessageDTO msg = ByteObjectConvert.byte2Object(message.getBody());
        try {
            /*if ("message1".equals(msg)){
                int i = 1/0;
            }*/
            log.info("消费者01成功接收到消息:{}",msg);
            //rabbitmq只有收到的消费者发送的ack消息,才会将消息删除,否则不会删除
            //第一个参数表示该消息的index,第二个参数表示是否批量确认,批量的话小于这个tag的都会ack掉
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (Exception e) {
            log.error("消费者01处理消息:{},发生异常:{}",msg,e.getMessage());
            try {
                //如果消息没有被拒绝过就将消息重新入队,如果被拒绝过一次一直接丢弃
                if (message.getMessageProperties().getRedelivered()){
                    //basicReject与basicNack一样,只不过basicNack可以批量处理,basicReject只能处理单个
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
                    log.info("消费者01丢弃消息:{}",msg);
                }else {
                    //参数2为批量标识,如果设为true的话小于这个index的消息都将被确认
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                    log.info("消费者01拒绝消息:{}",msg);
                }
            } catch (IOException ex) {
                log.error("拒绝消息时发生异常:{}",ex.getMessage());
                e.printStackTrace();
            }
            e.printStackTrace();
        }
    }
}