RocketMQ源码(六):RocketMQ消费者启动流程

发布时间 2023-09-20 21:13:24作者: 无虑的小猪

  RocketMQ通过Consumer消费消息,可并发和顺序的处理消息,这里以并发消费普通消息为例,分析消息下佛诶的整体流程。Consumer的示例代码如下:

 1 import com.snails.rmq.common.RMQConstant;
 2 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 3 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 4 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 5 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 6 import org.apache.rocketmq.client.exception.MQClientException;
 7 import org.apache.rocketmq.common.message.MessageExt;
 8 import java.io.UnsupportedEncodingException;
 9 import java.util.List;
10 
11 public class SyncConsumer {
12     public static void main(String[] args) throws MQClientException {
13         // 实例化消费者组名称
14         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
15         // 指定name server地址
16         consumer.setNamesrvAddr("127.0.0.1:9876");
17         // 订阅至少一个主题以供消费
18         consumer.subscribe("TestTopic", "*");
19         //  注册回调,处理从服务端获取的消息
20         consumer.registerMessageListener(new MessageListenerConcurrently() {
21             @Override
22             public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
23                 for (MessageExt msg : msgs) {
24                     try {
25                         System.out.println(msg.getTags());
26                         System.out.println(msg.getKeys());
27                         System.out.println(String.format("线程%s,接收订单:%s", Thread.currentThread().getName(), new String(msg.getBody(), "UTF-8")));
28                     } catch (UnsupportedEncodingException e) {
29                         // TODO 补偿机制
30                         System.out.println(e.getMessage());
31                     }
32                 }
33                 // 消费消息确认
34                 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
35             }
36         });
37         // 启动消费者实例
38         consumer.start();
39         System.out.printf("消费者已启动.%n");
40     }
41 }

根据上述代码,生产者发送消息主要完成以下几件事:

  1、创建消费者对象DefaultMQPushConsumer、设置NameServer地址及注册消息回调处理消息

  2、启动消费者

1、创建消费者

  DefaultMQPushConsumer构造函数详情如下:

 1 // 指定消费者组名创建Consumer
 2 public DefaultMQPushConsumer(final String consumerGroup) {
 3     this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
 4 }
 5 
 6 public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,
 7     AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
 8     // 消费者组
 9     this.consumerGroup = consumerGroup;
10     this.namespace = namespace;
11     // 选择消息队列策略
12     this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
13     // 消费消息对象,实际干活的
14     defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
15 }

  与生成者类似,DefaultMQPushConsumer中持有关键属性DefaultMQPushConsumerImpl。DefaultMQPushConsumer是实际干活的,DefaultMQPushConsumer的启动与消息消费都依赖于DefaultMQPushConsumer中方法,利用装饰者模式。

2、注册消息监听

  DefaultMQPushConsumer#registerMessageListener 详情如下:
1 // 注册消息监听
2 public void registerMessageListener(MessageListenerConcurrently messageListener) {
3     this.messageListener = messageListener;
4     this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
5 }

  主要完成消息回调的属性赋值,针对DefaultMQPushConsumer的messageListener属性、DefaultMQPushConsumerImpl的messageListenerInner属性。

3、启动消费者

  DefaultMQPushConsumer#start 详情如下:

  

   实际完成消息消费功能的是 DefaultMQPushConsumerImpl。

