RocketMQ之通信机制

发布时间 2023-05-06 13:51:32作者: 夏尔_717

一、概述

RocketMQ消息队列集群主要包括NameServerBroker(Master/Slave)、ProducerConsumer4个角色,基本通讯流程如下:

  1. Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定时向NameServer上报Topic路由信息。
  2. 消息生产者Producer作为客户端发送消息时候,需要根据消息的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取,同时Producer会默认每隔30sNameServer拉取一次路由信息。
  3. 消息生产者Producer根据2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者接收消息并落盘存储。
  4. 消息消费者Consumer根据2)中获取的路由信息,并再完成客户端的负载均衡后,选择其中的某一个或者某几个消息队列来拉取消息并进行消费。

从上面1~3中可以看出在消息生产者,BrokerNameServer之间都会发生通信(这里只说了MQ的部分通信),因此如何设计一个良好的网络通信模块在MQ中至关重要,它将决定RocketMQ集群整体的消息传输能力与最终的性能。

rocketmq-remoting模块是RocketMQ消息队列中负责网络通信的模块,它几乎被其他所有需要网络通信的模块(诸如rocketmq-clientrocketmq-brokerrocketmq-namesrv)所依赖和引用。为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通信协议并在Netty的基础之上扩展了通信模块。

二、Remoting通信类结构

rocketmqDesign3.png

  • RemotingService是最上层的接口,定义了三个方法
void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
  • RemotingServer:定义了服务端的接口,继承了上层接口RemotingService
  • RemotingClient:定义了客户端的接口,继承了上层RemotingService

RemotingServerRemotingClient定义的方法是类似的,主要包含了同步、异步、oneway方式的通信和注册处理器processor,其余的就是针对服务端和客户端特定的接口方法,比如服务端根据requestCode获取处理器的getProcessorPair()方法,客户端获取NameServer地址列表getNameServerAddressList()方法。

  • NettyRemotingAbstract:Netty通信抽象类,定义并封装了服务端与客户端公共方法。这个也是RocketMQ网络通信的核心类。
  • NettyRemotingServer:服务端的实现类,实现了RemotingServer接口,继承NettyRemotingAbstract抽象类。
  • NettyRemotingClient:客户端的实现类,实现类RemotingClient接口,继承NettyRemotingAbstract抽象类。

三、协议设计与编解码

ClientServer之间完成一次消息发送时,需要对发送的消息进行一个协议约定,因此就有必要自定义RocketMQ的消息协议。同时,为了高效地在网络中传输消息和对收到的消息读取,就需要对消息进行编解码。在RocketMQ中,RemotingCommand这个类在消息传输过程中对所有数据内容的封装,不但包含了所有的数据结构,还包含了编码解码操作。

Header字段 类型 Request说明 Response说明
code int 请求操作码,应答方根据不同的请求码进行
不同的业务处理
应答响应码。
0表示成功,非0则表示各种错误
language LanguageCode 请求方实现的语言 应答方实现的语言
version int 请求方程序的版本 应答方程序的版本
opaque int 相当于requestId,在同一个连接上的不同
请求标识码,与响应消息中的相对应
应答不做修改直接返回
flag int 区分是普通RPC还是onewayRPC的标志 区分是普通RPC还是onewayRPC的标志
remark String 传输自定义文本信息 传输自定义文本信息
extFields HashMap 请求自定义扩展信息 响应自定义扩展信息

rocketmqDesign4.png

可见传输内容主要可以分为以下4部分:

  1. 消息长度:总长度,四个字节存储,占用一个int类型;
  2. 序列化类型&消息头长度:同样占用一个int类型,第一个字节表示序列化类型,后面三个字节表示消息头长度;
  3. 消息头数据:经过序列化后的消息头数据;
  4. 消息主体数据:消息主体的二进制字节数据内容;

四、消息的通信方式和流程

RocketMQ消息队列中支持通信的方式主要有同步(sync)、异步(async)、单向(oneway)三种。其中“单向”通信模式相对简单,一般用在发送心跳包场景下,无需关注其Response。这里,主要介绍RocketMQ的异步通信流程。

rocketmqDesign5.png

五、Reactor多线程设计

RocketMQRPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。

rocketmqDesign6.png

上面的框图中可以大致了解RocketMQNettyRemotingServerReactor多线程模型。一个Reactor主线程(eventLoopGroupBoss,即为上面的1)负责监听TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIOEpoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据RomotingCommand的业务请求码codeprocessorTable这个本地缓存变量中找到对应的processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的“M2”)。从入口到业务逻辑的几个步骤中线程池一直再增加,这跟每一步逻辑复杂性相关,越复杂,需要的并发通道越宽。

线程数 线程名 线程具体说明
1 NettyBoss_%d Reactor主线程
N NettyServerEPOLLSelector_%d_%d Reactor线程池
M1 NettyServerCodecThread_%d Worker线程池
M2 RemotingExecutorThread_%d 业务processor处理线程池