RocketMQ

发布时间 2023-08-11 16:43:07作者: 江南烟雨行舟

使用的版本是 4.9.4,下载地址:https://rocketmq.apache.org/zh/release-notes/2022/03/04/4.9.4/

Apache RocketMQ 使用的是 异步通信 方式和 发布与订阅 的消息传输模型。

使用 MQ 的好处:

  • 流量削峰:防止流量突然增加导致应用程序挂了;先将请求写入 MQ,然后再从 MQ 中获取数据。
  • 应用解耦:在当前模块中不需要调用其它模块;用户注册的时候需要发短信,用户模块只需要将手机号放到 MQ 中,短信模块消费 MQ 中的数据就行了。
  • 异步处理:和应用解耦类似,只是其它模块处理完后会将结果返回。

NameServer

主要就是用来:

  1. 注册 Broker 并保存 Broker 的集群名称、IP地址、端口号。
  2. 做健康检查,剔除(2m)不可用的 Broker。
  3. 管理 Topic 的路由信息,就是保存了哪些 Topic 在哪些 Broker 上。

NameServer 集群之间的节点不通信。

Broker

主要就是用来:

  1. 持久化消息和主从同步。
  2. 推送消息,推送方式支持 广播模式集群模式
  3. 过滤消息,推送 满足条件 的消息到指定消费者。
  4. 保证消息顺序性。

发送和接收的基本流程

  1. 生产者先连接上 NameServer,在发送消息的时候会创建 Topic,然后再返回 Topic 关联的 Broker,最后再连接这个 Broker,把消息发送到这个 Broker 里面。
  2. 消费者先连接上 NameServer,通过主题从 NameServer 中获取 Broker,然后再连接这个 Broker,等待消息。

生产者

生产者就是用来生产消息的,send 方法本身支持内部重试,重试逻辑如下:

  • 至多重试 2 次(同步发送为 2 次,异步发送为 0 次)。
  • 如果发送失败,则轮转到下一个 Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。
  • 如果本身向 broker 发送消息超过 sendMsgTimeout,就不会再重试。

普通消息

普通消息就是普通的一条消息,这个消息会发送到不同的消息队列中,但是不会保证消息被顺序消费

普通消息有三种发送方式:

  • 同步发送:发出一条消息后,必须等待服务端返回响应,才能发送下一条。

    同步发送-6a8c78dae434afe4fbd970a2836f740c.png
  • 异步发送:就是不需要等待服务端返回响应,就直接发送下一条。

    异步发送-c05e8e1111d99d8b8b4626e419e9f8e5.png
  • 单项发送:只是发送消息,不会处理客户端响应。

    Oneway发送-bc1379bd3b8f382c23ff7abac1e0ed95.png
import org.springframework.messaging.support.MessageBuilder;
Message<String> message = MessageBuilder.withPayload("一个普通消息").build();
// asyncSend: 异步
// syncSend:  同步
rocketMQTemplate.syncSend("greetings-topic:tags", message);

延迟消息

给消息设置一个延迟时间,延迟一段时间后,才将消息投递到消费者。

Message<String> message = MessageBuilder.withPayload("一个普通消息").build();
// 支持的延迟等级:1s 5s 10s 30s 1min 2min 3min 4min 5min 6min 7min 8min 9min 10min 20min 30min 1h 2h
// 这里的等级是 3,也就是延迟 10s。
producer.syncSend("greetings-topic:tags", message, 1000, 3);

批量消息

多条消息同时发送,可以减少 API 和网络调用次数。

batch-241308ac9ed97b3a1fbf0e5e6417f74d.png
List<Message<String>> messageList = new LinkedList<>();
for (int i = 0; i < 10; i++) {
    messageList.add(MessageBuilder.withPayload("一个普通消息:" + (i + 1)).build());
}
SendResult result = rocketMQTemplate.syncSend("greetings-topic", messageList);

这些消息必须是同一个主题,而且这些消息会放到同一个消息队列。

过滤消息

通过 tag 将消息交给指定的消费者组处理。

订阅关系一致#正确例子 中写了消费者的 tag 例子,下面是生产者的例子:

rocketMQTemplate.syncSend("topic:tag", "任意消息");

顺序消息

就是把发送的消息都放到同一个消息队列里面,消费顺序和发送顺序是一样的。