1 //  消费者实际执行对象,DefaultMQPushConsumer 使用装饰者模式
2 //  实际调用是委派 DefaultMQPushConsumerImpl 完成消息消费
3 protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

  DefaultMQPushConsumerImpl#start 启动详情如下:

  1 /**
  2  * 消费者的核心代码入口
  3  */
  4 public synchronized void start() throws MQClientException {
  5     switch (this.serviceState) {
  6         case CREATE_JUST:
  7             log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
  8                 this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
  9             this.serviceState = ServiceState.START_FAILED;
 10             // 1.检查配置信息
 11             this.checkConfig();
 12 
 13             // 2.加工订阅信息(同时,如果消息消费模式为集群模式,还需要为该消费组创建一个重试主题。)
 14             this.copySubscription();
 15 
 16             if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
 17                 this.defaultMQPushConsumer.changeInstanceNameToPID();
 18             }
 19 
 20             // 3.创建MQClientInstance实例,
 21             // 这个实例在一个JVM中消费者和生产者共用,MQClientManager中维护了一个factoryTable,类型为ConcurrentMap,保存了clintId和MQClientInstanc
 22             this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
 23 
 24             // 1.1.负载均衡(队列默认分配算法)
 25             this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
 26             this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
 27             // 1.2.队列默认分配算法
 28             this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
 29             this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
 30 
 31             // 5. 拉取消息(无论是拉模式,还是推模式 :数据都是拉),
 32             // pullAPIWrapper拉取消息的API包装类,主要有消息的拉取方法和接受拉取到的消息
 33             this.pullAPIWrapper = new PullAPIWrapper(
 34                 mQClientFactory,
 35                 this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
 36             this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
 37 
 38             // 7.消费进度存储,如果是集群模式,使用远程存储RemoteBrokerOffsetStore,如果是广播模式,则使用本地存储LocalFileOffsetStore
 39             if (this.defaultMQPushConsumer.getOffsetStore() != null) {
 40                 this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
 41             } else {
 42                 switch (this.defaultMQPushConsumer.getMessageModel()) {
 43                     // 广播模式
 44                     case BROADCASTING:
 45                         this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
 46                         break;
 47                     // 集群模式
 48                     case CLUSTERING:
 49                         this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
 50                         break;
 51                     default:
 52                         break;
 53                 }
 54                 this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
 55             }
 56             // 8.加载消息进度(offsetStore是用来操作消费进度的对象)
 57             // push模式消费进度最后持久化在broker端,但是consumer端在内存中也持有消费进度
 58             this.offsetStore.load();
 59 
 60             // 9.判断是顺序消息还是并发消息
 61             if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
 62                 this.consumeOrderly = true;
 63                 this.consumeMessageService =
 64                     new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
 65             } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
 66                 this.consumeOrderly = false;
 67                 this.consumeMessageService =
 68                     new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
 69             }
 70 
 71             // 10.消息消费服务并启动
 72             this.consumeMessageService.start();
 73 
 74             // 11.注册消费者
 75             boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
 76             if (!registerOK) {
 77                 this.serviceState = ServiceState.CREATE_JUST;
 78                 this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());
 79                 throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
 80                     + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
 81                     null);
 82             }
 83 
 84             // 12.MQClientInstance启动(第3步中创建了MQClientInstance)
 85             mQClientFactory.start();
 86             log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
 87             this.serviceState = ServiceState.RUNNING;
 88             break;
 89         case RUNNING:
 90         case START_FAILED:
 91         case SHUTDOWN_ALREADY:
 92             throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
 93                 + this.serviceState
 94                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
 95                 null);
 96         default:
 97             break;
 98     }
 99 
100     // 更新主题路由信息
101     this.updateTopicSubscribeInfoWhenSubscriptionChanged();
102     // 检查在Broker上的状态
103     this.mQClientFactory.checkClientInBroker();
104     // 发送心跳
105     this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
106     // 唤醒负载均衡服务
107     this.mQClientFactory.rebalanceImmediately();
108 }

启动消费者流程:

  判断当前客户端的状态,不同的服务状态执行不同的流程,RocketMQ的服务状态ServiceState如下:
 1 public enum ServiceState {
 2     // 刚创建,未启动
 3     CREATE_JUST,
 4     // 运行中
 5     RUNNING,
 6     // 关闭
 7     SHUTDOWN_ALREADY,
 8     // 启动失败
 9     START_FAILED;
10 }

  这里我们主要看 CREATE_JUST 状态的流程,其他三个状态会抛出MQClientException异常。

3.1、检查配置信息

  检查配置信息,DefaultMQPushConsumerImpl#checkConfig。
  校验消费者组、消息模型、消息进度、分配消息队列策略、消息监听器、消费线程等信息。

