Kafka 万字精讲|工作五年这些你都知道吗?

发布时间 2024-01-11 09:46:23作者: 白泽talk

前言

本文以 Kafka 官方文档 的内容为基石,结合参考文献处文章和笔者自身实践凝练而成,涵盖内容全面,详略得当。

这也是《一文搞懂》系列的第一篇技术长文,期待您的关注。

一、Kafka 简介

一个十分钟的视频带你了解 Kafka

image-20240109164254021

Apache Kafka 是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

image-20240109160136265

1.1 事件流平台

  • 什么是事件流

    1. 从技术上讲,事件流是以事件流的形式从数据库、传感器、移动设备、云服务和软件应用程序等事件源实时捕获数据的实践;持久存储这些事件流以供以后检索;实时和回顾性地操作、处理事件流并对其做出反应;并根据需要将事件流路由到不同的目标技术。

    2. 因此,事件流可确保数据的连续流动和解释,以便正确的信息在正确的时间出现在正确的地点。

  • 事件流可以做什么

    1. 实时处理支付和金融交易,例如在证券交易所、银行和保险中。
    2. 连接、存储并提供公司不同部门生成的数据。
    3. 作为数据平台、事件驱动架构和微服务的基础。
    4. 收集客户互动和订单并立即做出反应,等等。
  • Apache Kafka 作为一个事件流平台有什么功能:

    1. 发布(写入)和订阅(读取)事件流,包括从其他系统持续导入/导出数据。
    2. 根据需要持久可靠地存储事件流
    3. 在事件发生时或回顾性地处理事件流。

    ? 所有这些功能都是以分布式、高度可扩展、弹性、容错和安全的方式提供的。Kafka 可以部署在裸机硬件、虚拟机和容器上,也可以部署在本地和云端。您可以选择自行管理 Kafka 环境,也可以选择使用各种供应商提供的完全托管服务。

1.2 Kafka 主要概念和术语

Message(消息):作为一个事件流平台,Kafka 中的元数据或者说一条消息(事件)的示例结构如下:

  • Event key(键): "Alice"
  • Event value(值): "Made a payment of $200 to Bob"
  • Event timestamp(时间戳): "Jun. 25, 2020 at 2:06 p.m."

Broker:Kafka 集群中,一个 Kafka 服务器就是一个 Broker。

Topic(主题):用于存储不同类别的消息,是逻辑概念(Kafka 消息数据根据某 Topic 的Partition 区分,分别存储在各个磁盘上)。

Partition(分区):Topic 是分区的,这意味着一个 Topic 分布在位于不同 Kafka 代理上的多个“Partition 桶”中。当新事件发布到 Topic 时,它实际上会附加到 Topic 的 Partition 之一,这确保了同一个桶中的事件是有序的。

Replication:每个 Partition 分区可以有多个副本,分布在不同的 Broker 上。

Leader:每个 Partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 Partition

Follower:Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有Follower,Follower 与 Leader 保持数据同步。如果 Leader 失效,则从 Follower 中选举出一个新的Leader。

Producer(生产者):将事件发布(写入)到 Kafka 的客户端应用程序,根据某种规则向 Kafka 的一个 Topic 的特定分区的 Leader 副本发布消息。

image-20240109173313099

Consumer(消费者):订阅(读取和处理)这些事件的客户端应用程序,消费或订阅某个 Topic 主题里的消息。消费者只需订阅 Topic,无需关注 Kafka 集群内实例对 Partition 的分配。

