Kafka—生产者和消费者的内部结构

发布时间 2023-07-02 20:57:30作者: 二进制狂人

 

生产者

将数据发布到 Kafka 主题的应用程序称为生产者。应用程序集成了一个Kafka 客户端库来写入 Kafka。编写过程从创建 ProducerRecird开始。

 

Kafka Producers 中的组件/流程

  • 拦截器——可以在发送之前改变记录的拦截器,例如Claim-check-interceptor。
  • 生产者元数据——管理生产者所需的元数据:集群中的主题和分区、充当分区领导者的代理节点等。
  • 序列化器——将对象转换为字节数组的键/值序列化器。
  • Partitioner — 计算给定记录的分区。如果 ProducerRecord 中指定了分区,则分区器将返回相同的分区,否则,它将根据分区策略(轮询、哈希键或自定义分区)为消息键选择分区。org.apache.kafka.clients.producer.internals.DefaultPartitioner, org.apache.kafka.clients.producer.RoundRobinPartitioner, org.apache.kafka.clients.producer.UniformStickyPartitioner, org.apache.kafka.clients.producer.Partitioner (Inteface)
  • Record Accumulator - 累积记录并按主题分区将它们分组为批次。一批未发送的记录保存在缓冲存储器中。一个单独的 I/O 线程负责将这些批次的记录作为请求发送到 Kafka 代理。
  • 事务管理器——管理事务并维护必要的状态以确保幂等生产。
  • 通道选择器——创建一个网络客户端来与代理建立通信。

生产者确认设置

Kafka 生产者将数据写入分区的当前领导代理。如果我们希望消息在被视为成功写入之前必须写入最少数量的副本,我们需要设置在被视为成功写入之前acks需要确认收到消息的代理数量。

 

注意:当acks=all使用 a时replication.factor=N,min.insync.replicas=M我们可以容忍N-M代理出于主题可用性的目的而关闭。

生产者重试

 

重试
delivery.timeout.ms
retry.backoff.ms

max.in.flight.requests.per.connection

幂等生产者

重试发送失败消息的重复风险很小。如果数据被复制到 ISR 但确认未到达生产者并因此重试,则可能会发生这种情况。为了避免这种情况,Kafka 使用了不断增加的 PID 序列。Kafka 总是采用成功写入的最大 PID-Sequence Number 组合。当接收到较低的序列号时,将其丢弃。

enable.idempotence=true
acks=all

Kafka 消息压缩

compression.type
none、gzip、lz4、snappy 和 zstd

如果我们使用生产者级压缩,那么我们应该将代理级设置设置为compression.type=producer. 如果生产者级别压缩和代理级别压缩不匹配,代理将解压并再次压缩。

Kafka 生产者批处理

linger.ms — 发送批次前的等待时间
batch.size — 批次中包含的最大字节数

Broker

 

  • 生产者记录落在套接字接收缓冲区上。网络线程之一拾取消息并将其传递到共享请求队列。
  • 记录由 I/O 线程拾取。它验证数据的CRC。然后将记录写入提交日志。
  • I/O 线程将响应逻辑交给 Purgatory map(管理延迟操作的代理)。此映射等待其他代理确认写入 (ISR)。这个映射是用 ConcurrentHashMap 和 ConcurrentLinkedQueue 实现的。
  • 复制消息后,响应将被放入响应队列。
  • 网络线程从队列中拉取响应并将其放入套接字发送缓冲区。

消费者

从 Kafka 主题读取数据的应用程序称为消费者。应用程序集成了一个 Kafka 客户端库来读取 Apache Kafka。消费者从一个或多个分区中读取,并在每个分区内维护排序。Kafka 消费者实现了“拉模型”。这意味着消费者向代理发送获取请求以获取数据。

 

从上图中,我们可以看到 Kafka 消费者中的以下组件。

  • Coordinator - 管理组成员,偏移量
  • 元数据——管理消费者所需的元数据:集群中的主题和分区、充当分区领导者的代理节点等。
  • 网络客户端——处理对代理的连接/请求
  • Fetcher — 从经纪人那里获取成批的记录。
  • 反序列化器——将字节数组转换为对象的键/值反序列化器。
  • 拦截器——可能改变记录的拦截器

消费者的交付语义

enable.auto.commit=true
auto.commit.interval.ms

  • 最多一次:收到消息后立即提交偏移量。如果出现错误,消息可能会丢失。
  • 至少一次:在处理完消息后提交偏移量。可能导致多次读取。确保消息处理是幂等的。
  • 恰好一次:仅在 Kafka → Kafka 与事务一起流动。

其他配置:

fetch.min.bytes
fetch.max.wait.ms

增加 fetch.min.bytes 和时间将导致吞吐量增加,而减少它将导致更好的延迟。