3.2、加工订阅信息

  DefaultMQPushConsumerImpl#copySubscription 详情如下:
 1 private void copySubscription() throws MQClientException {
 2     try {
 3         // 将消费者的订阅数据缓存值 rebalanceImpl 的 subscriptionInner 属性中
 4         Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
 5         if (sub != null) {
 6             for (final Map.Entry<String, String> entry : sub.entrySet()) {
 7                 final String topic = entry.getKey();
 8                 final String subString = entry.getValue();
 9                 SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
10                     topic, subString);
11                 this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
12             }
13         }
14 
15         if (null == this.messageListenerInner) {
16             this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
17         }
18 
19         switch (this.defaultMQPushConsumer.getMessageModel()) {
20             // 广播模式
21             case BROADCASTING:
22                 break;
23             // 集群模型
24             case CLUSTERING:
25                 // 创建一个重试主题
26                 final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
27                 SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
28                     retryTopic, SubscriptionData.SUB_ALL);
29                 this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
30                 break;
31             default:
32                 break;
33         }
34     } catch (Exception e) {
35         throw new MQClientException("subscription exception", e);
36     }
37 }

  将当前消费者客户端设置在本地缓存RebalanceImpl#subscriptionInner中,若当前消息消费模式是集群模式,创建一个重试Topic。

  

  重试Topic名称的生成规则:MixAll#RETRY_GROUP_TOPIC_PREFIX 加上 消费组名称。

3.3、创建MQClientInstance实例

  MQClientInstance实例在一个JVM中消费者和生产者共用,MQClientManager中维护了一个factoryTable,类型为ConcurrentMap,保存了clintId和MQClientInstanc。
  MQClientManager#getOrCreateMQClientInstance() 详情如下:

   

   消费者默认的消息队列分配策略:AllocateMessageQueueAveragely - 平均哈希队列策略,分配规则详情 AllocateMessageQueueStrategy#allocate 参数详情如下:

  0

  平均哈希队列策略 AllocateMessageQueueAveragely#allocate 实现详情如下:

    

   平均哈希队列算法,首先会获取当前消费者在消费者组中的位置,再计算当前消费者能分配的消息队列的数量,以及分配的消息队列的范围,也就是从哪里开始分配到哪里结束分配,最后分配消息队列的范围给消费者进行分配。

3.5、拉取消息包装类定义

  定义RocketMQ拉取消息的包装类PullAPIWrapper,PullAPIWrapper是拉取消息的API包装类。

  

   PullAPIWrapper中包含了实际从MQ中获取消息的方法 pullKernelImpl()。

  0

  这里主要定义了拉取消息的对象,实际拉取消息的操作在启动拉取消息服务 PullMessageService 之后才会进行,也就是在启动 PullMessageService 服务之后,拉取消息的描述在启动消费消息服务做消息介绍。

3.6、消费进度存储

  根据消费消息的模式,判断加载消费进度的,消费进度对象 DefaultMQPushConsumerImpl#offsetStore,OffsetStore接口有两个子类LocalFileOffsetStore 和 RemoteBrokerOffsetStore,类图如下:

  0

  LocalFileOffsetStore:广播模式下,从本地读取消费进度,在该模式下会将offset信息转化成json保留到本地文件中;

  RemoteBrokerOffsetStore:集群模式下,获取远程存储信息,从 Broker 中获取消费进度,在该模式下offsetTable将需要提交的MessageQueue的offset信息通过          MQClientAPIImpl提供的接口updateConsumerOffsetOneway()提交到broker进行长久化存储。consumer的shutdown()办法会被动触发一次offset持久化到broker的操作。

  0

  根据消息消费模式选择不同的同步方式:广播模式从本地文件中获取偏移量,集群模式会根据偏移量的读取方式选择是读取内存中的偏移量还是Broker磁盘中的偏移量。

  将获取的消息偏移量对象赋值给 DefaultMQPushConsumerImpl#offsetStore 属性,后续可通过此对象加载消费进度。

 

3.7、消费进度加载
根据不同消费消息的模式获取到偏移量对象 DefaultMQPushConsumerImpl#offsetStore后,开始加载消息进度。
0
加载消费进度,RemoteBrokerOffsetStore中的load()是空实现;
LocalFileOffsetStore中的load()方法读取本地文件 offsets.json,本地文件存储路径详情:
0

