Rocketmq学习1——Rocketmq架构&消息存储&刷盘机制

发布时间 2024-01-07 23:09:21作者: Cuzzz

系列文章目录和关于我

一丶什么是Rocketmq

RocketMQ是一款开源的分布式消息中间件,由阿里巴巴团队最初开发,并于2016年贡献给Apache软件基金会,后成为Apache顶级项目。RocketMQ设计用于处理高并发、高吞吐量的场景,支持丰富的消息交互模式。

以下是RocketMQ的一些关键特性:

  • 分布式架构:RocketMQ采用分布式集群架构,包含多个Broker服务器和NameServer。NameServer用于维护Broker节点和Topic路由信息,而Broker负责储存和转发消息。
  • 消息可靠性和高性能:RocketMQ提供高可靠性的消息传输保证,如消息持久化、消费者消息确认机制和容错机制。同时,它也注重高吞吐量的性能优化。
  • 多种消息模型支持:RocketMQ支持多种消息传递模式,包括同步发送、异步发送和单向发送,也支持广播和集群消费模式。
  • 顺序消息和延迟消息:RocketMQ支持严格的消息顺序和可配置的延迟消息传递。
  • 事务消息:RocketMQ支持事务性消息,允许在分布式系统中进行事务操作而不丢失消息。
  • 多语言客户端:提供多种语言的客户端,如Java、C++、Go等,方便不同开发环境中的集成。
  • 高度可扩展:RocketMQ通过水平扩展Broker和NameServer节点来支持更大规模的系统。
  • 多租户和多命名空间:RocketMQ支持多租户操作,每个租户可以有独立的命名空间,方便资源隔离和管理。
  • 监控和运维工具:提供了丰富的监控和运维工具,帮助管理和监控集群状态。

RocketMQ适用于大规模的分布式系统,广泛运用于实时消息处理、日志聚合、流数据处理、事务性消息传递等场景。由于其高性能和可靠性,它常被用于电商、金融、物联网、大数据等行业。

二丶部署架构与领域模型

1.架构

image

  • NameServer:独立的命名服务,用于管理Broker服务器,是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName和不同的BrokerId来定义,BrokerId为0 表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
  • Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server获取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。

在rocketmq的架构中,【无状态】是非常巧妙的点,无状态具备诸多优点:

  • 可伸缩性:无状态节点可以轻松地进行水平扩展(Scale Out),因为没有状态信息需要同步。新的节点可以随时添加到系统中,而不需担心现有状态的迁移问题。
  • 负载均衡:由于每个节点之间是相互独立的,负载均衡器可以简单地将请求分配给任何一个节点,无需考虑节点间的状态同步,这样可以更有效地分散负载。
  • 容错性:在无状态架构中,若某个节点失败,其他节点可以无缝接管处理请求,因为所有节点都是等效的,没有持久状态的依赖。这简化了故障恢复流程。
  • 简化设计:无状态设计通常更加简单,因为开发者无需管理和同步跨多个节点的状态,降低了处理分布式数据一致性的复杂性。
  • 部署灵活性:无状态服务可以在任何时间被部署在任何机器上,不需要考虑状态信息的迁移和同步问题,使得自动化部署更为简单

2.领域模型

image-20240107164901324

  • Producer:消息生产者,负责创建和发送消息到消息服务器。在RocketMQ中,生产者将消息发送到指定的Topic。
  • Consumer:消息消费者,负责从消息服务器接收消息。消费者可以以推(Push)或拉(Pull)的方式获取消息,并处理这些消息。
  • Topic:消息主题,生产者将消息发布到特定的Topic,而消费者则从Topic订阅消息。Topic是消息分类的逻辑概念,用于区分不同类型或用途的消息。
  • MessageQueue:消息队列,是消息的物理载体。在RocketMQ中,一个Topic可以分成多个Queue,这些Queue位于不同的Broker上,以支持并行处理和负载均衡。
  • Subscription Group:订阅组,是逻辑上的消费者分组。在这个分组内部,消费者实例通常以负载均衡的方式来消费消息

三丶消息队列的使用场景

消息队列(Message Queue,MQ)是一种在消息的传输过程中保存消息的容器,它被广泛应用于系统解耦、异步消息、流量削峰等场景。以下是一些常见的消息队列使用场景和用途:

  • 异步处理:当前端系统提交任务后,不需要同步等待任务完成,而是通过消息队列异步处理,提高了系统的响应速度。
  • 系统解耦:在微服务或分布式架构中,各服务之间可以通过消息队列进行通信,而不是直接调用,减少了服务间的依赖性。
  • 流量削峰:在高流量事件如秒杀或促销期间,通过消息队列对请求进行缓存,防止瞬间大流量冲击数据库。
  • 负载均衡:消息队列可以平均分配任务给各个工作节点处理,使得任务处理更加均匀和高效。
  • 日志处理:日志信息可以实时发送到消息队列中,由后台服务进行异步处理,如日志收集、分析和存储。
  • 数据同步:在多个系统或组件之间使用消息队列同步数据,确保数据的一致性。
  • 集成异构系统:消息队列可以作为不同系统或应用之间的中介,实现平台无关性的数据交换。

