kafka幂等性与重复消费

发布时间 2023-03-31 09:51:47作者: Vayne_Chen

1、Kafka生产者幂等性

1)Kafka 消息交付可靠性保障:

  • Kafka 默认是:至少一次
  • 最多一次 (at most once) : 消息可能会丢失,但绝不会被重复发送
  • 至少一次 (at least once) : 消息不会丢失,但有可能被重复发送
  • 精确一次 (exactly once) : 消息不会丢失,也不会被重复发送

2)Kafka实现幂等性

  • 幂等性 (Idempotence) :

    enable.idempotence = true 开启幂等性

    ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的

    SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。

    由上面的 ProducerID、SequenceNumber、再加上分区号就可以是实现单个分区里面的消息幂等性,但是这里有两个缺点,ProducerID 在消费者重启的时候可能不一致、只实现了单分区内的幂等    

    性。

  • 事务 (Transaction) :

    为了解决上面的两个问题,就要引入kafka事务,开启事务,必须开启幂等性,事务如下图:

    

 

    设置全局唯一的事务ID --- TransactionID;事务ID与PID绑定,当producer重启后,会根据事务ID查找PID,因此能够保证全局at-exactly-once语义

    kafka的事务跟我们常见数据库事务概念差不多,也是提供经典的ACID,即原子(Atomicity)、一致性 (Consistency)、隔离性 (Isolation) 和持久性 (Durability)

    事务Producer保证消息写入分区的原子性,即这批消息要么全部写入成功,要么全失败。此外,Producer重启回来后,kafka依然保证它们发送消息的精确一次处理。

    这里需要提一下:事务ID是由用户指定的,而PID是生产者创建成功后,producer向kafka申请的

 

 2、kafka重复消费

1)重复消费的原因

  1. 消费者宕机、重启或者被强行kill进程,导致消费者消费的offset没有提交。
  2. 设置enable.auto.commit为true,如果在关闭消费者进程之前,取消了消费者的订阅,则有可能部分offset没提交,下次重启会重复消费。
  3. 消费后的数据,当offset还没有提交时,Partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout.ms时间,那么就会触发reblance重平衡,此时可能存在消费者offset没提交,会导致重平衡后重复消费。
  4. commit offset的保存时间offsets.retention.minutes只有1天,而消息log的保存时间log.retention.hours有7天,如果consumer是手动commit,当长时间没有新消息可以消费,也就长时间没有commit,造成commit offset被broker删除。之后一旦consumer重启,初始化时发现commit offset已经被删除,取到了0去fetch,必定会超出broker的留存消息范围,触发consumer的reset。如果reset=earliest 就会从留存的7天内的最小位消息开始消费,造成大量的重复消费。如果reset=latest 就会从最新消息开始消费,造成会丢失重启期间的消息。

2)处理方案

  1. 前面1、2、3中发生的几率发生概率比较小,但也是可能不完全的消除的,所以最好的办法是在业务端进行幂等性处理
  2. 第4点的话就是配置的问题了,也是比较容易出现,但也是排查比较隐晦的问题,数据全部被消费过, 只是正常的过期删除,所以这并没有任何问题,也不会发生reset,正常情况下,commit offset保存时间可以配置成消息log保存时间的2倍,如果log.retention.hours 仍然为 7 days,那 offsets.retention.minutes 可以配置成 14 days。习惯上把消息log配置保存 3 days,offsets配置保存 6 days