RocketMQ之消息轨迹

发布时间 2023-05-06 13:51:32作者: 夏尔_717

一、概述

消息轨迹是用来跟踪记录消息发送、消息消费的轨迹。

如何启用消息轨迹?

  • broker端

需要在broker端的配置文件中添加配置项:traceTopicEnable=true,注意:对于消息轨迹数据量较大的场景,可以在RocketMQ集群中选择其中一个Broker节点专用于存储消息轨迹,使得用户普通的消息数据与消息轨迹数据的物理IO完全隔离,互不影响。

  • 客户端

producer端和consumer端需要启用消息轨迹,具体是在初始化客户端时打开打开启用消息轨迹的开关并根据实际需求决定是否使用默认的topic来存储消息轨迹

producer端消息轨迹相关API

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace);

public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic);

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,
    final String customizedTraceTopic);

public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook, 
    boolean enableMsgTrace, final String customizedTraceTopic);

consumer端消息轨迹相关API

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace);

public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic);

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, 
    AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic);

public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic);
  • 消息轨迹存储的topic

默认情况下消息轨迹是存储在RMQ_SYS_TRACE_TOPIC,此外消息轨迹还可以存储在用户自定义的topic中,注意:自定义的topic需要提前创建

二、案例

public class TraceProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);
        producer.start();

        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        producer.shutdown();
    }
}
public class TracePushConsumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // Here,we use the default message track trace topic name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1", true);
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // Wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n",
                        Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

三、源码分析

下面分别从producer端、consumer端看看消息轨迹是如何产生以及如何发送到broker端。

3.1 producer端

3.1.1 初始化producer

producer初始化阶段会完成以下操作:

  • 初始化producer
  • 初始化AsyncTraceDispatcher对象traceDispatcherAsyncTraceDispatcher是实现消息轨迹功能的重点,后面会详细介绍
  • producer端会注册SendMessageHook
public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
        boolean enableMsgTrace, final String customizedTraceTopic) {
    this.namespace = namespace;
    this.producerGroup = producerGroup;
    defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    //if client open the message trace feature
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup,
                TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
            dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
            traceDispatcher = dispatcher;
            this.getDefaultMQProducerImpl().registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

3.1.2 启动producer

在启动producer的时候会启动traceDispatcher,下面详细看下启动traceDispatcher的过程中都完成了哪些操作:

  • AsyncTraceDispatchertraceProducer的作用是发送消息轨迹到broker,这里会启动traceProducer
  • 会启动一个worker线程,其完成的任务封装在AsyncRunnable
  • 注册一个shutdownHook
public void start() throws MQClientException {
    this.setProducerGroup(withNamespace(this.producerGroup));
    this.defaultMQProducerImpl.start();
    if (null != traceDispatcher) {
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            log.warn("trace dispatcher start failed ", e);
        }
    }
}

public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
    if (isStarted.compareAndSet(false, true)) {
        traceProducer.setNamesrvAddr(nameSrvAddr);
        traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
        traceProducer.start();
    }
    this.accessChannel = accessChannel;
    this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
    this.worker.setDaemon(true);
    this.worker.start();
    this.registerShutDownHook();
}

下面详细分析AsyncRunnable,从其run函数可以看到它主要完成的任务是从traceContextQueue队列中获取消息轨迹上下文信息,这里会获取100条信息,然后会根据这100TraceContext初始化一个AsyncAppenderRequest对象,最后将其提交到traceExecutor线程池中(注意:traceContextQueuebatchSize以及traceExecutor是在初始化traceDispatcher时确定的)。这里会有一个疑问:AsyncAppenderRequest的作用是什么?带着这个问题我们继续看AsyncAppenderRequest

public void run() {
    while (!stopped) {
        List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            TraceContext context = null;
            try {
                //get trace data element from blocking Queue — traceContextQueue
                context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
            }
            if (context != null) {
                contexts.add(context);
            } else {
                break;
            }
        }
        if (contexts.size() > 0) {
            AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
            traceExecutor.submit(request);
        } else if (AsyncTraceDispatcher.this.stopped) {
            this.stopped = true;
        }
    }
}

AsyncAppenderRequestrun方法非常简单,就是执行sendTraceData方法,该方法是将消息轨迹上下文信息进行组装并调用flushData方法发送数据,在这个方法中还会调用encoderFromContextBean方法对消息轨迹上下信息重新编码的处理。flushData方法则是按照maxMessageSize(默认是4M)对数据进行分批次,然后调用sendTraceDataByMQ方法发送分批次后的数据,从sendTraceDataByMQ方法可以看到是traceProducer在承担发送消息轨迹的角色,发送消息轨迹是异步发送以及发送消息轨迹的请求与发送普通消息的请求类型是一样的

public void run() {
    sendTraceData(contextList);
}