3.8、并发消息与顺序消息的判断

  并发消费消息与顺序消费消息创建的消费消息服务不同,并发消费消息创建ConsumeMessageConcurrentlyService、顺序消息消息创建ConsumeMessageOrderlyService。

0

  这里仅仅是创建服务对象,未启动。

3.9、启动消费消息服务

3.9.1、顺序消息消费服务

  顺序消息是基于JUC的ReadWriteLock、ReentrantLoc实现的,为消息队里加锁,保证同一时间只有一个消费者可以消费该队列的消息,在拉取到消息后,通过MessageListenerOrderly接口的consumeMessage方法来处理消息。

  在ConsumeMessageOrderlyService启动时,进行加锁操作;关闭时,释放锁。

  0

1、加锁处理

  加锁操作流程如下:客户端通过Netty传递功能号LOCK_BATCH_MQ=41远程调用Broker对消息队列进行加锁,最终将加锁的消息队列存放在RebalanceLockManager#mqLockTable 缓存表中。

  0

  Broker对消息队列的加锁处理,RebalanceLockManager#tryLockBatch 详情如下:

 1 // 加锁处理
 2 public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
 3     final String clientId) {
 4     // 创建存放 加锁的消息队列 与 不加锁的消息队列 的容器
 5     Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
 6     Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
 7     
 8     // 将消费者组中已加锁的消息队列与未加锁的消息队列区分开,便于对未加锁的消息队列进行加锁处理
 9     for (MessageQueue mq : mqs) {
10         if (this.isLocked(group, mq, clientId)) {
11             lockedMqs.add(mq);
12         } else {
13             notLockedMqs.add(mq);
14         }
15     }
16     // 未加锁的消息队列不为空
17     if (!notLockedMqs.isEmpty()) {
18         try {
19             // 加锁
20             this.lock.lockInterruptibly();
21             try {
22                 // 锁对象添加进消息队列缓存表中
23                 ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
24                 if (null == groupValue) {
25                     groupValue = new ConcurrentHashMap<>(32);
26                     this.mqLockTable.put(group, groupValue);
27                 }
28 
29                 // 为未加锁的消息队列,创建 LockEntity 对象,并设置客户端id
30                 for (MessageQueue mq : notLockedMqs) {
31                     LockEntry lockEntry = groupValue.get(mq);
32                     if (null == lockEntry) {
33                         lockEntry = new LockEntry();
34                         lockEntry.setClientId(clientId);
35                         groupValue.put(mq, lockEntry);
36                     }
37 
38                     // 若已加锁,重置最后更新时间戳,重新设置到 缓存 的 容器中
39                     if (lockEntry.isLocked(clientId)) {
40                         lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
41                         lockedMqs.add(mq);
42                         continue;
43                     }
44 
45                     String oldClientId = lockEntry.getClientId();
46                     // 若锁到期,续期、再重新设置到 缓存 的 容器中
47                     if (lockEntry.isExpired()) {
48                         lockEntry.setClientId(clientId);
49                         lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
50                         lockedMqs.add(mq);
51                         continue;
52                     }
53                 }
54             } finally {
55                 // 释放锁
56                 this.lock.unlock();
57             }
58         } catch (InterruptedException e) {
59             log.error("putMessage exception", e);
60         }
61     }
62     return lockedMqs;
63 }

2、释放锁处理

  释放锁流程与加锁操作流程类似,客户端通过Netty传递功能号UNLOCK_BATCH_MQ=42远程调用Broker对消息队列进行锁释放,最终遍历RebalanceLockManager#mqLockTable 加锁缓存表消息队列,将缓存表中指定客户端Id的消息队列移除。
  0

  Broker对消息队列的释放锁的处理,RebalanceLockManager#unlockBatch 详情如下:

 1 // 释放锁
 2 public void unlockBatch(final String group, final Set<MessageQueue> mqs, final String clientId) {
 3     try {
 4         // 加锁
 5         this.lock.lockInterruptibly();
 6         try {
 7             ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
 8             if (null != groupValue) {
 9                 // 移除锁对象中的clientId
10                 for (MessageQueue mq : mqs) {
11                     LockEntry lockEntry = groupValue.get(mq);
12                     if (null != lockEntry) {
13                         if (lockEntry.getClientId().equals(clientId)) {
14                             groupValue.remove(mq);
15                         }
16                     }
17                 }
18             } 
19         } finally {
20             // 释放锁
21             this.lock.unlock();
22         }
23     } catch (InterruptedException e) {
24         log.error("putMessage exception", e);
25     }
26 }

