rabbitmq listener注解@RabbitListener里的queues是个数组,你用了吗?

发布时间 2023-12-21 09:04:07作者: buguge

靠谱的程序员具有注重实效的偏执,对于重复多行的代码,总会想办法消除重复。

我们zhongtai-channel里在调用服务商接口发起签约前,使用了mq进行异步处理。即:zhongtai-channel签约RPCAPI接收到上游的请求后,先同步持久化保存签约请求流水,然后将签约数据放入rabbitmq消息队列,等待程序里的消费者方法消费消息队列中的消息,调用服务商对接Service发起HTTP签约请求。

签约RPCAPI,参数校验,持久化数据,写入消息队列消息队列监听消息出队,调用服务商对接Service发起HTTP签约请求

系统运行过程中遇到一个问题,程序在消费消息时,有些三方服务商接口的响应时效比较长,进而阻塞消息队列,导致整体签约能力下降。鉴于自有服务商的签约量比重大,考虑自有服务商系统是我们自研的,接口响应时效相对可控,因此,为了提高系统生产能力,将自有服务商的签约与三方服务商的签约一分为二,分别放入两个不同的消息队列。

代码:

package com.emax.channel.provider.modules.mq.sign;

/**
 * @Description 服务商签约MQ异步实现
 * @Author Panda
 * @Date 2023/10/11
 **/
@Slf4j
@Component
@Configuration
public class LevySignApiInvokerMqBroker {

    ...省略一堆@Bean定义

    public void sendLevyGotoSignQueue(LevySignFlow levySignFlow, LevySignDTO levySignDTO) {
        log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息入队,{}_{}_{} levySignFlow={}", ...);
        Object[] obj = {levySignFlow, levySignDTO};

        // 获取服务商协议类型
        final LevyChannelConfig levyConfigInfo = levyChannelConfigService.getLevyConfigCache(Long.parseLong(levySignFlow.getTaxSourceId()));
        final String apiSuit = levyConfigInfo.getApiSuit();

        if (StrUtil.equals(apiSuit, LevyChannelRouteEnum.OWN.getCode())) {
            /**
             * 消费者参见{@link #onMessageByOwn(Object[])}
             **/
            rabbitTemplate.convertAndSend(exchangeName, internalQueueName, obj);

        } else {
            /**
             * 消费者参见{@link #onMessage(Object[])}
             **/
            rabbitTemplate.convertAndSend(exchangeName, externalQueueName, obj);
        }
    }


    @RabbitHandler
    @RabbitListener(queues = "#{levySignApiQueueExternal.name}",concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory")
    public void onMessage(Object[] objects) throws Exception {
        LevySignFlow levySignFlow = (LevySignFlow) objects[0];
        LevySignDTO levySignDTO = (LevySignDTO) objects[1];
        log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...);
        long nowTime = System.currentTimeMillis();

        // 执行逻辑
        try {
            levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO);
        } catch (Exception e){
            log.error("httpInvokeLevySignApi 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e);
        }
        log.info("httpInvokeLevySignApi 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...);
    }

    @RabbitHandler
    @RabbitListener(queues = "#{levySignApiQueueInternal.name}",concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory")
    public void onMessageByOwn(Object[] objects) throws Exception {
        LevySignFlow levySignFlow = (LevySignFlow) objects[0];
        LevySignDTO levySignDTO = (LevySignDTO) objects[1];
        log.info("httpInvokeLevySignApi_owm 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...);
        long nowTime = System.currentTimeMillis();

        // 执行逻辑
        try {
            levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO);
        }catch (Exception e){
            log.error("httpInvokeLevySignApi_own 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e);
        }
        log.info("httpInvokeLevySignApi_owm 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...);

    }

    private void levyGotoSignAndSendSignQueryQueue(LevySignFlow levySignFlow, LevySignDTO levySignDTO) {
        TaxSignStatusEnum taxSignStatusEnum = levyGotoSignService.syncGotoSign(levySignFlow, levySignDTO);
        if (TaxSignStatusEnum.isFinalState(taxSignStatusEnum)) {
            LevySignFlow levySignFlowFinal = levySignFlowManager.getById(levySignFlow.getId());
            levySignResultNotifyService.async(levySignFlowFinal);
        }else {
            levySignQueryApiInvokerMqBroker.sendLevySignQueryQueue(levySignFlow,5);
        }
    }
}

 

注意其中的两个onMessage方法,看到冗余代码了吗?靠谱的程序员觉得别扭!

 

先介绍一下两个技术点

  1. @RabbitListener注解的queues是一个数组。 - - - - 当两个或多个的其他属性如concurrency都相同时,此技术点满足。
  2. 一个onMessage方法可以同时标注两个@RabbitListener。 - - - - - - 当两个或多个的属性各不相同时, 使用此技术点。

 

So,代码重构就easy了。使用上面的技术点1。将两个onMessage合二为一。(BTW,此时已经没必要有levyGotoSignAndSendSignQueryQueue方法了)

    @RabbitHandler
    @RabbitListener(queues = "#{levySignApiQueueInternal.name}, #{levySignApiQueueExternal.name}", concurrency = "5", containerFactory = "signPreFetchLimitContainerFactory")
    public void onMessage(Object[] objects) throws Exception {
        LevySignFlow levySignFlow = (LevySignFlow) objects[0];
        LevySignDTO levySignDTO = (LevySignDTO) objects[1];
        log.info("httpInvokeLevySignApi 调用服务商api签约 mq异步实现 消息出队,{}_{}_{} levySignDTO={}", ...);
        long nowTime = System.currentTimeMillis();

        // 执行逻辑
        try {
            levyGotoSignAndSendSignQueryQueue(levySignFlow, levySignDTO);
        } catch (Exception e){
            log.error("httpInvokeLevySignApi 调用服务商api签约异常,levySignFlow:{}",JSON.toJSONString(levySignFlow),e);
        }
        log.info("httpInvokeLevySignApi 耗时={} 调用服务商api签约,签约流水号:{} 服务商名称:{}", System.currentTimeMillis() - nowTime,...);
    }

 

 

这时,善于思考的同学可能会提问了。 你消费者方法写成一个了,那方法里怎么知道是自有服务商签约还是三方服务商签约呢?我日后可能会添加不同的性能控制策略。

easy,只要思想不滑坡,办法总比困难多。

  • 办法1:生产者代码里写入消息时,增加自有服务商签约或三方服务商签约的标识。
  • 办法2:消费者代码根据levyId查询LevyConfigCache,即可知道是自有服务商还是三方服务商。
  • 办法3:你琢磨。