public void sendTraceData(List<TraceContext> contextList) {
    Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
    for (TraceContext context : contextList) {
        if (context.getTraceBeans().isEmpty()) {
            continue;
        }
        // Topic value corresponding to original message entity content
        String topic = context.getTraceBeans().get(0).getTopic();
        String regionId = context.getRegionId();
        // Use  original message entity's topic as key
        String key = topic;
        if (!StringUtils.isBlank(regionId)) {
            key = key + TraceConstants.CONTENT_SPLITOR + regionId;
        }
        List<TraceTransferBean> transBeanList = transBeanMap.get(key);
        if (transBeanList == null) {
            transBeanList = new ArrayList<TraceTransferBean>();
            transBeanMap.put(key, transBeanList);
        }
        TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
        transBeanList.add(traceData);
    }
    for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
        String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
        String dataTopic = entry.getKey();
        String regionId = null;
        if (key.length > 1) {
            dataTopic = key[0];
            regionId = key[1];
        }
        flushData(entry.getValue(), dataTopic, regionId);
    }
}


private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
    if (transBeanList.size() == 0) {
        return;
    }
    // Temporary buffer
    StringBuilder buffer = new StringBuilder(1024);
    int count = 0;
    Set<String> keySet = new HashSet<String>();

    for (TraceTransferBean bean : transBeanList) {
        // Keyset of message trace includes msgId of or original message
        keySet.addAll(bean.getTransKey());
        buffer.append(bean.getTransData());
        count++;
        // Ensure that the size of the package should not exceed the upper limit.
        if (buffer.length() >= traceProducer.getMaxMessageSize()) {
            sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
            // Clear temporary buffer after finishing
            buffer.delete(0, buffer.length());
            keySet.clear();
            count = 0;
        }
    }
    //count大于0表示还有剩余的消息轨迹,但是剩余的数据没有达到4M,这里是发送剩余的消息轨迹
    if (count > 0) {
        sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
    }
    transBeanList.clear();
}

到这里我们知道了producer端是如何组装消息轨迹以及发送消息轨迹,那么消息轨迹的来源呢?

3.1.3 发送消息

producer在发送消息前会判断其sendMessageHookList是否为空,这里由于在producer初始化时在producer端会注册SendMessageHook,所以不为空。由于sendMessageHookList不为空,此时会构建SendMessageContext对象,并调用executeSendMessageHookBefore函数在SendMessageContext对象中设置好TraceContextTraceBeanList。接着producer会发送消息,在发送完消息后会将消息发送的结果设置到SendMessageContext中并调用executeSendMessageHookAfter函数根据SendMessageContext构建TraceContext对象,最终会将构建的TraceContext对象放到traceContextQueue队列中。这里需要注意当在发送消息的过程中出现exception时会先将具体的exception设置到SendMessageContext然后再调用executeSendMessageHookAfter函数。

private SendResult sendKernelImpl(final Message msg,
    final MessageQueue mq,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final TopicPublishInfo topicPublishInfo,
    final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    
    long beginStartTime = System.currentTimeMillis();
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }

    SendMessageContext context = null;
    if (brokerAddr != null) {
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);

        byte[] prevBody = msg.getBody();
        try {
            //for MessageBatch,ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                MessageClientIDSetter.setUniqID(msg);
            }

            boolean topicWithNamespace = false;
            if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
                msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
                topicWithNamespace = true;
            }

            int sysFlag = 0;
            boolean msgBodyCompressed = false;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                msgBodyCompressed = true;
            }

            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }

            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }

            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                context.setNamespace(this.defaultMQProducer.getNamespace());
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }

                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }

            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }

                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }

            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    Message tmpMessage = msg;
                    boolean messageCloned = false;
                    if (msgBodyCompressed) {
                        //If msg body was compressed, msgbody should be reset using prevBody.
                        //Clone new message using commpressed message body and recover origin massage.
                        //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                        tmpMessage = MessageAccessor.cloneMessage(msg);
                        messageCloned = true;
                        msg.setBody(prevBody);
                    }

                    if (topicWithNamespace) {
                        if (!messageCloned) {
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            messageCloned = true;
                        }
                        msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
                    }

                    long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeAsync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        tmpMessage,
                        requestHeader,
                        timeout - costTimeAsync,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    long costTimeSync = System.currentTimeMillis() - beginStartTime;
                    if (timeout < costTimeSync) {
                        throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                    }
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout - costTimeSync,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }

            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }

            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
            msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
        }
    }

    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}

3.2 consumer端

producer一样,consumer在其初始化阶段会初始化AsyncTraceDispatcher对象并在consumer中注册hook,只是这里注册的是ConsumeMessageHook。在其启动阶段会启动AsyncTraceDispatcher对象。也就是说在consumer端消息轨迹的组装以及发送与producer是完全一样的。那么在consumer端消息轨迹是如何产生的呢?