Consumer Group(消费者组)

  • 每个 Consumer 属于一个特定的 Consumer Group,多个 Consumer 可以属于同一个 Consumer Group。
  • kafka 消息存放在 Partition 的日志中,同一条消息可以被不同消费者组消费。
  • 但在同一个消费者组中,Partition 内的消息只由组内的某一个消费者实例绑定消费。(⚠️ 那消费者实例没有宕机情况下,为什么还会出现重复消费呢?下文讲解分区同步的时候你就明白了
  • 当消费者组中消费者实例初次连接 Kafka 时,分配 Partition,Consumer 向 Kafka 发送心跳检测,后续超过心跳周期(session.timeout.ms)将默认离线。会触发 rebalance,将该 Partition 重新分配给消费者组中的其他消费者实例。

为了使数据具有容错性和高可用性,下图更清晰展示了多个生产者和多个消费者组同时工作的结构图。

image-20240109172548867

1.3 Zookeeper

在绝大多数搭建 Kafka 集群的情况下,都会选择 Zookeeper 维护 Kafka 集群的状态,然而,从 Kafka 2.8.0版本开始,Kafka 引入了 KRaft(Kafka Raft)协议,作为对Zookeeper 的替代方案,但这不在本文的考虑范畴,故提前说明。

image-20240109220408844

Zookeeper 承担的主要功能

  • 协调 Kafka 的正常运行,Kafka 将元数据信息保存在 ZooKeeper 中,但发送给 Topic 本身的消息数据并不存储在 ZK 中,而是存储在磁盘文件的 Partition 分区日志中。
  • 元数据信息包括:Kafka 有多少个节点、有哪些主题、主题叫什么、有哪些分区、备份信息等。

二、Kafka 集群搭建和使用

? 往期收藏过百的 Docker 学习文章

docker | jenkins 实现自动化部署项目,后端躺着把运维的钱挣了!(上)

docker | jenkins 自动化CI/CD,后端躺着把运维的钱挣了!(下)

2.1 使用 Docker Compose 搭建 Kafka 集群

为了不偏离主题,将重点放在 Kafka 的学习上,这里的示例使用一个容器运行 Zookeeper 负责 Kafka 集群管理,并使用三个 Kafka 容器模拟独立服务器构建集群。(文末的参考文献中罗列了 Zookeeper 集群搭建文章,有需要自行查看

version: '3'
services:
  zookeeper_01:
    image: wurstmeister/zookeeper:3.4.6
    volumes:
      - ./zookeeper_data:/opt/zookeeper-3.4.6/data
    container_name: zookeeper_01
    ports:
      - "12181:2181"
    restart: always

  kafka_02:
    image: wurstmeister/kafka
    container_name: kafka_02
    depends_on:
      - zookeeper_01
    ports:
      - "19092:9092"
    volumes:
      - ./kafka_log_02:/kafka
    environment:
      - KAFKA_BROKER_ID=2
      - KAFKA_LISTENERS=PLAINTEXT://kafka_02:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$IP:19092
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper_01:2181
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M
      - KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS=300000
    restart: always

  kafka_03:
    image: wurstmeister/kafka
    container_name: kafka_03
    depends_on:
      - zookeeper_01
    ports:
      - "19093:9092"
    volumes:
      - ./kafka_log_03:/kafka
    environment:
      - KAFKA_BROKER_ID=3
      - KAFKA_LISTENERS=PLAINTEXT://kafka_03:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$IP:19093
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper_01:2181
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M
      - KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS=300000
    restart: always

  kafka_04:
    image: wurstmeister/kafka
    container_name: kafka_04
    depends_on:
      - zookeeper_01
    ports:
      - "19094:9092"
    volumes:
      - ./kafka_log_04:/kafka
    environment:
      - KAFKA_BROKER_ID=4
      - KAFKA_LISTENERS=PLAINTEXT://kafka_04:9092
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://$IP:19094
      - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper_01:2181
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M
      - KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS=300000
    restart: always

⚠️ Note:将配置文件中 $IP 替换成服务器 IP

运行 docker-compose 命令部署 Kafka 集群

docker-compose up -d

环境变量介绍

  • KAFKA_BROKER_ID:Kafka Broker 的唯一标识符,每个 Broker 在集群中必须有唯一的 ID。

  • KAFKA_LISTENERS:指定 Kafka Broker 监听的地址和端口。在这里,使用了PLAINTEXT协议,表示明文通信。kafka_02是该Kafka Broker的主机名,9092是该 Kafka Broker 监听的容器内部端口。

  • KAFKA_ADVERTISED_LISTENERS:指定 Kafka Broker 对外宣布的地址和端口。这是在集群之外的其他组件连接到 Kafka 时使用的地址。在这里,使用了PLAINTEXT协议,$IP是主机的公共 IP 地址,19092是对外宣布的端口。

  • KAFKA_INTER_BROKER_LISTENER_NAME:指定 Broker 之间通信使用的监听器的名称。在这里,使用了PLAINTEXT协议。

  • KAFKA_ZOOKEEPER_CONNECT:指定 Kafka Broker 连接到 Zookeeper 的地址和端口。在这里,zookeeper_01:2181表示 Zookeeper 的主机名和容器端口。

  • KAFKA_HEAP_OPTS:指定 Kafka Broker 的 Java 虚拟机堆内存选项。在这里,设置了最大堆内存为512MB,初始堆内存为16MB。

  • KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS:Kafka 的默认会话超时时间是6000毫秒(即6秒)。在生产环境中,通常需要将会话超时时间设置得更长一些。

2.2 Kafka 集群的使用

  • 进入集群中任意一个 Kafka 容器的 /bin 目录可以看到各种连接 Kafka 的脚本:

image-20240110143050264

  • Topic 创建:
# 以交互式模式进入容器
docker exec -it 容器id /bin/bash
# 进入Kafka目录
cd /opt/kafka_2.13-2.8.1/
# 创建名为TOPIC_A的主题,3个分区,每个分区1个副本
bin/kafka-topics.sh --create --zookeeper zookeeper_01:2181 --replication-factor 1 --partitions 3 --topic TOPIC_A
# 查看TOPIC_A主题信息
bin/kafka-topics.sh --describe --zookeeper zookeeper_01:2181 --topic TOPIC_A
# 三个Partition分别位于2、3、4三台Broker实例上,同时副本位置和状态信息也可看到
Topic: TOPIC_A TopicId: V565or4ES9-nSFCR4zpcpw PartitionCount: 3 ReplicationFactor: 1    Configs: 
        Topic: TOPIC_A  Partition: 0    Leader: 3       Replicas: 3     Isr: 3
        Topic: TOPIC_A  Partition: 1    Leader: 4       Replicas: 4     Isr: 4
        Topic: TOPIC_A  Partition: 2    Leader: 2       Replicas: 2     Isr: 2
  • 生产与消费消息:
# 进入某个Kafka实例,通过生产者脚本建立连接后输入数据
bin/kafka-console-producer.sh --bootstrap-server kafka_02:9092,kafka_03:9092,kafka_04:9092 --topic TOPIC_A

image-20240110144254164

# 进入某个Kafka实例,通过消费者脚本建立连接后输出数据(当前是消费者脚本运行之后,生产者才逐个输入6个单词,此时消费者端按顺序打印6个字符串)
bin/kafka-console-consumer.sh --bootstrap-server kafka_02:9092,kafka_03:9092,kafka_04:9092 --topic TOPIC_A --from-beginning

image-20240110144315997

2.3 offset 偏移量的提交

Offset(偏移量):用于标识消费者在一个特定分区中已经消费到哪个位置的标识。每个分区都有其独立的偏移量,因此每个消费者都需要跟踪每个分区的偏移量。

Offset 的作用

  1. 消息的顺序: 偏移量可以用于保持消息的顺序。消费者按照消息的偏移量来确保消息的有序性。
  2. 容错和恢复: 消费者通过跟踪偏移量来实现容错和恢复。当消费者组中的某个消费者实例失败并重新启动时,它可以使用先前提交的偏移量来继续消费。
  3. 提交和控制: 消费者可以手动提交偏移量,以便在重新启动时能够从上次提交的位置继续。这也是为了避免重复消费或消息丢失的情况。

--from-beginning 参数:启动消费者时,消费者将从主题的最早消息开始消费,并且不会提交消费的偏移量(offset)。

? 为了印证没有提交 Offset,这里先退出消费者脚本,在生产者脚本端再输入几个字符串

image-20240110150355450

? 重新运行消费者脚本(数据重复消费、乱序)

image-20240110150547752

? 前6个消息被再次消费,证明没有 Offset 没有被提交。

关于乱序的解释

在 Kafka 中,消息是有序的,但是分区之间的消息顺序是不受保证的。每个分区内的消息是按照写入的顺序有序的,但是不同分区之间的消息可能是无序的。

当你启动消费者脚本时,消费者会从各个分区拉取消息,并在本地按照每个分区的顺序进行处理。因此,每个分区内的消息是有序的,但由于从不同分区拉取的消息是交织在一起的,整体上看可能是乱序的。

感兴趣可以关注公众号 「白泽talk」,白泽目前也打算打造一个氛围良好的行业交流群,欢迎加入:622383022。

三、Kafka 高级

3.1 生产者发送数据

3.1.1 发送数据的6个步骤

image-20240110152503378

  1. 生产者查询 Leader: 生产者通过元数据请求(Metadata Request)从 Kafka 集群的某个 Broker 获取关于 Topic、Partition 和 Leader 的元数据信息。这个信息包含了每个 Partition 的 Leader 以及其他副本(Followers)的信息。
  2. 找到 Leader 后发送消息: 生产者向获取到的 Leader 发送消息。Leader 接收到消息后,将消息追加到该 Partition 的本地日志(log)中。
  3. Leader 落盘: Leader 将消息追加到本地日志,并根据配置的刷新策略(flush policy)将消息刷新到磁盘。
  4. Leader 通知 Follower: Leader 将新写入的消息信息通知给该 Partition 的所有 Follower。
  5. Follower 从 Leader 拉取数据: Followers 从 Leader 中拉取新的数据,并将这些数据追加到各自的本地日志中。Follower 会不断地拉取数据以保持与 Leader 同步。
  6. Kafka 向生产者回应 ACK: 生产者发送消息后,可以选择是否等待集群的确认。如果设置为等待确认模式,生产者会等待来自 Leader 的确认(ack)或其他副本的确认。确认表示消息已经被成功写入 Leader 和所有的副本。

3.1.2 生产者分区选择策略

  • 指明 Partition(这里的指明是指第几个分区)的情况下,直接将指明的值作为 Partition 的值
  • 没有指明 Partition 的情况下,但是存在值 Key,此时将 key 的 Hash 值与 Topic 的 Partition 总数进行取余得到 Partition 值
  • 值与 Partition 均无的情况下,第一次调用时随机生成一个整数,后面每次调用在这个整数上自增,将这个值与 Topic 可用的 Partition 总数取余得到 Partition 值,即 round-robin 算法。

3.1.3 消息同步 ISR

在Kafka中,ISR(In-Sync Replicas)是指处于同步状态的副本集合。ISR 包括了那些与 Leader副本保持同步的副本。

当生产者将消息发送到 Kafka 的一个分区时,消息首先被写入 Leader 副本,然后 Leader 副本会将消息同步到 ISR 中的其他副本。只有在 ISR 中的副本都成功地接收并复制了消息之后,Leader 才会向生产者发送确认(acknowledgment)。

ISR 的存在有助于保障数据的可靠性和一致性。如果一个副本不能及时地跟上 Leader 的写入,Kafka 会将其从 ISR 中移除,以确保只有跟上同步的副本才会参与消息的写入。这样可以有效地防止数据的不一致,同时提高系统的可用性。

ISR 机制是 Kafka 的一项关键特性,确保了在节点故障或网络分区的情况下,系统仍然能够保持数据的一致性。

? 示例回顾与分析

image-20240110165219282

这是上文 Kafka 集群创建 TOPIC_A 后查询的结果:

对于TOPIC_APartition 0,Leader 是Broker 3,Replicas 和 In-Sync Replicas (ISR) 也都在Broker 3上。这意味着当你向Partition 0发送消息时,它们将被发送到Broker 3上的Leader。生产者将等待来自 Leader 和 ISR 列表中的副本的确认,然后才将消息视为已确认。

  • 生产者消息重发

image-20240110164439437

如果 Leader 一直没有发送 ack,生产者会不断重试发送消息,直到达到配置的最大重试次数(通过retries配置项指定)。一旦重试次数达到上限,生产者将放弃发送消息,并根据配置中的错误处理策略来处理。

?️ 触发 Leader 重新选举:如果发生 Leader 选举,并且新的 Leader 已经被选举出来,生产者会尝试将消息发送到新的 Leader。这是因为 Leader 选举可能发生在 ISR(In-Sync Replicas)中的某个副本上。生产者会尝试将消息发送到新的 Leader,并等待确认。如果新的 Leader 成功接收并确认消息,那么整个过程就会继续。

生产者的配置中通常包含了一个retry.backoff.ms参数,用于指定在重试发送消息之前等待的时间。这个参数表示在两次重试之间等待的时间间隔,以毫秒为单位。这个时间间隔的设置可以帮助避免在网络瞬时故障或 Leader 选举等情况下过于频繁地重试发送消息。

总之,生产者在发送消息时会进行重试,直到达到最大重试次数,然后根据错误处理策略来处理。

3.1.4 ACK 参数配置

生产者根据确认的方式可以分为三种:

  • acks=0: 生产者不等待确认,直接发送下一条消息。这一操作提供了最低的延迟,Broker 接收到还没有写入磁盘就已经返回,当 Broker 故障时有可能丢失数据
  • acks=1: Leader 在将消息写入本地日志后会向生产者发送确认。如果在 Follower 同步成功之前 Leader 故障,那么将丢失数据。(只是 Leader 落盘

image-20240110170738591

  • acks=all(或acks=-1): Leader 在将消息写入本地日志并等待所有副本确认后才向生产者发送确认。但是如果在 Follower 同步完成后,Broker 发送 Ack 之前,如果 Leader 发生故障,会造成数据重复。(这里的数据重复是因为没有收到,所以继续重发导致的数据重复)

image-20240110170917422

3.2 数据一致性问题

image-20240110171906713

  • LEO(Log End Offset):每个副本最后的一个offset
  • HW(High Watermark):高水位,指代消费者能见到的最大的offset,ISR队列中最小的LEO。

3.2.1 分区故障

  • Follower 分区故障:

    • 当 Follower 发生故障时,它会被临时从 ISR 中提出。在恢复后,Follower 会读取本地磁盘记录的上次的 HW(High Watermark),并将 log 文件高于 HW 的部分截取掉。接着,Follower 从 HW 开始向 Leader 进行同步,等待 Follower 的 LEO(Log End Offset)大于等于该 Partition 的 HW。一旦 Follower 追上 Leader,它就可以重新加入 ISR。
  • Leader 故障:

    • 当 Leader 发生故障时,Kafka 会从 ISR 中选举出一个新的 Leader。为了保证多个副本之间的数据一致性,其余的 Follower 会先将各自的 log 文件高于 HW 的部分截掉,然后从新的 Leader 中同步数据。

一致性和数据不丢失或不重复:

  • ISR 机制确保了在正常情况下,所有的 Follower 副本都能够追赶上 Leader,从而确保了副本之间的数据一致性。然而,ISR 机制并不能解决所有可能的问题,比如数据不丢失或者不重复。
  • 在极端情况下,如果 Leader 在写入数据后发生故障,而所有的 Follower 也同时发生故障,那么可能会导致部分数据的丢失。Kafka 强调的是在正常运行的情况下提供高吞吐和容错性,而不是对所有极端情况都提供强一致性保证。

3.2.2 At Most Once

  • 特性: 这种语义保证消息最多被传递一次,但不保证一定被传递。
  • 适用场景: 适用于对消息重复出现非常敏感,但对消息丢失的容忍度较高的场景。在这种语义下,如果消息在传递过程中丢失,可能不会被再次传递。

? 将服务器 ACK 级别设置为0,可以保证生产者每条消息只会被发送一次,但是不能保证数据不丢失。

3.2.3 At Least Once

  • 特性: 这种语义保证消息至少被传递一次,但允许重复。
  • 适用场景: 适用于对消息重复出现不太敏感,但要求不丢失消息的场景。在消息系统中,可能会存在网络故障、生产者重试等情况,导致消息可能被传递多次。

? 将服务器的 ACK 级别设置为-1(all),可以保证 Producer 到 Server 之间不会丢失数据,但是不能保证数据不重复。

此时再思考最初提出的问题,当消费者实例与 Partition 绑定消费的时候,重复消费更多是因为 Partition 分区同步数据时,出现 rebalance 导致日志存储了多条相同消息。

3.2.4 Exactly Once

  • 特性: 这种语义既保证消息不丢失,又保证消息不重复,是最高级别的语义。

  • 适用场景: 适用于对消息的完整性和一致性要求非常高的场景。在这种语义下,每条消息被确保只传递一次且不会重复。

? Tips:

0.11版本的 Kafka 之前,只能保证数据不丢失,在下游对数据的重复进行去重操作,多余多个下游应用的情况,则分别进行全局去重,对性能有很大影响。

0.11版本的 Kafka,引入了一项重大特性:幂等性,幂等性指代 Producer 不论向 Server发送了多少次重复数据,Server 端都只会持久化一条数据。幂等性结合 At Least Once 语义就构成了 Kafka 的 Exactly Once 语义。

启用幂等性,即在 Producer 的参数中设置 enable.idempotence=true 即可,Kafka 的幂等性实现实际是将之前的去重操作放在了数据上游来做,开启幂等性的 Producer 在初始化的时候会被分配一个 PID,发往同一个 Partition 的消息会附带 Sequence Number ,而 Broker 端会对<PID,Partition,SeqNumber> 做缓存,当具有相同主键的消息的时候,Broker 只会持久化一条。

但 PID 在重启之后会发生变化,同时不同的 Partition 也具有不同的主键,所以幂等性无法保证跨分区跨会话的 Exactly Once。

3.3 日志文件存储

3.3.1 内存日志段与磁盘持久化日志

针对数据写入,Kafka 的设计充分发挥了内存和磁盘的优势,使 Kafka 在高吞吐量和数据持久性之间取得了良好的平衡。

  • 性能优化: 将消息首先追加到当前的活跃日志段(在内存中的磁盘页缓存中)可以提高写入性能,因为写入内存相对于直接写入磁盘来说是更快的操作。这使得 Kafka 能够更高效地处理大量的写入请求。

  • 顺序写入: 将消息追加到本地的活跃日志段通常是一个顺序写操作,而磁盘上的写操作是相对较慢的。通过先将消息追加到内存中的日志段,Kafka 可以最大程度上进行顺序写入,提高写入性能。

  • 批量处理: Kafka 通常会以批量的方式处理消息,将一批消息一次性追加到磁盘日志中,这也有助于提高写入性能。

  • 持久性: 尽管消息首先被追加到内存中的日志段,但由于这些日志段最终会刷新到磁盘上,数据仍然是持久的。

3.3.2 磁盘日志文件结构

在 Kafka 中,消息只有在它们被成功地写入到磁盘并且在多个副本中得到确认之后才被认为是已提交的。下图是磁盘 Topic 日志文件的逻辑结构:

image-20240110223914523

? 查看一下前文创建的 TOPIC_A 主题在 Brober 2 实例上的日志内容:

image-20240110225419216

生产者不断的向 Log 文件追加消息文件,为了防止 Log 文件过大导致定位效率低下,Kafka 的 Log文件以 1G 为一个分界点,当 .log 文件大小超过 1G 的时候,此时会创建一个新的 .log 文件,同时为了快速定位大文件中消息位置,Kafka 采取了分片和索引的机制来加速定位。

在 kafka 的存储 Log 的地方,即文件的地方,会存在消费的偏移量以及具体的分区信息,分区信息的话主要包括 .index.log 文件组成。

分区目的是为了备份,所以同一个分区存储在不同的 Broker 上,即当 TOPIC_A-2 存在当前机器kafka_02上,实际上通过配置多副本,kafka_03 中也可以有这个分区的文件(副本),副本中有一个是 Leader,其余为 Follower。

如果 .log 文件超出大小,则会产生新的 .log 文件,如下所示:

00000000000000000000.index
00000000000000000000.log
00000000000000000006.index
00000000000000000006.log

3.3.3 快速定位数据

在Kafka中,.index 文件存储了消息的偏移量(offset)和对应物理日志文件的索引信息,用于快速定位消息。通过 .index 文件,你可以找到指定 offset 的消息所在的物理日志文件(.log 文件)。下图第一列是 offset,第二列是物理索引,指向消息具体位置

image-20240110230030815

  • ? .index 文件命名规则:

在 Kafka 中,.index 文件的名称与 offset 有一定的关系。每个 .index 文件通常包含一定范围的消息的索引信息,其中包括消息的 offset。.index 文件的名称通常包含该文件所覆盖的消息范围的开始 offset。

.index 文件的命名规则一般是以索引文件所覆盖的消息范围的第一条消息的 offset 作为基准来命名的。例如,如果一个 .index 文件覆盖了消息 offset 1000 到 2000,那么它的文件名可能是类似于 00000000000100000000.index 的形式。

这种设计方便了 Kafka 在进行消息查找时,通过二分查找快速定位到包含目标 offset 的 .index 文件,然后在该文件中查找具体的消息位置。

  • ? 具体来说,你可以按照以下步骤进行判断:
  1. 通过二分查找 .index 文件: 打开相应的 .index 文件,使用二分查找算法查找目标 offset 所在的索引块。索引块包含了一系列索引项,每个索引项对应一个消息的偏移量。通过比较索引项的偏移量,你可以确定目标 offset 在哪个索引块中。
  2. 获取物理日志文件的名称: 找到了目标 offset 所在的索引块后,可以从索引项中获取对应物理日志文件的名称。通常,物理日志文件的名称与索引文件的名称相似,只是文件扩展名不同(例如,.log 文件)。
  3. 在物理日志文件中查找消息: 打开对应的物理日志文件,按照文件格式,找到目标 offset 对应的消息数据。

3.4 消费者消费数据

3.4.1 消费者概念回顾

Consumer(消费者):订阅(读取和处理)这些事件的客户端应用程序,消费或订阅某个 Topic 主题里的消息。消费者只需订阅 Topic,无需关注 Kafka 集群内实例对 Partition 的分配。

Consumer Group(消费者组)

  • 每个 Consumer 属于一个特定的 Consumer Group,多个 Consumer 可以属于同一个 Consumer Group。
  • kafka 消息存放在 Partition 的日志中,同一条消息可以被不同消费者组消费。
  • 但在同一个消费者组中,Partition 内的消息只由组内的某一个消费者实例绑定消费。
  • 当消费者组中消费者实例初次连接 Kafka 时,分配 Partition,Consumer 向 Kafka 发送心跳检测,后续超过心跳周期(session.timeout.ms)将默认离线。会触发 rebalance,将该 Partition 重新分配给消费者组中的其他消费者实例。

3.4.2 消费者 offset 信息存储

image-20240110235725606

在 Kafka 中,__consumer_offsets 目录是用于存储消费者组(Consumer Group)的 offset 信息的地方。每个消费者组的 offset 信息都会被保存在该目录下对应的分区文件中,文件名的结构通常是 __consumer_offsets-xx,其中 xx 表示消费者组 ID

# 如果没有显式地指定消费者组 (groupId),那么 Kafka 会为你的消费者生成一个随机的消费者组 ID。这个消费者组 ID 会存储在消费者的配置中,并且在不同的启动中可能会发生变化。
bin/kafka-console-consumer.sh --bootstrap-server kafka_02:9092,kafka_03:9092,kafka_04:9092 --topic TOPIC_A --from-beginning

这些文件包含了消费者组在不同分区上的消费进度,以及它们所消费的 Topic 和 Partition 信息。Kafka 使用这些信息来维护消费者组的偏移量,以确保每个消费者在重新加入消费组或者发生重新平衡时能够继续从上次的位置消费消息。

image-20240110235702067

? 在 Kafka 中,__consumer_offsets 主题用于存储消费者组的偏移量信息。每个消费者组的每个分区都有一个专用的偏移量提交日志。消费者将当前所消费的消息的偏移量提交到这个专门的主题,以便 Kafka 可以跟踪每个分区的消费进度。

? 上图 __consumer_offsets-20 目录下的日志文件是空的,是因为该分区的消费者没有提交过偏移量,或者提交的偏移量被清理(根据配置的保留策略)。

? 不同于 __consumer_offsets 目录,TOPIC_A-2 中存储的是实际的消息数据,而不是消费者组的偏移量信息。所以,TOPIC_A-2 目录下的文件会包含该分区中的实际消息,因此是有数据的。

image-20240110225419216

3.4.3 消费消息不等于删除旧消息

即使不断消费消息,提交 offset,历史的 segment 文件仍然会被保留在磁盘上。这是因为 Kafka 的日志分段机制导致了历史数据的保留。老的 segment 文件不会被删除,而是保留在磁盘上,以便后续的查询和检索。

这也就促成了:多个消费者组可以独立消费同一个 Topic 的内容,但是同一个消费者组内只能有一个实例消费某一条消息的功能实现。

这个机制保证了 Kafka 的高吞吐量和持久性。但如果你希望限制 Kafka 的存储大小,你可能需要考虑定期清理旧的数据或者采用其他策略来管理磁盘空间。在实际生产环境中,通常会配置相应的参数和策略来控制日志的大小和保留时间。

四、参考文献

五、小结

本文讲述了 Kafka 所涉及的核心概念,如果能看到这里,你定然收获颇丰。当然本篇文章是偏重理论的,虽然结合了大量的命令行效果和图解,难免有些难以消化,希望值得你一个收藏,多多回顾。

? 思考一个问题:

既然你已经明白 Kafka 的设计实现,知晓 Exactly Once 的概念,那如果上游业务存在重试,你如何避免任务丢失和重复消费呢?或者当多人同时提交同一个消费任务,又只需消费其中一份又该如何处理?

Kafka 系列的下一篇文章将侧重于实践,为你解答这个问题。

感兴趣可以关注公众号 「白泽talk」,白泽目前也打算打造一个氛围良好的行业交流群,欢迎加入:622383022。