四丶消息存储机制

image-20240107222440436

上图描述了rocketmq的消息存储机制:

  • CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移 量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为 1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏 移量为1073741824,以此类推。

    在一个broker上面多个topic的消息都使用同一个commitLog进行存储,这样做的好处是:

    • 顺序写盘:由于所有消息都追加到同一个文件,RocketMQ的消息写入操作可以充分利用顺序IO的优势,这比随机写入具有更高的效率。并且不会出现多个topic抢占io资源的情况
    • 简化设计:这种全局的CommitLog设计简化了消息存储的复杂性,开发者无需为每个Topic管理独立的文件或文件集。
    • 易于扩展:共用一个CommitLog文件使得在消息量增长时,RocketMQ能够方便地通过添加更多的Broker节点来扩展系统的消息写入能力,而不需要对每个Topic进行单独的调整。

    然而,这种设计也意味着,当消费者需要读取特定Topic的消息时,不能直接从CommitLog读取,因为CommitLog中包含了来自所有Topic的消息。为了解决这个问题,RocketMQ引入了另外两个重要的组件:ConsumerQueue和IndexFile。

  • ConsumerQueue:

    RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行 如果要遍历commitlog文件根据topic检索消息是非常低效。 Consumer即可根据ConsumeQueue来查找待消费的消息。 其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引:

    1. 保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset
    2. 消息大小size
    3. 消息Tag的HashCode值。

    并且ConsumerQueue中存储的内容是定长的,image-20240107213900491

    这样设计的好处是,当消息消费者拉取消息的时候,broker可用使用内存映射的方式将文件映射到内存中的某一个区域,避免内核空间和用户空间的来回拷贝。

    另外ConsumerQueue中存储了消息tag的hash值,消费者订阅消息的时候可指定tag(如:tagA || tagB)broker会根据这些条件,对ConsumerQueue中的内容进行过滤,然后再将commitLog中真正的内容读取返回给消费者,但是hash是存在冲突可能性的,消费者需要根据消息中的tag进一步过滤。这个tag过滤机制减少了网络资源的浪费!

  • IndexFile:索引文件提供了一种可以通过key或时间区间来查询消息的方法。固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故RocketMQ的索引文件其底层实现为hash索引。

    随口一句基于文件的hash索引,看似很简单,但是其实这里还是有很多难点的:例如总不能一口气读完文件到内存序列化成hashMap然后进行索引吧。如下是rocketmq indexFile的设计:

    40 Byte 的Header用于保存一些总的统计信息,4*500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。20*2000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。

    这样的设计在进行索引的时候可用先根据hash取模得到Slot内容,然后根据slot得到对应链表的偏移,然后在索引中每一个元素都记录上一个元素的偏移,从而实现遍历。

    这个过程并不需要读取所有文件内容到内存,只需使用内存映射MappedByteBuffer按需读取即可!

五丶刷盘机制

不只是rocketmq ,很多中间件都有同步刷盘和异步刷盘

RocketMQ 的刷盘机制是确保消息持久化,以避免进程崩溃或系统故障导致数据丢失的重要手段。RocketMQ 提供了两种刷盘模式:

  • 同步刷盘(SYNC_FLUSH):

    在这种模式下,每当生产者发送消息并得到Broker的响应之前,消息都会被立即写入磁盘(CommitLog文件)。这提供了最高级别的持久化保证,但牺牲了一定的吞吐量与延迟性能。具体过程如下:

    1. 消息被发送到Broker。
    2. Broker将消息追加到内存中的CommitLog(预写日志缓冲区)。
    3. Broker会等待直到CommitLog数据被实际写入磁盘(fileChannel.force(true))。
    4. 消息成功写入磁盘后,Broker给生产者发送确认响应。
    5. 这种模式下,如果在消息确认写入磁盘前系统崩溃,消息不会丢失。
  • 异步刷盘(ASYNC_FLUSH):

    在这种模式下,消息先写入内存映射文件(MappedByteBuffer),然后Broker会在未来的某个时间点异步将数据刷写到磁盘。异步刷盘模式提供了更好的性能和吞吐量,但在Broker进程崩溃或系统故障时,可能会丢失最近写入的一些消息。具体过程如下:

    1. 消息被发送到Broker。
    2. Broker将消息追加到内存中的CommitLog(内存映射文件区)。
    3. Broker直接给生产者发送确认响应(不等待数据实际写入磁盘)。
    4. 内存中的数据将定期通过另外的刷盘线程或操作系统的页缓存机制异步写入磁盘。
    5. 在这种模式下,如果Broker崩溃,那么还未被刷写到磁盘的消息可能会丢失。

在刷盘策略的选择上,需根据具体业务的数据持久化要求和性能需求进行权衡。如果对数据安全性要求极高,可以选择同步刷盘,如果对性能有较高要求,可以选择异步刷盘。RocketMQ 默认使用的是异步刷盘模式。