Kafka-生产者、broker、消费者的调优参数总结

发布时间 2023-10-31 10:28:53作者: 业余砖家

 生产环境下,为了尽可能提升Kafka的整体吞吐量,可以对Kafka的相关配置参数进行调整,以达到提升整体性能的目的。

本文主要从Kafka的不同组件出发,讲解各组件涉及的配置参数和参数含义。

一、生产者producer.properties或者代码中

1acksProducer需要Leader确认的Producer请求的应答数。

1acks = 0: 表示Producer请求立即返回,不需要等待Leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。

2acks = -1: 表示分区Leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为Producer请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。

3acks = 1: 表示Leader副本必须应答此Producer请求并写入消息到本地日志,之后Producer请求被认为成功。如果此时Leader副本应答请求之后挂掉了,消息会丢失。这个方案,提供了不错的持久性保证和吞吐。

2buffer.memory:该参数用于指定Producer端用于缓存消息的缓冲区大小,单位为字节,默认值为:3355443232MB

3compression.type:压缩器,目前支持none(不压缩),gzipsnappylz4

4retriesProducer发送消息失败重试的次数。重试时Producer会重新发送之前由于瞬时原因出现失败的消息。瞬时失败的原因可能包括:元数据信息失效、副本数量不足、超时、位移越界或未知分区等。倘若设置了retries > 0,那么这些情况下Producer会尝试重新发送。

5batch.size:默认值为16KBProducer按照batch进行发送,当batch满了后,Producer会把消息发送出去。

6linger.msProducer是按照batch进行发送的,但是还要看linger.ms的值,默认是0,表示不做停留。为了减少了网络IO,提升整体的性能。建议设置5-100ms

二、Brokerserver.properties

1replica.lag.time.max.msISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值,默认30s

2auto.leader.rebalance.enable:默认是true。自动Leader Partition 平衡。

3leader.imbalance.per.broker.percentage:默认是10%。每个Broker允许的不平衡的Leader的比率。如果每个Broker超过了这个值,控制器会触发Leader的平衡。

4leader.imbalance.check.interval.seconds:默认值300秒。检查Leader负载是否平衡的间隔时间。

5log.segment.bytesKafkalog日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G

6log.index.interval.bytes:默认4KBKafka里面每当写入了4KB大小的日志(.log),然后就往index文件里面记录一个索引。

7log.retention.hoursKafka中数据保存的时间,默认7天。

8log.retention.minutesKafka中数据保存的时间,分钟级别,默认关闭。

9log.retention.msKafka中数据保存的时间,毫秒级别,默认关闭。

10log.retention.check.interval.ms:检查数据是否保存超时的间隔,默认是5分钟。

11log.retention.bytes:默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment

12log.cleanup.policy:默认是delete,表示所有数据启用删除策略;如果设置值为compact,表示所有数据启用压缩策略。

13num.io.threads:默认是8。负责写磁盘的线程数。整个参数值要占总核数的50%

14num.replica.fetchers:副本拉取线程数,这个参数占总核数的50%1/3

15num.network.threads:默认是3。数据传输线程数,这个参数占总核数的50%2/3

16log.flush.interval.messages:强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。

17log.flush.interval.ms:每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

三、消费者consumer.properties或者代码中

1bootstrap.servers:向Kafka集群建立初始连接用到的host/port列表。

2key.deserializervalue.deserializer:指定接收消息的keyvalue的反序列化类型。一定要写全类名。

3group.id:标记消费者所属的消费者组。

4enable.auto.commit:默认值为true,消费者会自动周期性地向服务器提交偏移量。

5auto.commit.interval.ms:如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s

6auto.offset.reset:当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理?

1earliest:自动重置偏移量到最早的偏移量。

2latest:默认,自动重置偏移量为最新的偏移量。

3none:如果消费组原来的偏移量不存在,则向消费者抛异常。

7offsets.topic.num.partitions__consumer_offsets的分区数,默认是50个分区。

8heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 1/3

9session.timeout.msKafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。

10max.poll.interval.ms:消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。

11fetch.min.bytes:默认1个字节。消费者获取服务器端一批消息最小的字节数。

12fetch.max.wait.ms:默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。

13fetch.max.bytes:默认值: 52428800字节,即50MB。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes broker configor max.message.bytes topic config)影响。

14max.poll.records:一次poll拉取数据返回消息的最大条数,默认500条。

四、总结

本文总结了Kafka参数,包含了ProducerBrokerConsumer的参数,并且给出了调优Kafka的关键参数配置,可以直接用于生产环境。