Kafka消费端抛出异常Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group的解决方案

发布时间 2024-01-04 16:41:30作者: 朱季谦

总结/朱季谦

在一次测试Kafka通过consumer.subscribe()指定偏移量Offset消费过程中,因为设置参数不当,出现了一个异常提示——

[2024-01-04 16:06:32.552][ERROR][main][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator|1050][Consumer clientId=consumer-group.id-1, groupId=group.id] Offset commit with offsets {topic-123-0=OffsetAndMetadata{offset=124, leaderEpoch=null, metadata=''}} failed
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

这个异常翻译过来,是“偏移提交不能完成,因为消费者不是自动分区分配的活动组的一部分;这名消费者很可能被踢出了该组合。”

说明出现消费组断开的问题。

出现这个问题,需要关注一个参数properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 500)。

这个ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG是max.poll.interval.ms,表示最大轮询间隔时间,若手动设置为500,意味着消费者在两次连续轮询之间最多只能等待500毫秒。如果超过该最大轮询时间,消费者将被认为已经失去连接,从而触发重新平衡操作,将其分配给其他消费者。

该参数如果设置较小,可能会导致频繁重新平衡,而消费者本身没有问题的情况下,设置过小反而影响频繁导致该消费者无法正常工作,就会抛出以上异常。但是,若设置过大的话,可能导致消费者在长时间无法处理新的记录。

因此,这个参数需要比较合理设置比较好。

同时,还需要关注另外一个参数——

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));

这行代码表示尝试从Kafka的topic中在最多 500 毫秒内从主题中获取的一批记录的对象。

消费者两次连续轮询之间的等待时间,除了跟业务处理有关外,还跟这个拉取条数有关,若一次拉取过多,其轮询时间必然跟着变长。

模拟一下线上拉取代码消费做处理业务逻辑如下——

while (true){
    long start = System.currentTimeMillis();
    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<Integer, String> record : records){
        //模拟处理业务
      	Thread.sleep(10);
        System.out.println("处理业务中");
    }
    long end = System.currentTimeMillis();
    System.out.println("耗时:" + ( end- start) );
    consumer.commitAsync();
}

max.poll.interval.ms设置的大小,应该在Duration.ofMillis(500)基础上,加上其业务处理耗时的时间。

假如该处理逻辑平均耗时为:1151毫秒,那么max.poll.interval.ms应该设置比1151毫秒大,当然,还需考虑一些额外突发耗时情况在内。

反正不能比1151毫秒小,若比1151毫秒小,就会抛出org.apache.kafka.clients.consumer.CommitFailedException异常。

除了调整max.poll.interval.ms比消费逻辑耗时大之外,还可以调整consumer.poll(Duration.ofMillis(500))和max.poll.records,控制每次poll处理耗时降低。