3.9.2、并发消息消费服务

  并发消息消费服务在启动时,会清除过期的消息,根据消息的偏移量移除ProcessQueue#msgTreeMap中的MessageExt,通过读写锁ProcessQueue#lockTreeMap来保证清理消息时的线程安全,ProcessQueue#removeMessage, 核心代码如下:

0

  在拉取到消息后,通过 MessageListenerConcurrently 消息监听处理器的consumeMessage方法来处理消息。

3.10、注册消费者

  将消费者注册到消费者缓存表 MQClientInstance#consumerTable ,MQClientInstance#registerConsumer 详情如下:

 1 // 消费者缓存表
 2 private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
 3 
 4 // 注册消费者
 5 public boolean registerConsumer(final String group, final MQConsumerInner consumer) {
 6     if (null == group || null == consumer) {
 7         return false;
 8     }
 9     // 将消费者注册到消费者缓存表中
10     MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
11     if (prev != null) {
12         log.warn("the consumer group[" + group + "] exist already.");
13         return false;
14     }
15 
16     return true;
17 } 

3.11、启动MQClientInstance

1、启动消息发送服务,启动Netty

  开启Netty通信的服务,NettyRemotingClient#start(),启动 Netty远程客户端服务。

2、开启定时任务

  开启定时任务,MQClientInsatnce#startScheduledTask 详情如下:

 1 // 开启定时任务
 2 private void startScheduledTask() {
 3     // 2min同步NameServer地址服务
 4     if (null == this.clientConfig.getNamesrvAddr()) {
 5         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 6             @Override
 7             public void run() {
 8                 try {
 9                     MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
10                 } catch (Exception e) {
11                     log.error("ScheduledTask fetchNameServerAddr exception", e);
12                 }
13             }
14         }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
15     }
16 
17     // 30s同步NameServer中的Topic路由信息
18     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
19         @Override
20         public void run() {
21             try {
22                 MQClientInstance.this.updateTopicRouteInfoFromNameServer();
23             } catch (Exception e) {
24                 log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
25             }
26         }
27     }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
28 
29     // 30s剔除下线的broker、发送心跳检测到broker
30     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
31         @Override
32         public void run() {
33             try {
34                 MQClientInstance.this.cleanOfflineBroker();
35                 MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
36             } catch (Exception e) {
37                 log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
38             }
39         }
40     }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
41 
42     // 5s 持久化消费进度
43     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
44         @Override
45         public void run() {
46             try {
47                 MQClientInstance.this.persistAllConsumerOffset();
48             } catch (Exception e) {
49                 log.error("ScheduledTask persistAllConsumerOffset exception", e);
50             }
51         }
52     }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
53 
54     // 调整线程池异步任务
55     this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
56         @Override
57         public void run() {
58             try {
59                 MQClientInstance.this.adjustThreadPool();
60             } catch (Exception e) {
61                 log.error("ScheduledTask adjustThreadPool exception", e);
62             }
63         }
64     }, 1, 1, TimeUnit.MINUTES);
65 }

3、开启拉取消息服务

  启动拉取消息服务 PullMessageService,拉取消息的核心方法 PullMessageService#pullMessage。

  0

  执行DefaultMQPushConsumerImpl#pullMessage 方法:

  0

  根据不同的发送方式,调用 MQClientAPIImpl#pullMessage 拉取消息 详情如下:

 1 public PullResult pullMessage(
 2     final String addr,
 3     final PullMessageRequestHeader requestHeader,
 4     final long timeoutMillis,
 5     final CommunicationMode communicationMode,
 6     final PullCallback pullCallback
 7 ) throws RemotingException, MQBrokerException, InterruptedException {
 8     RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader);
 9 
