好烦呀为什么rocketmq监听打印的日志没有traceId啊

发布时间 2023-07-26 13:52:57作者: 程序媛-肖

在使用springboot整合rocketmq使用方便使用注解即可实现消费,十分简洁。随之而来的是问题怎么追踪,一个链路的日志能看到是多么美好的事情。搜寻很久没有找到解决方案,于是自己搞吧!

大前提:已经在使用spring-cloud-starter-sleuth,日志格式已经含有traceId、spanId
好的,那么我们来看看吧!

生产者端:生产者产生的消息被消费者消费,那么怎么做到两个日志产生关联呢?有两个比较好的办法,一个是生产者把自己的traceId存入消息头部,消费者获取到拿出来设置自己的traceId。另一个则是生产者只管发消息,消费者把messageId作为自己的traceId也能搜寻。第一种比较适合自产自销,第二种则是A服务产B服务消的情况。我们依次来看看。

1、生产者配置,便捷的使用rocketTemplate,没有其他多余的配置,组合方式使用扩展一下就好
`

@Component
public class MyRocketMqTemplate {

@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
 * 封装发送失败,进行保存并定时重试
 *
 * @param topic   主题
 * @param payload 发送内容
 */
public void sendAndMakeUp(String topic, Object payload) {
    String traceId = (String) MDC.get(TraceFilter.MDC_TRACE_ID_KEY);
           GenericMessage messageExt = new GenericMessage(payload);
    messageExt.getHeaders().put("traceId",traceId);
    rocketMQTemplate.asyncSend(topic, messageExt, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            try {

              // 异步发送traceId连接
                MDC.put(TraceFilter.MDC_TRACE_ID_KEY, traceId);
                log.info("发送完毕,返回发送结果:{}", sendResult);
                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                    // 补偿
                    makeUp(topic, payload);
                }
            }finally {
                MDC.clear();
            }

        }

        @Override
        public void onException(Throwable throwable) {
            log.error("参数:{},mq消息发送异常",payload, throwable);
            // 补偿
            makeUp(topic, payload);
        }

        private void makeUp(String topic, Object payload) {
            try {
             // 补偿逻辑
            } catch (Exception e) {
                log.error("参数:{},补偿失败",JSON.toJSONString(payload), e);
            }
        }
    });
}

`

2、消费者:简单使用@RocketMQMessageListener注解就可以开启一个消费。那么需要关注的消费之前把traceId设置进去就可以。

`
@Component
@Slf4j
public class RocketMqTraceListener {

@EventListener
public void onApplicationEvent(ApplicationReadyEvent event) {
    ConfigurableApplicationContext applicationContext = event.getApplicationContext();
    Map<String, DefaultRocketMQListenerContainer> beansOfType = applicationContext.getBeansOfType(DefaultRocketMQListenerContainer.class);
    if(beansOfType.isEmpty()){
        return;
    }
    for (Map.Entry<String, DefaultRocketMQListenerContainer> stringDefaultRocketMQListenerContainerEntry : beansOfType.entrySet()) {
        DefaultRocketMQListenerContainer value = stringDefaultRocketMQListenerContainerEntry.getValue();
     value.getConsumer().getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageHook() {
            @Override
            public String hookName() {
                return null;
            }

            @Override
            public void consumeMessageBefore(ConsumeMessageContext context) {
                String traceId = null;
                // 可能是批量消费,这个时候取一个就不合适,那么我们就自定义一个uuid
                List<MessageExt> msgList = context.getMsgList();
                if(CollectionUtils.isNotEmpty(msgList) && msgList.size() == 1){
                    traceId = msgList.get(0).getMsgId();
                }
                // 可以取生产者传递
                if (StringUtils.isEmpty(traceId)) {
                         Map<String, String> props = context.getProps();
                        traceId = props.get("traceId");
                }
                // 自定义
                if (StringUtils.isEmpty(traceId)) {
                    traceId = UUID.randomUUID().toString().replaceAll("-", "");
                }
                MDC.put(TraceFilter.MDC_TRACE_ID_KEY, traceId);
            }

            @Override
            public void consumeMessageAfter(ConsumeMessageContext context) {
                MDC.clear();
            }
        });
    }
}

}
`
特别注意:getDefaultMQPushConsumerImpl()方法在升级版本可能不再提供获取 的入口,考虑自己封装consumer。

源码解读:看源码是一个枯燥的事情,代码路径我大概列一下,感兴趣的可以看看
RocketMQAutoConfiguration -> RocketMQListenerConfiguration->ListenerContainerConfiguration->DefaultRocketMQListenerContainer->DefaultMQPushConsumer->DefaultMQPushConsumerImpl