Message<String> message = MessageBuilder.withPayload("一个有序的消息").build();
rocketMQTemplate.syncSendOrderly("greetings-topic", message, "用来选择消息队列的 Key 可以是订单 ID");

消费者不能使用并发模式,需要使用顺序模式:

// consumer.registerMessageListener(new MessageListenerConcurrently()
consumer.registerMessageListener(new MessageListenerOrderly()

假设现在的线程数是 10,一共有 4 个消息队列:

在顺序模式下,它的并发数就是 4,就是一个线程对应一个消息队列,处理完一个消息才会处理对应队列的下一个消息。

在并发模式下,它的并发数就是 10,只要任何一个消息队列中还有消息,线程就会处理这个消息。

消费者

同一个组内的消费者,必须保证 订阅关系一致

RocketMQ.消费者订阅2.png

消费者组有两种消费模式:

  1. 集群模式: 需要指定一个消息分配策略,然后根据这个策略将消息投递给对应的的消费者。

    集群消费模式-7f4462d200247db35ca90bb67df7c9b1.png
    // 下面指定了平均分配策略(轮询策略)。
    // 还有机房优先分配策略、一致性 hash 分配策略等。
    consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
    
  2. 广播模式: 直接发送给组内的所有消费者。
    广播消费模式-59abf13c1dfde37423a4b9ac552dc1f3.png

组 A 和组 B 都可以订阅同一个主题 A,这两个组都会收到主题 A 的消息,但是具体怎么分发到消费者,就要看消息模式了。

RocketMQ_消费者订阅.png

订阅关系一致

  • 订阅关系:消费者组名、Topic、Tag。

  • 订阅关系一致:同一个消费者组下的,所有消费者的 Topic、Tag 必须完全一致,如果订阅关系不一致,会导致消费消息紊乱,甚至消息丢失。

正确例子

C1、C2、C3 的订阅关系一致,即 C1、C2、C3 订阅消息的代码必须完全一致,代码示例如下:

@RocketMQMessageListener(topic = "TopicA", selectorExpression = "*")
@RocketMQMessageListener(topic = "TopicB", selectorExpression = "Tag2||Tag3")

错误例子

重平衡(reBalance)

重平衡是针对集群模式的,集群模式也可以叫负载均衡模式。

下面是平均分配算法的重平衡:

一个主题默认有四个队列,当新加入了一个消费者后,两个客户端会平分这四个队列。

消费者扩容1-2409cbfb4077f47f2e473b18eb78656b.jpeg

以此类推当添加了四个消费者后,这四个队列就平均分配给了四个消费者。

消费者扩容2-7d9c1d1dd8caea665a4a74b91f017560.jpeg

当添加第五个消费者后,第五个消费者永远不会收到消息。

消费者扩容3-65293ca6c2a01bf0a186821ba3432417.jpeg

同一个消费者组中的消费者数量,必须小于等于主题队列的数量。

不要随便调整消息队列数量和消费者组内的消费者数量。

消费位点

test_topic 主题有四个消息队列,消费者组 consumer-group 中有两个消费者,分别消费消息队列 0、1 和消息队列 2、3。

批注2023-07-26103855.png
  • 代理者位点:就是消息队列中的最大消息数量。
  • 消费者位点:消费到了这个队列中的第几个数据。
    在同一个消息队列中,如果消息 M2 比 M1 先消费成功,那么消费者位点暂时不会后移,只有 M1 消费成功后才会一起后移。
  • 差值:还有多少数据没有被消费。

集群模式下,消费者点位是由客户端返回给服务端的。

广播模式下,消费位点是由客户端自己保存的。

重复消费

两种情况会导致,消费者重复消费:

  1. 生产者发送了多次,比如程序逻辑问题或者生产者没有收到服务端响应然后进行了重试。
  2. 消费者组发生了重平衡。
    假设集群模式下,C1 正在消费 Q2 队列中的消费位点 2,然后添加了一个消费者 C2,消费者组就会进行重平衡将 Q2 分配给 C2,
    如果这个 C1 没有返回消费成功,那么服务端是不会将消费位点后移的,所以 C2 还是会消费 Q2 队列中的消费位点 2,这样就造成了重复消费。

解决重复消费就是解决幂等性

解决方法就是:把业务的唯一 ID 添加到数据库中的去重表中,添加失败就是重复消费。

消息重试和死信队列

  • 消息重试:消息消费失败后,也会将消费者位点后移,然后将这个消息投递到重试队列。

    重试队列的主题格式是:%RETRY% + 消费者组名

  • 死信队列:消息到达重试次数后,会将消息投递到死信队列。

    死信队列的主题格式是:%DLQ% + 消费者组名

// 指定重试次数。
// 并发模式重试:默认 16 次,每次重试间隔和延迟等级一样,第一次重试间隔就对应延迟等级 3,第二次重试就对应延迟等级 4。
// 顺序模式重试:重试次数是 Integer.MAX_VALUE,重试间隔是立即进行下一次重试。
consumer.setMaxReconsumeTimes(2);

// 在消息的回调方法中可以获取重试次数,可以自己判断到达重试次数后的逻辑,这样可以避免写很多死信消费者。
// msgs.get(0).getReconsumeTimes();

到达重试次数后还没有消费成功,就会发送到死信队列

消息堆积

消费者消费消息的速度太慢了,导致堆积了很多消息没有消费。

有几个解决方案:

  1. 生产者做限流。
  2. 增加消费者的并发数量。
  3. 添加新的消费者。
  4. 扩容消息队列数量。
    如果要缩容,就要等到队列都被消费完成后才去缩容,不然的话队列中的数据都丢失了。

消息补偿

消息补偿就是为了保证消息不丢失,比如保证下面这两种情况不丢:

  1. 生产者发送消息的时候一直发送失败。
  2. RocketMQ 有没有刷盘的时候,服务器宕机了。

解决方案:

  1. 将 RocketMQ 的刷盘机制改为同步刷盘。
  2. 做主从,消息保存到不同的服务器上。
  3. 启动消息追踪机制。
  4. 开发一个消息补偿器。

下面是消息补偿器的流程:

RocketMQ-消息补偿.png

消息补偿器一天执行两三次就行了,没有必要频繁执行;因为消息能不能成功被消费,和消息重发次数是没有任何关系的,消息重发的再快,消息该消费失败还是会消费失败,所以失败的消息太多了要排查业务。

RocketMQ Connect

用来做实时增量同步,比如捕获 MySQL 的 INSERT、UPDATE、DELETE 变化然后同步到 Redis。

overview-195cf6b6249dc8488e721970527cc533.png

每个 Connector 包含:

  • 一个 Connector 连接器,就是用来定义数据从哪复制到哪。
  • 多个 Task 线程,就是真正干活的,真正复制数据的。
Connector-Task-process-deec60b757a7689d932d86e7cfcadfaa.png

高并发设计

RocketMQ-高并发设计.png
  • DNS 轮询:用来做 nginx 集群。

    同一个域名每次请求的 IP 地址不一样,每个 IP 地址有自己的 nginx。

  • 发送到 MQ:把一些耗时操作单独提取出来,这样接口可以立即返回和增加并发,另外也可以对数据库削峰。

有些时候还需要使用分布式锁或分布式事务。

下面是一些对于程序的优化:

  1. 能异步就一步。
  2. 减少 IO。
  3. 控制锁的力度。
  4. 控制事务的力度。

docker 中使用

sudo wget https://archive.apache.org/dist/rocketmq/4.9.4/rocketmq-all-4.9.4-bin-release.zip

sudo unzip rocketmq-all-4.9.4-bin-release.zip

修改文件
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"


修改配置文件
autoCreateTopicEnable=true
autoCreateSubscriptionGroup=true
# 注册到 NameServer 时候的 IP 地址
brokerIP1=IP

sudo apt-get -y install unzip





启动
nohup sh bin/mqnamesrv &

nohup sh bin/mqbroker -n localhost:9876 -c $ROCKETMQ_HOME/conf/broker.conf &

rocketmq-dashboard.jar --server.port=8001 --rocketmq.config.namesrvAddrs=127.0.0.1:9876

jps -l


https://blog.csdn.net/zbj18314469395/article/details/86064849


sudo wget https://download.bell-sw.com/java/8u382+6/bellsoft-jdk8u382+6-linux-aarch64.tar.gz

Spring Boot 中使用

依赖:

<!-- rocketmq client version 4.9.3 -->
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.2</version>
</dependency>

在 application.properties 中添加配置:

# 配置服务地址
# 消费者 只需要配置这个就行了,不用配置别的。
rocketmq.name-server=172.16.70.130:9876;192.168.0.2:9876

############################# 生产者 #############################

# 注意:由数字、字母、下划线、横杠(-)、竖线(|)或百分号组成;不能为空;长度不能超过 255。
rocketmq.producer.group=greetings-producer-group
# 命名空间
rocketmq.producer.namespace=greetings-producer-namespace

# 同步模式下,消息重新发送的最大次数,超过这个次数后会返回失败。
# 默认值:2,即:默认情况下一条消息最多会被投递 3 次;有可能会导致消息重复。
rocketmq.producer.retryTimesWhenSendFailed=2

# 和同步模式的作用一样,只不过这个是在异步模式下有用
rocketmq.producer.retryTimesWhenSendAsyncFailed=2

在 Java 中使用:

// 生产者使用
@Autowired
private RocketMQTemplate rocketMQTemplate;


// 消费者使用
// 下面的 MessageExt 就是 greetings-topic 主题的消息类型。
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "greetings-topic", consumerGroup = "simple-group")
public class SimpleConsumer implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {}
}

下面是 RocketMQMessageListener 注解的配置:

@RocketMQMessageListener(
  // 命名空间
  namespace = "",
  
  // 必须指定一个消费者分组
  // 消费者组的消费者实例必须订阅完全相同的 Topic
  consumerGroup = "simple-group",
  
  // 主题名称
  topic = "greetings-topic",
  
  // 过滤表达式,只接收满足表达式的消息
  // 也可以写成 TAGA || TAGB
  selectorExpression = "*",

  // 消费模式
  //   ConsumeMode.CONCURRENTLY: 并发处理(默认值)
  //   ConsumeMode.ORDERLY: 顺序处理
  // 注意:ConsumeMode.ORDERLY 不能和 MessageModel.BROADCASTING 一起使用
  consumeMode = ConsumeMode.CONCURRENTLY,
  
  // 消息模式
  // MessageModel.CLUSTERING: 集群模式,  topic 中的一条消息,只会被一个消费组内的 某一个 消费者处理。
  // MessageModel.BROADCASTING: 广播模式,topic 中的一条消息,会被一个消费组内的 所有 消费者处理。
  messageModel = MessageModel.CLUSTERING,

  // 消费线程数
  // consumeMode = ConsumeMode.ORDERLY: 最大值是主题中消息队列的数量,
  //                                    也就是说,消息的最大并发消费数量 = 主题中消息队列的数量
  // consumeMode = ConsumeMode.CONCURRENTLY: 消息的最大并发消费数量
  consumeThreadNumber = 20,
    
  // 消费超时时间(以分钟为单位)
  // 超过这个时间后,就会认为消息消费失败,消费者就会重新发送这条消息到服务端
  consumeTimeout = 15L,

  // 重新投递到服务端的最大次数
  // 消费超时(consumeTimeout)后会重新发送到服务端,到达最大次数后会将消息发送到 DLQ 死信队列
  // 并发模式:-1 表示 16
  // 有序模式:-1 表示 Integer.MAX_VALUE
  // 注意: 广播模式下: 消费失败消息会丢弃
  //      集群模式下: 消费失败的消息需要回传给 Broker
  maxReconsumeTimes = -1,
    
  // 并行模式下的消息重试策略
  // -1: 代表不重试,直接扔到死信队列
  // 0:  代表由 Broker 控制重试频率
  // >0: 代表由客户端控制重试频率
  delayLevelWhenNextConsume = 0,
  // 有序模式下重试
  // 暂停拉取的时间间隔,单位是毫秒
  // 最小值为 10,最大值为 30000
  suspendCurrentQueueTimeMillis = 1000,

  // 投递到服务端的超时时间
  replyTimeout = 3000,

  // 是否启用消息追踪
  enableMsgTrace = false,
  // 消息跟踪的主题名
  customizedTraceTopic = "RMQ_SYS_TRACE_TOPIC",
)

参考资料

rocketmq-dashboard

https://rocketmq.apache.org/zh/docs/4.x/consumer/01concept2

https://rocketmq.apache.org/zh/docs/4.x/bestPractice/07subscribe