10     // 通讯方式的判断
11     switch (communicationMode) {
12         case ONEWAY:
13             assert false;
14             return null;
15         // 异步
16         case ASYNC:
17             this.pullMessageAsync(addr, request, timeoutMillis, pullCallback);
18             return null;
19         // 同步
20         case SYNC:
21             return this.pullMessageSync(addr, request, timeoutMillis);
22         default:
23             assert false;
24             break;
25     }
26 
27     return null;
28 }

4、开启负载均衡服务

  负载均衡服务RebalanceService,最终执行 RebalanceImpl#doRebalance,详情如下:

 1 public void doRebalance(final boolean isOrder) {
 2     // 获取订阅数据
 3     Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
 4     if (subTable != null) {
 5         for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
 6             // 获取Topic
 7             final String topic = entry.getKey();
 8             try {
 9                 // 负载均衡处理
10                 this.rebalanceByTopic(topic, isOrder);
11             } catch (Throwable e) {
12                 if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
13                     log.warn("rebalanceByTopic Exception", e);
14                 }
15             }
16         }
17     }
18     // 清除未订阅主题的消息队列
19     this.truncateMessageQueueNotMyTopic();
20 }

5、开启消息推送服务

  启动消息推送服务,用于将消息消费结果通知给Broker,将生产者添加进生产者缓存表 MQClientInstance#producerTable。向所有的Broker发送心跳,并移除超时请求,并执行回调方法onException。

  DefaultMQProducerImpl#start() 详情如下:

 1 /**
 2  * 启动生产者
 3  * @param startFactory
 4  * @throws MQClientException
 5  */
 6 public void start(final boolean startFactory) throws MQClientException {
 7     switch (this.serviceState) {
 8         // 刚创建未启动
 9         case CREATE_JUST:
10             this.serviceState = ServiceState.START_FAILED;
11             // todo 检查配置
12             this.checkConfig();
13             // 更改当前instanceName为进程ID
14             if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
15                 this.defaultMQProducer.changeInstanceNameToPID();
16             }
17 
18             //todo 获取MQ客户端实例
19             //整个JVM中只存在一个MQClientManager实例,维护一个MQClientInstance缓存表
20             //ConcurrentMap<String/* clientId */, MQClientInstance> factoryTable = new ConcurrentHashMap<String,MQClientInstance>();
21             //同一个clientId只会创建一个MQClientInstance。
22             //MQClientInstance封装了RocketMQ网络处理API,是消息生产者和消息消费者与NameServer、Broker打交道的网络通道
23             this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
24 
25             // 注册Producer到MQClientInstance客户端实例
26             boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
27             // 未注册成功,抛出异常
28             if (!registerOK) {
29                 this.serviceState = ServiceState.CREATE_JUST;
30                 throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
31                     + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
32                     null);
33             }
34 
35             // 路由信心添加进缓存表中
36             this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
37 
38             // 启动MQ客户端实例
39             if (startFactory) {
40                 // todo 最终调用MQClientInstance
41                 mQClientFactory.start();
42             }
43 
44             log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
45                 this.defaultMQProducer.isSendMessageWithVIPChannel());
46             this.serviceState = ServiceState.RUNNING;
47             break;
48         case RUNNING:
49         case START_FAILED:
50         case SHUTDOWN_ALREADY:
51             throw new MQClientException("The producer service state not OK, maybe started once, "
52                 + this.serviceState
53                 + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
54                 null);
55         default:
56             break;
57     }
58 
59     // 向所有的broker发送心跳
60     this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
61 
62     // 扫描并移除超时请求,并执行回调方法onException
63     this.timer.scheduleAtFixedRate(new TimerTask() {
64         @Override
65         public void run() {
66             try {
67                 RequestFutureTable.scanExpiredRequest();
68             } catch (Throwable e) {
69                 log.error("scan RequestFutureTable exception", e);
70             }
71         }
72     }, 1000 * 3, 1000);
73 }

  消息推送服务,在RocketMQ源码(四):RocketMQ生产者发送消息流程中已经做了详细介绍,此处不再赘述。

 

6、更新服务状态

  更新服务状态为运行中,同时更新订阅信息,并发送心跳。

  0

4、总结

  RocketMQ消费者启动流程如下: