Understanding Kafka in Depth (2): Consumers

Published 2023-09-13 23:56:21  Author: Stitches

1. Client Development

The steps a typical consumer goes through:

  • Configure the consumer client parameters and create a consumer instance;
  • Subscribe to topics;
  • Poll messages and consume them;
  • Commit the consumption offsets;
  • Close the consumer instance.

Kafka consumer example

public class KafkaConsumerAnalysis {
    public static final String brokerList = "139.159.147.192:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);
    // logger used in the catch block below (e.g., an SLF4J Logger)
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerAnalysis.class);

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("client.id", "consumer.client.id.demo");
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // subscribe to the specified topic
        consumer.subscribe(Arrays.asList(topic));

        try {
            while (isRunning.get()) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic()
                            + ", partition = " + record.partition()
                            + ", offset = " + record.offset());
                    System.out.println("key = " + record.key()
                            + ", value = " + record.value());
                    //do something to process record.
                }
            }
        } catch (Exception e) {
            log.error("occur exception ", e);
        } finally {
            consumer.close();
        }
    }
}

The consumer subscribes to topics via the subscribe() method, which has several overloads:

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener);

public void subscribe(Collection<String> topics);

public void subscribe(Pattern pattern, ConsumerRebalanceListener listener);

public void subscribe(Pattern pattern);

A consumer can also be assigned specific partitions of certain topics directly. Before doing so, the partition list of a topic can be queried with the partitionsFor() method, as shown below:

// assign specific partitions of certain topics to this consumer
public void assign(Collection<TopicPartition> partitions);

// example usage
List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
if (partitionInfos != null) {
    for (PartitionInfo tpInfo : partitionInfos) {
        partitions.add(new TopicPartition(tpInfo.topic(), tpInfo.partition()));
    }
}
consumer.assign(partitions);

2. Consumer Deserialization

Kafka provides ByteBufferDeserializer, ByteArrayDeserializer, BytesDeserializer, DoubleDeserializer, FloatDeserializer, IntegerDeserializer, LongDeserializer, ShortDeserializer, StringDeserializer, and so on, covering the basic types. All of these deserializers implement the Deserializer interface:

public interface Deserializer<T> extends Closeable {

    /**
     * Configure this class.
     * @param configs configs in key/value pairs
     * @param isKey whether is for key or value
     */
    void configure(Map<String, ?> configs, boolean isKey);

    /**
     * Deserialize a record value from a byte array into a value or object.
     * @param topic topic associated with the data
     * @param data serialized bytes; may be null; implementations are recommended to handle null by returning a value or null rather than throwing an exception.
     * @return deserialized typed data; may be null
     */
    T deserialize(String topic, byte[] data);

    @Override
    void close();
}

The source of StringDeserializer:

public class StringDeserializer implements Deserializer<String> {
    private String encoding = "UTF8";

    /**
     * configure the encoding; distinguishes whether the key or the value deserializer is being configured
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("deserializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    /**
     * deserialize the message bytes into a String
     */
    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

    // close and release resources
    @Override
    public void close() {
        // nothing to do
    }
}

You can also implement a custom deserializer. For example, the producer article earlier implemented a serializer for the Company class; the matching deserializer for Company is shown below:

public class CompanyDeserializer implements Deserializer<Company> {
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    /**
     * wrap the data array in a ByteBuffer and parse the fields
     */
    public Company deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < 8) {
            throw new SerializationException("Size of data received " +
                    "by CompanyDeserializer is shorter than expected!");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen, addressLen;
        String name, address;

        nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        addressLen = buffer.getInt();
        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressBytes);

        try {
            name = new String(nameBytes, "UTF-8");
            address = new String(addressBytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error occur when deserializing!");
        }

        return Company.builder().name(name).address(address).build();
    }

    public void close() {
    }
}

3. Message Consumption

Kafka consumption is based on a pull model; at its core is the public ConsumerRecords<K, V> poll(final long timeout) method (newer clients use the Duration overload, as in the examples above) used to fetch messages. Each fetched message is of type ConsumerRecord, the counterpart of the producer's ProducerRecord. The related data structures are as follows:

// consumer record
public class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;

    private final String topic;   // topic name
    private final int partition;  // partition number
    private final long offset;    // offset within the partition
    private final long timestamp; // message timestamp
    private final TimestampType timestampType; // CreateTime / LogAppendTime
    private final int serializedKeySize;       // size of the serialized key
    private final int serializedValueSize;     // size of the serialized value
    private final Headers headers;
    private final K key;
    private final V value;

    private volatile Long checksum;
}

// producer record
public class ProducerRecord<K, V> {

    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}

The ConsumerRecords class provides a records(TopicPartition) method to obtain the messages of a given partition from the fetched batch, for example:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition tp : records.partitions()) {
    for (ConsumerRecord<String, String> record : records.records(tp)) {
        System.out.println(record.partition() + ":" + record.value());
    }
}

Besides fetching by partition, you can also get the messages belonging to a specific topic:

// subscribe to the specified topics
List<String> topicList = Arrays.asList(topic1, topic2);
consumer.subscribe(topicList);

// poll messages and consume them
try {
    while (isRunning.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (String topic : topicList) {
            for (ConsumerRecord<String, String> record : records.records(topic)) {
                System.out.println(record.topic() + ":" + record.value());
            }
        }
    }
} finally {
    consumer.close();
}

Committing Consumption Offsets

Every message in a Kafka partition has a unique offset that identifies its position. The term offset can refer to the position of a message within a partition (the message offset), or to the position a consumer has consumed up to (the consumption offset). Each call to poll() returns the set of messages that have not yet been consumed, so the consumption offset must be persisted after each round; otherwise, when the consumer reconnects it cannot find where to resume and would start consuming from the beginning.

Kafka provides the following two methods to distinguish the committed consumption offset from the current consumption position:

// the current consumption position (the offset of the next record to be fetched)
public long position(TopicPartition partition)

// the committed consumption offset
public OffsetAndMetadata committed(TopicPartition partition)

A demonstration of consumption offsets:

// assign the specified partition of the topic to this consumer
TopicPartition tp = new TopicPartition(topic, 0);
consumer.assign(Arrays.asList(tp));
long lastConsumedOffset = -1;
while (true) {
    ConsumerRecords<String, String> records= consumer.poll(1000);
    if (records.isEmpty()) {
        break;
    }
    // get the records of the assigned partition
    List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
    lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    // commit the consumption offset synchronously
    consumer.commitSync();
}
System.out.println("consumed offset is " + lastConsumedOffset);
OffsetAndMetadata offsetAndMetadata = consumer.committed(tp);
System.out.println("committed offset is " + offsetAndMetadata.offset());
long position = consumer.position(tp);
System.out.println("the offset of the next record is " + position);

// output
// consumed offset is 377
// committed offset is 378
// the offset of the next record is 378

The largest offset the consumer actually consumed in this partition is 377, so the consumed offset is 377; yet the committed consumption offset shows 378 (the position of the next record to fetch). The current consumption position and the committed offset are therefore not necessarily the same, so when is a good time to commit?

Offset commits need careful thought; done at the wrong time they can cause duplicate consumption or message loss.

Suppose the current poll() fetched the message set [x+2, x+7], where x+2 is the offset committed last time, meaning that all messages up to and including x+1 have already been consumed; and suppose the consumer is currently processing position x+5.

  1. Committing the offset right after fetching

    If the consumption offset is committed as soon as the messages are fetched, i.e. x+8, and the consumer happens to crash when it has only consumed up to x+5, then after reconnecting it will fetch from x+8 onward, and the messages between x+5 and x+8 are lost.

  2. Committing only after the whole batch has been consumed

    If the offset is committed only after all the fetched messages have been consumed, and the consumer crashes in between, then after recovery it will start again from x+2 and re-consume the batch, causing duplicate consumption.

For this reason Kafka defaults to automatic offset committing, controlled by the client parameter enable.auto.commit. It does not commit after every single message; instead it commits periodically, with the period set by auto.commit.interval.ms (default 5 seconds). In this default mode, every 5 seconds the largest consumed offset of each partition is committed; the auto-commit is performed inside the poll() logic.
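
For reference, a minimal sketch of the two auto-commit parameters (the values shown here are the defaults):

Properties props = new Properties();
// enable periodic auto-commit (this is already the default)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// auto-commit interval in milliseconds, default 5000 (5 seconds)
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);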

However, Kafka's auto-commit can still lose messages. Consider two threads A and B: thread A keeps polling messages and puts them into a local cache (a BlockingQueue), while thread B takes messages from the cache and does the actual processing (a minimal sketch of this setup follows the list below).

  • Suppose the (y+1)-th poll has just happened, together with the m-th offset commit;
  • all offsets up to x+6 have already been committed, but thread B has only processed up to x+3;
  • if thread B then fails and loses its in-memory data, after it recovers consumption resumes from the position of the m-th commit, so the messages between x+3 and x+6 are lost.
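
A minimal sketch of this two-thread setup (consumer and isRunning are assumed to exist as in the earlier example; the queue capacity and types are illustrative). It makes the loss window visible: auto-commit tracks what thread A has fetched, not what thread B has finished processing.

BlockingQueue<ConsumerRecord<String, String>> cache = new ArrayBlockingQueue<>(1000);

// thread A: keeps polling and fills the local cache; auto-commit tracks what has been fetched
Thread fetchThread = new Thread(() -> {
    while (isRunning.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            cache.offer(record);   // cached but not yet processed (a real cache would block when full)
        }
    }
});

// thread B: takes records from the cache and does the real processing
Thread processThread = new Thread(() -> {
    while (isRunning.get()) {
        try {
            ConsumerRecord<String, String> record = cache.take();
            // process the record; if this thread crashes here, records still in the cache are
            // lost even though their offsets may already have been auto-committed via thread A
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
});

fetchThread.start();
processThread.start();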

Kafka also supports manual offset committing, in both synchronous and asynchronous flavors. With synchronous commits you can process the whole batch and commit once, accumulate records and commit in batches, commit per message, or commit per partition:

public class OffsetCommitSync {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    private static AtomicBoolean running = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    //do some logical processing.
                }
                /**
                 * process each fetched message first, then synchronously commit for the whole batch
                 */
                consumer.commitSync();
            }
        } finally {
            consumer.close();
        }
    }
}

public class OffsetCommitSyncBatch {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    private static AtomicBoolean running = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        final int minBatchSize = 200;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (running.get()) {
            // accumulate the fetched messages, then process and commit them in batches
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                //do some logical processing with buffer.
                consumer.commitSync();
                buffer.clear();
            }
        }
    }
}

It is also possible to commit after every single message, but this costs performance and is rarely used:

public static void main(String[] args) {
    Properties props = initConfig();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));

    try {
        while (running.get()) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords =
                        records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    //do some logical processing.
                    long offset = record.offset();
                    // the outer loop variable 'partition' already identifies the partition being processed
                    consumer.commitSync(Collections.singletonMap(partition,
                            new OffsetAndMetadata(offset + 1)));
                }
            }
        }
    } finally {
        consumer.close();
    }
}

Consuming and committing synchronously at partition granularity, one commit per partition:

try {
    while (running.get()) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords =
                    records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                //do some logical processing.
            }
            // offset of the last message consumed in this partition
            long lastConsumedOffset = partitionRecords
                    .get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition,
                    new OffsetAndMetadata(lastConsumedOffset + 1)));
        }
    }
} finally {
    consumer.close();
}

Kafka also supports asynchronous offset commits. With asynchronous commits the committing thread is not blocked, but a new poll may start before the result of the previous commit has come back. Asynchronous commits perform better, but the committed offset may lag behind.

public void commitAsync() {}

public void commitAsync(OffsetCommitCallback callback) {}

public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {}

These are the three commitAsync() variants; a callback can be invoked once the offsets have been committed. In the example below the offsets are committed asynchronously and the callback checks whether an exception occurred. If an asynchronous commit fails, a retry mechanism can be introduced: keep a monotonically increasing sequence number to preserve the order of asynchronous commits and bump it after each commit; when a failure happens, compare the failed commit's sequence number with the current value. If the failed one is smaller, a later commit with larger offsets has already been issued and no retry is needed; if they are equal, it is safe to retry. Retrying adds complexity to the code, while not retrying raises the chance of duplicate consumption.
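
A minimal sketch of that sequence-number guard, assuming the same consumer/running setup as the surrounding examples (the variable names are illustrative):

// a monotonically increasing sequence number shared by all commits
AtomicLong commitSequence = new AtomicLong(0);

while (running.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // do some logical processing.
    }
    final long seqAtCommit = commitSequence.incrementAndGet();
    consumer.commitAsync((offsets, exception) -> {
        if (exception == null) {
            return;                                 // commit succeeded
        }
        if (seqAtCommit == commitSequence.get()) {
            // no newer commit was issued after this one, so retrying cannot move offsets backwards
            consumer.commitAsync(offsets, null);
        }
        // otherwise a later commit with larger offsets is already in flight or done: skip the retry
    });
}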

If the consumer exits abnormally, duplicate consumption is hard to avoid because the offset could not be committed in time. If the consumer exits normally, or a rebalance occurs, a final synchronous commit can act as the last line of defense.

public class OffsetCommitAsyncCallback {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    private static AtomicBoolean running = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));

        try {
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    //do some logical processing.
                }
                consumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
                                           Exception exception) {
                        if (exception == null) {
                            System.out.println(offsets);
                        } else {
                            log.error("fail to commit offsets {}", offsets, exception);
                        }
                    }
                });
            }
        } finally {
            consumer.close();
        }
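        // Note: the loop above and the block below are two separate illustrations combined
        // into one main(); the pattern below pairs asynchronous commits during normal
        // processing with one final synchronous commit before closing.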

        try {
            while (running.get()) {
                //poll records and do some logical processing.
                consumer.commitAsync();
            }
        } finally {
            try {
                // before exiting, make one final synchronous commit as the last safeguard
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}

Controlling or Shutting Down Consumption

Sometimes you need to pause consumption of certain partitions and consume other partitions first, resuming the paused ones once some condition is met. This can be done with the pause() and resume() methods:

public void pause(Collection<TopicPartition> partitions)

public void resume(Collection<TopicPartition> partitions)
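
A small sketch of how they might be used (the pausing condition is an illustrative assumption; consumer is assumed to exist as above):

Set<TopicPartition> assignment = consumer.assignment();

// pause all currently assigned partitions, e.g. while the downstream system is overloaded
consumer.pause(assignment);

// poll() keeps the consumer alive in the group but returns no records for paused partitions
ConsumerRecords<String, String> empty = consumer.poll(Duration.ofMillis(100));

// query which partitions are currently paused
System.out.println("paused: " + consumer.paused());

// resume them once the backlog has been worked off
consumer.resume(assignment);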

There are two ways to exit the consumption loop gracefully:

  1. Set the global AtomicBoolean flag isRunning to false to break out of the while loop;
  2. Call KafkaConsumer.wakeup(), which makes poll() throw a WakeupException; the exception can be ignored and used only as a signal to break out of the loop, as in the skeleton below.
consumer.subscribe(Arrays.asList(topic));
try {
    while (running.get()) {
        // consumer.poll(xxx)
        // process the record
        // commit offset
    }
} catch (WakeupException e) {
    // ignore the error
} catch (Exception e) {
    // do some logic process
} finally {
    // maybe commit offset
    consumer.close();
}
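
For completeness, a minimal sketch of how another thread might trigger that exit (wakeup() is safe to call from a different thread; the shutdown-hook placement is an illustrative assumption):

// e.g. register a JVM shutdown hook in the main thread after creating the consumer
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    running.set(false);     // let the while loop exit on its next iteration
    consumer.wakeup();      // interrupt a blocking poll() by making it throw WakeupException
}));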

Consuming from a Specified Offset

The previous section covered committing consumption offsets; it is exactly this persistence of offsets that lets a consumer continue where it left off after a restart, avoiding duplicate consumption or message loss. When a consumer cannot find a committed offset, or the offset is out of range, the client parameter auto.offset.reset decides where to start consuming; the value can be latest, earliest, or none, and defaults to latest.

A consumer can use seek(TopicPartition partition, long offset) to specify the position within a partition to start consuming from, but poll() must be called first so the consumer knows which partitions it has been assigned.

Each consumer is assigned a fixed set of partitions; before seeking, the assignment must be determined, which is what the assignment() method returns.

public class SeekDemo {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        consumer.poll(Duration.ofMillis(2000));

        // get the set of partitions assigned to this consumer
        Set<TopicPartition> assignment = consumer.assignment();
        System.out.println(assignment);
        for (TopicPartition tp : assignment) {
            consumer.seek(tp, 10);
        }
        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(1000));
            //consume the record.
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ":" + record.value());
            }
        }
    }
}

If the consumers in a group can find committed offsets at startup, auto.offset.reset does not take effect (unless an out-of-range offset occurs). In that case, to start consuming from the beginning or the end of a partition, seek() must be used:

// use seek() to start consuming from the end of each partition
public class SeekToEnd {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe to the specified topic
        consumer.subscribe(Arrays.asList(topic));
        Set<TopicPartition> assignment = new HashSet<>();
        // poll until the consumer has been assigned its partitions
        while (assignment.size() == 0) {
            consumer.poll(Duration.ofMillis(100));
            assignment = consumer.assignment();
        }
        // get the end offset of each assigned partition (the offset of the next message to be written)
        Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);
        for (TopicPartition tp : assignment) {
            // seek to the end of each partition
            consumer.seek(tp, offsets.get(tp));
        }
        System.out.println(assignment);
        System.out.println(offsets);

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(1000));
            //consume the record.
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ":" + record.value());
            }
        }
    }
}

// use seek() to start consuming from the beginning of each partition
public class SeekToBeginning {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe to the specified topic
        consumer.subscribe(Arrays.asList(topic));
        Set<TopicPartition> assignment = new HashSet<>();
        // poll until the consumer has been assigned its partitions
        while (assignment.size() == 0) {
            consumer.poll(Duration.ofMillis(100));
            assignment = consumer.assignment();
        }
        // get the beginning offset of each assigned partition
        Map<TopicPartition, Long> offsets = consumer.beginningOffsets(assignment);
        for (TopicPartition tp : assignment) {
            // seek to the beginning of each partition
            consumer.seek(tp, offsets.get(tp));
        }
        System.out.println(assignment);
        System.out.println(offsets);

        while (true) {
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(1000));
            //consume the record.
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.offset() + ":" + record.value());
            }
        }
    }
}

In some scenarios we do not know the exact offset to start from, only a related point in time, for example "consume the messages produced after 8 o'clock yesterday". In that case the timestamp must first be translated into offsets with offsetsForTimes(Map<TopicPartition, Long>), for example:

// build the per-partition timestamp to search for (here: one day ago)
Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition tp : assignment) {
    timestampToSearch.put(tp, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// look up the offsets closest to those timestamps
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
for (TopicPartition tp : assignment) {
    OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
    if (offsetAndTimestamp != null) {
        // seek to the offset found for this partition
        consumer.seek(tp, offsetAndTimestamp.offset());
    }
} 

Consumption offsets can also be stored in a database and read back as the starting point for the next run:

consumer.subscribe(Arrays.asList(topic));
// the poll() and assignment logic shown earlier is omitted here
for (TopicPartition tp : assignment) {
    long offset = getOffsetFromDB(tp);  // read the consumption offset from the DB
    consumer.seek(tp, offset);
}
while(true){
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(1000));
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords =
                records.records(partition);
        for (ConsumerRecord<String, String> record : partitionRecords) {
            //process the record.
        }
        long lastConsumedOffset = partitionRecords
                .get(partitionRecords.size() - 1).offset();
        // store the consumption offset in the DB
        storeOffsetToDB(partition, lastConsumedOffset+1);
    }
}

Rebalancing

A rebalance transfers ownership of partitions from one consumer to another. It is what gives a consumer group high availability and scalability, letting us safely add consumers to or remove consumers from a group. However, while a rebalance is in progress the consumer group is unavailable, and when a partition is reassigned to another consumer the previous consumer's state is lost: if it had not yet committed its consumption offset before the transfer, the messages will be consumed again afterwards. This situation should be avoided as much as possible.

There are two ways to mitigate duplicate consumption around a rebalance:

  1. Record the consumption offset of each partition as records are processed and commit them asynchronously once the batch is done; then, right before the rebalance happens (after the consumer has stopped reading), commit them synchronously:
public class CommitSyncInRebalance {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            // invoked before the rebalance starts, after the consumer has stopped reading messages
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // synchronously commit the offsets recorded so far before the partitions are revoked
                consumer.commitSync(currentOffsets);
            }

            // invoked after partitions have been reassigned, before the consumer starts reading
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                //do nothing.
            }
        });

        try {
            while (isRunning.get()) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    // record the consumption offset of each partition while consuming
                    currentOffsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1));
                }

                // asynchronously commit the recorded offsets once the batch has been processed
                consumer.commitAsync(currentOffsets, null);
            }
        } finally {
            consumer.close();
        }
    }
}
  2. Persist the consumption offsets to a database before the rebalance happens (after the consumer stops reading), and read them back from the DB when partitions are reassigned:
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
    // invoked before the rebalance starts, after the consumer has stopped reading messages
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // store offset in DB
    }

    // invoked after partitions have been reassigned, before the consumer starts reading
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, getOffsetFromDB(tp));
        }
    }
});

4. Consumer Interceptors

Like producer interceptors, consumers have interceptors too. A consumer interceptor implements the ConsumerInterceptor interface, which contains the following methods (plus configure(), inherited from Configurable):

// invoked just before poll() returns; can be used to modify the records or filter them by some rule
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

// invoked when consumption offsets are committed; can be used to record the committed offset information
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

public void close();

A consumer interceptor can be used to implement a simple message TTL: the custom ConsumerInterceptorTTL below uses each record's timestamp field to decide whether it has expired; if a record is more than 10 seconds older than the current time it is judged expired and is not delivered to the consumer:

public class ConsumerInterceptorTTL implements
        ConsumerInterceptor<String, String> {
    private static final long EXPIRE_INTERVAL = 10 * 1000;

    @Override
    public ConsumerRecords<String, String> onConsume(
            ConsumerRecords<String, String> records) {
        System.out.println("before:" + records);
        // current time
        long now = System.currentTimeMillis();
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords
                = new HashMap<>();

        // iterate over the partitions present in this batch
        for (TopicPartition tp : records.partitions()) {
            // the records of this partition
            List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
            List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
            // keep only the records whose timestamp is within the TTL
            for (ConsumerRecord<String, String> record : tpRecords) {
                if (now - record.timestamp() < EXPIRE_INTERVAL) {
                    newTpRecords.add(record);
                }
            }
            if (!newTpRecords.isEmpty()) {
                newRecords.put(tp, newTpRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp, offset) ->
                System.out.println(tp + ":" + offset.offset()));
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

To use it, simply configure the consumer's ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG property.
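
For example, assuming the ConsumerInterceptorTTL class above is on the consumer's classpath:

props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
        ConsumerInterceptorTTL.class.getName());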