RocketMQ消费暂停问题分析

发布时间 2023-06-08 16:30:50作者: 上好佳28

一、背景

客经使用rocketMq批量推送数据到pcr执行次贷策略引擎和互斥决策引擎,pcr将决策结果推送到前置路由。

二、问题现象描述

在客经推数据时,pcr-updateBorrowState消息积压越来越多,从日志上看,pcr不拉取消息,重启服务器后可以消费消息,过一会又消费变慢,不断重启才让所有消息消费完。

三、业务处理流程

1、 客经在hive筛选客户,经客经程序查出客户号,通过mq批量发送到pcr

2、 Pcr 接收批量客户,逐一执行次贷策略,并发送mq执行互斥策略

3、 将次贷策略输出的导流机构发送给adv执行导流路由

涉及到mq的topic:

pcr-hiveDiversionPolicyExecute:客经程序发送, pcr 消费后执行次贷策略引擎,每个消息包含100个客户号,每个消息间隔1s

pcr-updateBorrowState:pcr自产自销,执行互斥决策引擎的topic,每个消息一个客户

 
 

四、MQ部署架构

       MQ集群:主从模式,四台机器

 

 
 

五、原因分析

 

 
 

pullRequest对象:

分配队列时(rebanlance)会重新生成pullRequest对象

消息都取出后会更新偏移量,重新生成pullRequest对象

现象1、pcr-updateBorrowState消息积压:

原因:客经一个消息包含100个客户,每秒发送一次,即每秒推送一个消息(含100个客户号)到消息队列中,pcr的消费线程数为20个,在消费导流消息时会将实际执行导流的逻辑丢到只开了4个核心线程数的线程池中,一个消息100个客户对应100个任务在4个线程中执行,所以导流消息消费耗时较长,所以占用消费线程的时间较长。而每个客户执行完导流后还会发送消息执行互斥策略,因此导流消息和互斥策略消息比例为1:100,当大部分消费线程被导流消息占用,影响其他消息消费,而互斥策略消息生产速率是接近100个/s。所以互斥策略消息积压明显。因为客经发送的一个消息包含100个客户,平均每秒发送一次,及每秒推送100个客户到次贷导流的topic中,而pcr一次拉取32个消息,循环遍历32个消息,每个消息(含100个客户)开一个消费线程来消费,每个消费线程内部开4个线程并行处理,由于次导流消费耗时较长(1-10min),即使一分钟内完全消费完这32个消息,也远不及客经发送该topic的速度(一分钟可发送60个消息);

 

 
 

一个消息队列对应生成一个pullrequest对象,同一个消费组的所有topic消息(互斥消息、导流消息、或者其他消息)对应生成的pullRequest对象都放在一个队列中,由于客经不停的发送导流消息,就会导致pullRequest队列中连续的pullRequest对象全部是导流消息请求,从而消费线程逐渐全部被导流消息占满,其他消息(不仅仅pcr-updateBorrowState)就得不到资源消费,因为会全部积压;

现象2、 服务器不消费消息,重启才开始拉取消息:

原因:机器重启后会重新分配消息队列,此时会重新生成pullRequest对象,此时该对象中可能重新分派了互斥topic消息,此时该消息获得消费线程后就会被消费,因此可以观察到互斥消息积压下降,但很快当20个消费线程又都被占用来消费客经消息(导流消息消费很慢)时,就又回到了之前的场景,所以要不停重启才会消费其他消息

六、解决方案

1、互斥决策引擎mq(pcr-updateBorrowState)消费由单线程改为多线程处理

2、次贷导流由(100个客户/1s)下调频率

3、将导流的mq单独定义一个消费组,这样就可与线上其他topic隔离,并且也不共用主流程消费的线程

4、批量推送的数据只入库,消费线程不等直接返回;后续处理逻辑由调度捞取执行


作者:上好佳28
链接:https://www.jianshu.com/p/7d4969c1a535
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。