在同一个Java进程中连接多个RocketMQ集群

发布时间 2023-07-30 15:26:24作者: 沐春风-燕南飞
 RocketMQ使用场景:
  1. 作为消费者:用户应用 --> MQ集群A --> 权益应用 消息内容:客户开户/销户相关消息
  2. 作为生产者:权益应用 --> MQ集群B --> 信贷应用 消息内容:卡券事件消息
问题现象:
一个Java进程要连接多个RocketMQ集群时,作为消费者功能无法正常使用,作为生产者功能可以正常使用
原因:RocketMQ Client 有一个核心类 MQClientManager, 在我们需要使用 MQ Client 实例的时候,实际上都是通过它的 getAndCreateMQClientInstance 方法进行创建的;
public MQClientInstance getAndCreateMQClientInstance(ClientConfig clientConfig, RPCHook rpcHook) {
String clientId = clientConfig.buildMQClientId();
MQClientInstance instance = (MQClientInstance)this.factoryTable.get(clientId);
if (null == instance) {
instance = new MQClientInstance(clientConfig.cloneClientConfig(), this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
MQClientInstance prev = (MQClientInstance)this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) {
instance = prev;
log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
} else {
log.info("Created new MQClientInstance for clientId:[{}]", clientId);
}
}
 
return instance;
}
我们可以看到它利用客户的配置信息生成一个固定的 clientId,以此去缓存 factoryTable 中查找,不存在才会创建全新一个实例。
那么,可以理解一个 clientID 仅能存在一个连接实例了,可这个 clientId 是怎么产生的呢?继续跟踪看看这段代码
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
代码层面上对 clientId 进行了约定,格式为 “ClientIp@InstanceName” 格式,当 unitName 不为空的时候还会在后面加上 “@unitName”。
解决方案:

方法1:设定不同的 instanceName:

instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
从系统属性中读取出来的,也就是一般在 JVM 启动时设定的。当然此处可以改变,但是需要评估修改之后的影响。这是为什么多少 RocketMQ Client 都只能连接一个服务器的原因。

方法2:可以将RocketMQ Client升级到4.9.0,解决这个问题的:

public void changeInstanceNameToPID() {
if (this.instanceName.equals("DEFAULT")) {
this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
}
}

方法3:可以设定不同的 unitName

DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
producer1.setNamesrvAddr("192.168.2.230:9876");
producer1.setUnitName("producer1");
producer1.start();
 
DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_1");
producer2.setNamesrvAddr("192.168.2.231:9876");
producer2.setUnitName("producer2");
producer2.start();

方法4:使用一个工具,将 MQClientInstance实例 放到工具创建的隔离环境中, 从而实现多个实例完全隔离的效果。

参考:来自平行世界的救赎_学习的锅,跳舞的桌的技术博客_51CTO博客
综上:前两种方法的逻辑是通过修改 clientId 实现多个实例,而方法 4 的逻辑则是 “既然你的缓存已经有这个 key,我就换个缓存”。

代码地址

github: https://github.com/vancoo/multi-mq-demo
gitee: https://gitee.com/vancoo/multi-mq-demo