RocketMq发送消息之事务消息

发布时间 2023-09-24 22:53:25作者: 自学Java笔记本

概述

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

使用普通消息和订单事务无法保证一致的原因,本质上是由于普通消息无法像单机数据库事务一样,具备提交、回滚和统一协调的能力。 而基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
image
        事务消息发送分为两个阶段。第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息,生产者已经成功地将消息发送到了 Broker,但是Broker 未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback),半事务消息只有 commit 状态才会真正向下游投递。如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。总而保证了上下游数据的一致性
整个事务消息的详细交互流程如下图所示:
image

事务消息步骤

事务消息发送步骤如下:

  • 1、生产者将半事务消息发送至 RocketMQ Broker。

  • 2、RocketMQ Broker 将消息持久化成功之后,向生产者返回 Ack 确认消息已经发送成功,此时消息暂不能投递,为半事务消息。

  • 3、生产者开始执行本地事务逻辑。

  • 4、生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  • 5、在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。

  • 6、:::note 需要注意的是,服务端仅仅会按照参数尝试指定次数,超过次数后事务会强制回滚,因此未决事务的回查时效性非常关键,需要按照业务的实际风险来设置 :::

事务消息回查步骤如下: 7. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。 8. 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

创建事务性生产者

使用 TransactionMQProducer类创建生产者,并指定唯一的 ProducerGroup,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复
在创建之前先了解几个陌生的名词:

  • TransactionListener:事务监听器,该接口有两个方法分别是执行本地事务和检查本地事务状态,需要我们自己实现
  • sendMessageInTransaction:发送事务消息

生产者代码:

/**
 * 事务消息
 */
public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {

        //创建事务监听器
        TransactionListener transactionListener = new TransactionListenerImpl();
        //创建消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("group5");
        producer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        //生产者这是监听器
        producer.setTransactionListener(transactionListener);
        //启动消息生产者
        producer.start();
       //这里发送三个不同的tag消息,用于模拟事务的三种状态
        String[] tags = new String[]{"TagA", "TagB", "TagC"};
        for (int i = 0; i < 3; i++) {
            try {
                Message msg = new Message("TransactionTopic", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送事务消息
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);
                TimeUnit.SECONDS.sleep(1);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        //因为需要回查,所以这里就不关闭
        //producer.shutdown();
    }
}

实现事务的监听接口

当发送半消息成功时,我们使用 executeLocalTransaction 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。

public class TransactionListenerImpl implements TransactionListener {

    /**
     * 当发送transactional prepare(half)消息成功时,将调用此方法来执行本地事务。
     * @param msg 半(准备)消息
     * @param arg 自定义业务参数
     * @return 事务处理状态
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 模拟消息的提交、回滚、和重新回写,这里对应三种不同的事务状态
        if(StringUtils.equals("TagA",msg.getTags())){
            // 消息提交
            return LocalTransactionState.COMMIT_MESSAGE;
        }else if(StringUtils.equals("TagB",msg.getTags())){
            // 消息回滚
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }else if (StringUtils.equals("TagC",msg.getTags())){
            // 不做处理,不明白到底要做什么,此时会事务消息回查
            return LocalTransactionState.UNKNOW;
        }
        return LocalTransactionState.UNKNOW;
    }

    /**
     * checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求。
     * 当没有响应准备(半)消息时。broker将发送check消息来检查事务状态,并调用此方法来获取本地事务状态。
     * @param msg 检查消息
     * @return 事务处理状态
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 就是mq进行消息事务状态的回查
        System.out.println("消息的Tag:"+msg.getTags());
        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

事务消息回查是 RocketMQ 用于确保半事务消息最终一致性的机制之一。它的工作原理如下:

  • 生产者发送半事务消息并执行本地事务逻辑(executeLocalTransaction 方法)。
  • 生产者根据本地事务执行结果返回 LocalTransactionState.COMMIT(表示本地事务执行成功)、LocalTransactionState.ROLLBACK(表示本地事务执行失败)或者 LocalTransactionState.UNKNOW(表示本地事务状态未知)。
  • 如果返回 LocalTransactionState.COMMIT 或者 LocalTransactionState.ROLLBACK,RocketMQ 立即处理消息,不再进行回查。
  • 如果返回 LocalTransactionState.UNKNOW,RocketMQ 将等待一段时间(由配置参数控制,默认为 15 秒)。
  • 超过等待时间后,RocketMQ 会发起事务消息回查,它会再次调用 checkLocalTransaction 方法来查询本地事务的最终状态。
  • 如果 checkLocalTransaction 返回 LocalTransactionState.COMMIT,则消息被提交,将被投递给消费者。
  • 如果 checkLocalTransaction 返回 LocalTransactionState.ROLLBACK,则消息被回滚,不会被投递给消费者。
  • 如果 checkLocalTransaction 返回 LocalTransactionState.UNKNOW,RocketMQ 会继续进行回查,直到达到最大回查次数(由配置参数控制,默认为 15 次)。
  • 事务消息回查的目的是确保本地事务的最终一致性,即使在网络故障或应用程序崩溃的情况下也能保证消息的正确处理。如果你的应用程序在处理半事务消息时返回 LocalTransactionState.UNKNOW,RocketMQ 将通过回查机制来检查事务状态,以确保消息最终得到正确的处理结果。

需要注意的是,回查机制需要在生产者和消费者的配置中进行相应的设置,包括回查的等待时间和最大回查次数。这些设置应该根据你的业务需求来调整。同时,确保你的本地事务逻辑是幂等的,因为 RocketMQ 可能会多次调用 checkLocalTransaction 方法进行回查。

消费着

/**
 * 消费事务消息
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        // 1.创建消费者Consumer,指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group5");
        // 2.指定NameServer地址
//        consumer.setNamesrvAddr("192.168.78.129:9876;192.168.78.130:9876");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 3.订阅主题TransactionTopic中的所有消息
        consumer.subscribe("TransactionTopic","*");
        // 4.设置回调函数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            // 接收消息内容的方法
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("接收到的消息是:"+new String(msg.getBody()));
                }
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 5.启动消费者consumer
        consumer.start();
    }
}

image

消费情况:
image

事务消息使用上的限制

  • 事务消息不支持延时消息和批量消息。
  • 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
  • 事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
  • 事务性消息可能不止一次被检查或消费。
  • 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
  • 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。