consumer端会在ConsumeMessageService中构建ConsumeRequest并将其提交到consumeExecutor线程池中执行。消息轨迹就是在执行ConsumeRequest时产生的,与producer不同,consumer会在执行ConsumeRequest时产生两条消息轨迹,在调用consumeMessage方法前后会分别调用executeHookBeforeexecuteHookAfter,根据ConsumeMessageContext来构建TraceContext,并将其添加到traceContextQueue。这里有个细节需要注意:由于consumer在消费消息时会一次消费多条消息,所以会给每条消息构建一个TraceBean,而在producer端由于每次发送一条Message或者MessageBatch所以在构建TraceContext时其TraceBean对象只有一个。

public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }

    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
    ConsumeConcurrentlyStatus status = null;
    defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

    ConsumeMessageContext consumeMessageContext = null;
    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext = new ConsumeMessageContext();
        consumeMessageContext.setNamespace(defaultMQPushConsumer.getNamespace());
        consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
        consumeMessageContext.setProps(new HashMap<String, String>());
        consumeMessageContext.setMq(messageQueue);
        consumeMessageContext.setMsgList(msgs);
        consumeMessageContext.setSuccess(false);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
    }

    long beginTimestamp = System.currentTimeMillis();
    boolean hasException = false;
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}", 
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        hasException = true;
    }     
    long consumeRT = System.currentTimeMillis() - beginTimestamp;
    if (null == status) {
        if (hasException) {
            returnType = ConsumeReturnType.EXCEPTION;
        } else {
            returnType = ConsumeReturnType.RETURNNULL;
        }
    } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
        returnType = ConsumeReturnType.TIME_OUT;
    } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) {
        returnType = ConsumeReturnType.FAILED;
    } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) {
        returnType = ConsumeReturnType.SUCCESS;
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
    }

    if (null == status) {
        log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }

    if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
        consumeMessageContext.setStatus(status.toString());
        consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
        ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
    }

    ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
        .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);

    if (!processQueue.isDropped()) {
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}

四、消息轨迹中存储的信息

在分析完RocketMQ中是如何实现消息轨迹后,我们来总结下消息轨迹中都记录了哪些信息。在RocketMQ中消息轨迹的信息封装在TraceContextTraceBean以及TraceType中。

4.1 TraceContext

TraceContext中存储的是消息轨迹的上下文信息,具体包含:消息轨迹的类型、broker所属RegionIDbroker所属Region的名称、组名、耗时、被追踪的消息是否发送成功、请求ID、消费结果返回的状态码、轨迹数据的集合

/**
 * The context of Trace
 */
public class TraceContext implements Comparable<TraceContext> {

    private TraceType traceType;
    private long timeStamp = System.currentTimeMillis();
    private String regionId = "";
    private String regionName = "";
    private String groupName = "";
    private int costTime = 0;
    private boolean isSuccess = true;
    private String requestId = MessageClientIDSetter.createUniqID();
    private int contextCode = 0;
    private List<TraceBean> traceBeans;

    //...
}

4.2 TraceBean

TraceBean中存储的是消息轨迹数据,具体包含:消息topic、消息ID、消息offset、消息tag、消息key、消息存储地址、消息客户端地址、消息存储时间、消息重试次数、消息长度、消息类型

public class TraceBean {
    private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP());
    private String topic = "";
    private String msgId = "";
    private String offsetMsgId = "";
    private String tags = "";
    private String keys = "";
    private String storeHost = LOCAL_ADDRESS;
    private String clientHost = LOCAL_ADDRESS;
    private long storeTime;
    private int retryTimes;
    private int bodyLength;
    private MessageType msgType;
}

4.3 TraceType

TraceType表示消息轨迹的类型

  • Pub:消息发送
  • SubBefore:消息消费前
  • SubAfter:消息消费后
public enum TraceType {
    Pub,
    SubBefore,
    SubAfter,
}

五、总结

最后我们用一张图来总结消息轨迹的工作原理:

从图中可以看出以下三点:

  1. 消息轨迹的来源:
  • producer端在发送消息后会将TraceContext添加到traceContextQueue队列中
  • consumer端在消费消息前后会分别向traceContextQueue队列中添加TraceContext
  1. 对消息轨迹的处理:

producer端和consumer端都会启动一个worker线程从traceContextQueue队列中获取一批TraceContext,然后new一个AsynAppenderRequest并提交到traceExecutor线程池中执行,最终在执行AsynAppenderRequest时会通过traceProducer来将消息轨迹发送到broker中。

  1. 消息轨迹的实现方法:

producer端和consumer端通过注册hook函数来构建消息轨迹

https://blog.csdn.net/qq_25145759/article/details/117410275

https://codingw.blog.csdn.net/article/details/98376981