Kafka基础学习笔记

发布时间 2023-10-29 23:20:46作者: 爱文(Iven)

一、Kafka:

1、简介:

Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。最大的特性就是可以实时并高速的处理大量数据来满足需求,同时对消息数据进行持久化存储

2、优点:

Kafka与其他消息队列MQ(如ActiveMQ、RabbitMQ等)相比,有以下几个区别:

特点

说明

磁盘存储

Kafka将所有消息都保存在磁盘上,并使用内存映射文件进行读写。这种存储方式可以支持大量的消息数据,而且数据还可以保留很长时间,比如几个月甚至几年。而其他MQ的存储方式多是基于内存,不适合存储大量的数据

大数据处理

Kafka最初是为大数据处理而设计的,它可以非常高效地处理海量数据,适合用于数据仓库、日志处理、统计分析等场景。其他MQ则更多用于异步通信、任务调度、实时通知等领域。

分布式设计

Kafka是基于Zookeeper协调的分布式处理平台,可以在多个节点之间实现消息的高效传输和处理。其他MQ也支持分布式部署,但Kafka在这方面更加优秀

发布/订阅模式

Kafka采用发布/订阅模式,允许多个消费者同时订阅同一个主题,而且Kafka消费者可以自定义从哪个位置开始消费消息。其他MQ中,消费者一般需要通过消费者组来进行负载均衡,而且其他MQ消费者只能从当前位置开始消费

生态系统

Kafka拥有非常丰富的生态系统,包括Kafka Connect、Kafka Streams等工具和框架,可以方便地与大数据处理平台(如Hadoop、Spark、Flink等)进行集成。其他MQ的生态系统相对较小

注:

多线程方式与消息队列方式的区别:

当需要进行任务处理,并且任务处理之间没有明显的依赖关系时,使用消息队列更适合。

当需要对任务进行精细控制,并且任务处理之间存在明显的依赖关系时,使用多线程更适合。

 

二、Kafka基础架构(主题、分区、副本、消息代理):

专业术语

说明

Broker(消息代理,即交换机)

一台kafka服务器就是一个broker,多个broker构成一个kafka集群。一个broker可以容纳多个topic(主题)。负责读写请求,将所有消息都保存在磁盘上。

Producer(生产者)

消息生产者,就是向kafka broker发消息的客户端。

Topic(主题)

Kafka的消息存储到topic中,topic可以理解为一个队列。

Partition(分区)

Partition即分区,最小的并行单位。一个庞大的topic可以分为多个partition,不同的partition可以分布到 Kafka 集群中的多个broker上,而每个partition是一个有序的队列。kafka只能保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体的顺序。

Kafka的分区分配策略通常采用的是轮询(Round-robin)的方式,即依次将分区分配给可用Broker,以实现负载均衡,有助于提高系统的吞吐量和可扩展性,同时,分布在多个 Broker上partition中的Replica(副本)也提供了数据的冗余备份,以保证消息的可靠性和高可用性。

Replica(副本)

——Leader副本

——Follower副本

一个 topic的每个分区都有若干个副本,一个 Leader 和若干个Follower。Leader副本,即多副本中的“主”,负责处理生产者发送的消息和消费者的请求;其他副本被称为 Follower副本,即多副本中的“从”,它们与 Leader 副本保持同步,并在Leader 副本故障时接替其工作。

Offset(偏移量)

当消息存储在Topic的Partition分区中后,消息就不可变更,kafka会为分区中的每一条消息分配一个Offset(偏移量),这个Offset(偏移量)可以记录每条消息的位置,kafka可以通过Offset来对消息进行提取,但是不能对消息的内容进行检索与查询。Offset(偏移量)在每个Partition(分区)中是唯一且递增的,不同分区之间偏移量可以重复

Record(消息)

Kafka的消息是以key-value键值对方式存储的,如果不指定key(即key值为空),Kafka会以轮询(Round-robin)的方式将消息写入不同的分区中。如果指定key值,那么相同key的消息(通过哈希算法计算是否key相同)会被写到同一个分区中

Consumer(消费者)

消息消费者,向kafka broker取消息的客户端。

ConsumerGroup(消费组)

消费者组,由多个Consumer(消费者)组成。

注:

一个Consumer(消费者)可以消费多个分区;

一个分区可以被多个ConsumerGroup(消费组)里的消费者消费;

但是,一个分区不能同时被同一个ConsumerGroup(消费组)里的多个消费者消费,即Kafka 中的一个分区同一时间只能被同一个消费者组中的一个消费者消费

 

三、Kafka安装部署(基于Docker安装):

1、Docker安装zookeeper:

# 安装镜像
docker pull wurstmeister/zookeeper
#启动容器(定义zookeeper启动名:zookeeper_server)
docker run -d --name zookeeper_server  --restart always  -p 2181:2181  wurstmeister/zookeeper
# 查看端口是否启动成功
netstat -anp |grep 2181

2、Docker安装kafka:

# 安装镜像
docker pull wurstmeister/kafka
#启动容器(定义kafka名:kafka_server)
docker run -d --name kafka_server --restart always --log-driver json-file --log-opt max-size=100m --log-opt max-file=2 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.158.1.130:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.158.1.130:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka
#查看kafka是否运行正常
docker ps -a

相关参数

说明

-e KAFKA_BROKER_ID=0

在kafka集群中,每个kafka都有一个BROKER_ID来区分自己

-e KAFKA_ZOOKEEPER_CONNECT=zookeeper地址:端口/kafka

配置zookeeper管理kafka的路径(不能localhost)

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka地址:端口

把kafka的地址端口注册给zookeeper(不能localhost)

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092

配置kafka的监听端口

-v /etc/localtime:/etc/localtime

容器时间同步虚拟机的时间

3、测试:

# 进入kafka容器
docker exec -it kafka_server /bin/bash
# 进入bin,注意版本号的区别
cd /opt/kafka_2.13-2.8.1/bin/
#创建一个新主题(test)来存储事件
./kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
#显示新主题:test的分区信息
./kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
#测试生产消息
./kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
#另起窗口,测试消费消息
./kafka-console-consumer.sh --topic test  --from-beginning --bootstrap-server localhost:9092

4、参考备注:

安装包方式部署kafka

Docker安装部署kafka

Docker安装部署kafka集群

kafka常见命令集锦

 

四、Kafka实战(基于SpringBoot整合):

1、基础配置信息:

POM依赖:

<!-- 引入kafka依赖 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

YML配置:

spring:
  kafka:
    bootstrap-servers: 192.158.1.130:9092,192.192.158.1.131:9092 # kafka集群信息,多个用逗号间隔
    # 生产者配置
    producer:
      #启用 kafka 事务管理器
      transaction-id-prefix: tx-my-transaction-id
      # 消息发送失败时的最大重试次数,默认值为 0,表示不进行重试。
      # 如果设置为大于 0 的值,当消息发送失败时,Producer 会尝试重新发送消息,直到达到指定的最大重试次数。
      # 但需要注意的是,重试只会发生在未收到服务端响应的情况下,一旦收到响应,无论成功或失败,Producer 都不会再进行重试
      retries: 3
      batch-size: 16384 # 每次批量发送消息的数量,produce积累到一定数据,一次发送批量处理大小,16K
      buffer-memory: 33554432 # produce积累数据一次发送,数据缓冲存储大小,32M
      # 指定了Producer在发送消息后等待的确认数
      # acks=0:当 acks 设置为 0 时,Producer不会等待来自 Broker的任何确认或回应。消息被立即发送到 Broker,但没有任何保证,可能会丢失消息。
      # acks=1:当 acks 设置为 1 时,Producer在成功写入主题分区领导者(Leader)后会收到来自Broker的确认。这种模式下,消息至少会被传递给 Broker 并写入 Leader 分区,但并不能保证消息在备份分区(Replica)中被复制,有一定概率丢失。
      # acks=all:当 acks 设置为 all 或 -1 时,Producer 在写入消息后必须等待 ISR(In-Sync Replica,同步副本集合)中的所有备份分区都收到确认。这是最安全的模式,确保消息不会丢失,但会增加延迟。
      # 通常,推荐将 acks 设置为 1 或 all,具体取决于对可靠性和延迟的要求。如果对延迟敏感,可以使用 acks=1 来降低确认的延迟。如果对数据的完整性和可靠性要求较高,可以使用 acks=all 来确保消息不会丢失。
      acks: all
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #properties:
        # 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
        #linger.ms: 1
        # 自定义分区策略
        #partitioner.class: com.iven.kafkademo.config.CustomizePartitionerConfig
    # 消费者配置
    consumer:
      # 是否自动提交
      #enable-auto-commit: true  # true自动提交消费位移(offset)
      #auto-commit-interval: 1000  # 设置自动提交的时间间隔
      enable-auto-commit: false # false手动提交消费位移(offset)
      # 消费偏移配置:用于指定消费者在启动时或者发生偏移(offset)失效情况下的处理方式
      # none:如果消费者没有有效的消费位移,即没有自动维护偏移量,也没有手动维护偏移量,就抛出异常。这意味着消费者启动时必须有有效的消费位移。
      # earliest:从最早的可用消息开始消费。即在各分区下有提交的offset时:从offset处开始消费;在各分区下无提交的offset时:从头开始消费
      # latest:从最新的消息开始消费。即在各分区下有提交的offset时:从offset处开始消费;在各分区下无提交的offset时:从最新的数据开始消费
      auto-offset-reset: earliest
      # 控制一个poll()调用返回的记录数,即consumer每次批量拉多少条数据。
      #max-poll-records: 10
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 监听器配置
    listener:
      # record:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # batch:当每一批poll()的数据被ListenerConsumer处理之后提交
      # time:当每一批poll()的数据被ListenerConsumer处理之后,距离上次提交时间大于TIME时提交
      # count:当每一批poll()的数据被ListenerConsumer处理之后,被处理record数量大于等于COUNT时提交
      # count_time:TIME或COUNT中有一个条件满足时提交
      # manual:当每一批poll()的数据被ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种
      ack-mode: manual_immediate
      # 消费监听接口监听的主题不存在时,默认会报错
      missing-topics-fatal: false
      # 使用批量消费需要将listener的type设置为batch,该值默认为single
      #type: batch

 

2、指定消费组的简单生产消费:

相关YML配置:

spring:
  kafka:
    bootstrap-servers: 192.158.1.130:9092,192.192.158.1.131:9092 # kafka集群信息,多个用逗号间隔
    # 生产者配置
    producer:
      retries: 3
      batch-size: 16384 # 每次批量发送消息的数量,produce积累到一定数据,一次发送批量处理大小,16K
      buffer-memory: 33554432 # produce积累数据一次发送,数据缓冲存储大小,32M
      acks: all
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消费者配置
    consumer:
      # 是否自动提交
      enable-auto-commit: true  # true自动提交消费位移(offset)
      auto-commit-interval: 1000  # 设置自动提交的时间间隔
      auto-offset-reset: earliest
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 监听器配置
    listener:
      # 消费监听接口监听的主题不存在时,默认会报错
      missing-topics-fatal: false
 

生产者:

    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    /**
     * 指定消费组的简单生产消费
     * */
    @GetMapping("/msg1")
    public void msg1(@RequestParam(value = "msg") String msg) {
        /**
         * 不指定分区,则默认通过hash运算和对分区总数取模来确定分区
         *
         * @Param1: 主题
         * @Param2: 消息体
         * */
        Future<SendResult> send = kafkaTemplate.send("ONE_TOPIC", msg);
        try {
            System.out.println(
                    "主题="+send.get().getRecordMetadata().topic()+ "" +
                    "偏移量="+send.get().getRecordMetadata().offset()+"" +
                    "分区="+send.get().getRecordMetadata().partition());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

消费者:

@Component
public class OneTopicConsumer {
    /**
     * 指定一个消费者组:ONE_TOPIC_TEST,一个主题主题:ONE_TOPIC。
     * 如果没指定分区,生产者发送了多个分区,全都能接收
     * @param record
     *
     * 配置offset自动提交:enable-auto-commit=true
     */
    @KafkaListener(topics = "ONE_TOPIC", groupId = "ONE_TOPIC_TEST")
    public void topicListener1(ConsumerRecord<String, String> record) {
        System.out.println(
          "主题="+record.topic()+ "" +
                "偏移量="+record.offset()+"" +
                "分区="+record.partition()+"" +
                "key="+record.key()+"" +
                "内容="+record.value()+"" +
                "创建消息的时间戳="+record.timestamp()+"");
        System.out.println("消费成功");
    }
}

 

3、消息的发布确认机制:

相关YML配置:

spring:
  kafka:
    bootstrap-servers: 192.158.1.130:9092,192.192.158.1.131:9092 # kafka集群信息,多个用逗号间隔
    # 生产者配置
    producer:
      retries: 3
      batch-size: 16384 # 每次批量发送消息的数量,produce积累到一定数据,一次发送批量处理大小,16K
      buffer-memory: 33554432 # produce积累数据一次发送,数据缓冲存储大小,32M
      acks: all
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消费者配置
    consumer:
      # 是否自动提交
      enable-auto-commit: false # false手动提交消费位移(offset)
      auto-offset-reset: earliest
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 监听器配置
    listener:
      # manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种
      ack-mode: manual_immediate
      # 消费监听接口监听的主题不存在时,默认会报错
      missing-topics-fatal: false

生产者:

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 消息的发布确认机制
     * */
    @GetMapping("/msg2")
    public void msg2(@RequestParam(value = "msg") String msg) {
        /**
         * 不指定分区,则默认通过hash运算和对分区总数取模来确定分区
         * addCallback回调方法:监控消息是否发送成功,失败补偿机制
         *
         * @Param1: 主题
         * @Param2: 消息体
         * */
        kafkaTemplate.send("TWO_TOPIC", msg).addCallback(
                success  -> {
                    System.out.println("发送消息成功:" + msg);
                },
                failure  -> {
                    // 发送失败的回调处理
                    System.err.println("发送消息失败:" + msg);
                    failure .printStackTrace();
                }
        );
    }

消费者:

    /**
     * 指定一个消费者组:TWO_TOPIC_TEST,一个主题主题:TWO_TOPIC。
     * @param record
     *
     * 配置offset手动提交:enable-auto-commit=false
     */
    @KafkaListener(topics = "TWO_TOPIC", groupId = "TWO_TOPIC_TEST")
    public void topicListener2(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            System.out.println(
                    "主题="+record.topic()+ "" +
                    "偏移量="+record.offset()+"" +
                    "分区="+record.partition()+"" +
                    "key="+record.key()+"" +
                    "内容="+record.value()+"" +
                    "创建消息的时间戳="+record.timestamp()+"");
            //手动提交offset
            ack.acknowledge();
            System.out.println("消费成功");
        } catch (Exception e) {
            System.out.println("消费失败");
            e.printStackTrace();
        }
    }

 

4、批量生产消费:

相关YML配置:

spring:
  kafka:
    bootstrap-servers: 192.158.1.130:9092,192.192.158.1.131:9092 # kafka集群信息,多个用逗号间隔
    # 生产者配置
    producer:
      retries: 3  # 消息发送失败时的最大重试次数
      batch-size: 16384 # 每次批量发送消息的数量,produce积累到一定数据,一次发送批量处理大小,16K
      buffer-memory: 33554432 # produce积累数据一次发送,数据缓冲存储大小,32M
      acks: all
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消费者配置
    consumer:
      # 是否自动提交
      enable-auto-commit: false # false手动提交消费位移(offset)
      auto-offset-reset: earliest
      # 控制一个poll()调用返回的记录数,即consumer每次批量拉多少条数据。
      max-poll-records: 10
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 监听器配置
    listener:
      # manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种
      ack-mode: manual_immediate
      # 消费监听接口监听的主题不存在时,默认会报错
      missing-topics-fatal: false
      # 使用批量消费需要将listener的type设置为batch,该值默认为single
      type: batch

生产者:

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 批量生产消费
     *
     * KafkaTemplate没有提供批量发送的方法。事实上,Kafka的java producer自己也没有提供批量发送。
     * 不过由于消息发送是异步而且本身在内存中已经做了批量化处理,因此通常不需要关心发送时是否是批量的。
     * 至于高效的发送消息,Kafka producer提供了一些参数帮助你调优它的性能。
     * 常见的参数包括但不限于:batch.size, linger.ms, compression.type, buffer.memory等
     *
     * */
    @GetMapping("/msg3")
    public void msg3() {
        /**
         * 不指定分区,则默认通过hash运算和对分区总数取模来确定分区
         *
         * @Param1: 主题
         * @Param2: 消息体
         * */
        for (int i = 0; i < 30; i++) {
            kafkaTemplate.send("THERR_TOPIC", "消息-" + i);
        }

    }

消费者:

    /**
     * 指定一个消费者组:THERR_TOPIC_TEST,一个主题主题:THERR_TOPIC。
     * @param record
     *
     * 配置offset手动提交:enable-auto-commit=false
     */
    @KafkaListener(topics = "THERR_TOPIC", groupId = "THERR_TOPIC_TEST")
    public void topicListener3(List<ConsumerRecord<String, String>> record, Acknowledgment ack) {
        try {
            System.out.println("一次获取消息数量:"+record.size());
            //手动提交offset
            ack.acknowledge();
            System.out.println("消费成功");
        } catch (Exception e) {
            System.out.println("消费失败");
            e.printStackTrace();
        }
    }

 

5、指定多消费组方式/多主题方式/多分区方式/特定偏移量位置方式的生产消费:

相关YML配置:

spring:
  kafka:
    bootstrap-servers: 192.158.1.130:9092,192.192.158.1.131:9092 # kafka集群信息,多个用逗号间隔
    # 生产者配置
    producer:
      retries: 3  # 消息发送失败时的最大重试次数
      batch-size: 16384 # 每次批量发送消息的数量,produce积累到一定数据,一次发送批量处理大小,16K
      buffer-memory: 33554432 # produce积累数据一次发送,数据缓冲存储大小,32M
      acks: all
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消费者配置
    consumer:
      # 是否自动提交
      enable-auto-commit: false # false手动提交消费位移(offset)
      auto-offset-reset: earliest
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 监听器配置
    listener:
      # manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种
      ack-mode: manual_immediate
      # 消费监听接口监听的主题不存在时,默认会报错
      missing-topics-fatal: false

生产者:

    @Autowired
    private KafkaTemplate kafkaTemplate;   

    /**
     * 指定多消费组方式/多主题方式/多分区方式/特定偏移量位置方式的生产消费
     * */
    @GetMapping("/msg4")
    public void msg4() {
        /**
         * 不指定分区,则默认通过hash运算和对分区总数取模来确定分区
         *
         * @Param1: 主题
         * @Param2: 分区
         * @Param3: key
         * @Param4: 消息体
         * */
        kafkaTemplate.send("FOUR_TOPIC_01", 0,"key0", "FOUR_TOPIC_01在0分区的消息体1");
        kafkaTemplate.send("FOUR_TOPIC_01", 0,"key0", "FOUR_TOPIC_01在0分区的消息体2");
        kafkaTemplate.send("FOUR_TOPIC_01", 0,"key0", "FOUR_TOPIC_01在0分区的消息体3");
        kafkaTemplate.send("FOUR_TOPIC_01", 0,"key0", "FOUR_TOPIC_01在0分区的消息体4");
        kafkaTemplate.send("FOUR_TOPIC_01", 0,"key0", "FOUR_TOPIC_01在0分区的消息体5");
        kafkaTemplate.send("FOUR_TOPIC_01", 1,"key1", "FOUR_TOPIC_01在1分区的消息体1");
        kafkaTemplate.send("FOUR_TOPIC_01", 1,"key1", "FOUR_TOPIC_01在1分区的消息体2");
        kafkaTemplate.send("FOUR_TOPIC_02", 0,"ONE", "FOUR_TOPIC_02在0分区的消息体1");
        kafkaTemplate.send("FOUR_TOPIC_02", 0,"ONE", "FOUR_TOPIC_02在0分区的消息体2");
        kafkaTemplate.send("FOUR_TOPIC_02", 1,"TWO", "FOUR_TOPIC_02在1分区的消息体1");
    }

消费者:

    /**
     * 指定多消费组方式/多主题方式/多分区方式/特定偏移量位置方式的生产消费
     * @param record
     *
     * 多个消费者实例同时监听同一个主题(topic),每个消费组只能消费消息的一个副本,
     * 因此如果您需要多个消费者实例同时消费同一个主题的所有消息,请确保主题的分区数量大于或等于消费者实例的数量。
     */
    //@KafkaListener(topics = {"FOUR_TOPIC_01","FOUR_TOPIC_02"},groupId = "FOUR_TOPIC_TEST")
    @KafkaListeners({
            @KafkaListener(
                    /**
                     * 同时监听FOUR_TOPIC_01和FOUR_TOPIC_02,
                     * 监听FOUR_TOPIC_02的0号和1号分区、
                     * 监听FOUR_TOPIC_01的 0号和1号分区,指向0号分区的offset初始值为3
                     * */
                    //消费组
                    groupId = "FOUR_TOPIC_TEST_ONE",
                    topicPartitions = {
                            /**
                             * 多主题,指定分区
                             * @Param(topic): 主题
                             * @Param(partitions): 分区
                             * @Param(initialOffset): 偏移位置
                             * */
                            @TopicPartition(topic = "FOUR_TOPIC_02", partitions = {"0", "1"}),
                            @TopicPartition(topic = "FOUR_TOPIC_01", partitions = "1",
                                    //指定偏移量位置,offset位置消费
                                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3"))
                    },
                    /**
                     * 在监听器容器中运行的线程数,创建多少个consumer,值必须小于等于Kafk Topic的分区数。
                     * 大于分区数时会有部分线程空闲
                     * */
                    concurrency = "2"
            ),
            @KafkaListener(
                    //消费组
                    groupId = "FOUR_TOPIC_TEST_TWO",
                    topicPartitions = {
                            @TopicPartition(topic = "FOUR_TOPIC_02", partitions = {"0", "1"}),
                            @TopicPartition(topic = "FOUR_TOPIC_01", partitions = "1",
                                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3"))
                    },
                    concurrency = "2"
            )
    })
    public void topicListener4(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            System.out.println(
                    "主题="+record.topic()+ "" +
                    "偏移量="+record.offset()+"" +
                    "分区="+record.partition()+"" +
                    "key="+record.key()+"" +
                    "内容="+record.value()+"" +
                    "创建消息的时间戳="+record.timestamp()+"");
            //手动提交offset
            ack.acknowledge();
            System.out.println("消费成功");
        } catch (Exception e) {
            System.out.println("消费失败");
            e.printStackTrace();
        }
    }

 

6、自定义分区器:

一个庞大的topic可以分为多个partition,不同的partition可以分布到 Kafka 集群中的多个broker上,而每个partition是一个有序的队列。kafka只能保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体的顺序。

(1)、Kafka中常用的分区策略:

分区策略

说明

Round-robin(轮询)

默认的分区策略。按照消费者实例的顺序依次将消息分配给每个分区,确保相对均匀地分布消息负载。

Range(范围)

根据消息键(key)的范围将消息分配给不同的分区。这种策略适用于希望具有相关键的消息被分配到同一个分区的场景。

Sticky(粘性)

在 Sticky 分区策略中,Kafka 将尽量将同一个消费者实例分配给同一个分区,以保持消费者与分区之间的粘性。这种策略适用于需要保持消息处理的顺序性和状态的场景。

Custom(自定义)

可以根据自己的需求实现自定义的分区策略。通过实现 Kafka 的 Partitioner 接口,并覆盖其中的 partition 方法,您可以根据自己的逻辑来决定消息分配的方式。

分区策略只会在创建新的消费者组或添加新的消费者实例时生效。一旦消费者组被创建并且分区被分配,分区策略将不会再次应用,除非有新的消费者实例加入或离开消费者组。

(2)、自定义分区器:

自定义分区策略:

/**
 * @description 自定义分区规则,需要在配置中指定当前类生效
 */
public class CustomizePartitionerConfig implements Partitioner {

    @Override
    public int partition(String topic, Object o, byte[] bytes, Object value, byte[] bytes1, Cluster cluster) {
        String msg = value.toString();
        int partition = 0;
        // 消息种包含KEY01,就发往1号分区
        if(msg.contains("KEY01")){
            partition = 1;
        }
        return 0;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map) {}
}

相关YML配置:

spring:
  kafka:
    bootstrap-servers: 192.158.1.130:9092,192.192.158.1.131:9092 # kafka集群信息,多个用逗号间隔
    # 生产者配置
    producer:
      retries: 3  # 消息发送失败时的最大重试次数
      batch-size: 16384 # 每次批量发送消息的数量,produce积累到一定数据,一次发送批量处理大小,16K
      buffer-memory: 33554432 # produce积累数据一次发送,数据缓冲存储大小,32M
      acks: all
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # 批量发送,延迟为1毫秒,启用该功能能有效减少生产者发送消息次数,从而提高并发量
        linger.ms: 1
        # 自定义分区策略
        partitioner.class: com.iven.kafkademo.config.CustomizePartitionerConfig
    # 消费者配置
    consumer:
      # 是否自动提交
      enable-auto-commit: false # false手动提交消费位移(offset)
      auto-offset-reset: earliest
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 监听器配置
    listener:
      # manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种
      ack-mode: manual_immediate
      # 消费监听接口监听的主题不存在时,默认会报错
      missing-topics-fatal: false

 

7、消息转发处理:

应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。

消费者:

    /**
     * 消息处理后转发到另一个Topic
     * @param record
     *
     * 配置offset手动提交:enable-auto-commit=false
     */
    @KafkaListener(topics = "FIVE_TOPIC", groupId = "FIVE_TOPIC_TEST")
    @SendTo("GET_TOPIC")
    public String topicListener5(ConsumerRecord<String, String> record, Acknowledgment ack) {
        System.out.println("主题="+record.topic()+ "" +
                "偏移量="+record.offset()+"" +
                "分区="+record.partition()+"" +
                "key="+record.key()+"" +
                "内容="+record.value()+"" +
                "创建消息的时间戳="+record.timestamp()+"");
        //手动提交offset
        ack.acknowledge();
        System.out.println("消费成功");
        return "从ONE_TOPIC消费完后转发的消息";
    }

    @KafkaListener(topics = "GET_TOPIC", groupId = "FIVE_TOPIC_TEST")
    public void listenTwo(ConsumerRecord<String, String> record, Acknowledgment ack){
        System.out.println("GET_TOPIC接收消息:" + record.value());
        ack.acknowledge();
    }

 

8、定时启动消费者监听器(KafkaListener):

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定Topic的消息,当我们不想让监听器立即工作,可使用KafkaListenerEndpointRegistry指定的时间点开始监听

监听器定时配置:

@EnableScheduling
@Component
public class CronListenConfig {
    /**
     * @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
     * 而是会被注册在KafkaListenerEndpointRegistry中,
     * 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
     */
    @Resource
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private ConsumerFactory consumerFactory;

    // 监听器容器工厂(设置禁止KafkaListener自启动)
    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        container.setAutoStartup(false);  //禁止KafkaListener自启动
        return container;
    }


    /**
     * 定时启动监听器
     *
     */
    @Scheduled(cron = "*/10 * * * * ?")
    public void startListener() {
        System.out.println("启动监听器..." + new Date());
        // "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
        if (!registry.getListenerContainer("timingConsumer").isRunning()) {
            System.out.println("启动");
            registry.getListenerContainer("timingConsumer").start();
        }
        registry.getListenerContainer("timingConsumer").resume();
    }

    /**
     * 定时停止监听器
     */
    @Scheduled(cron = "0 53 1 * * ? ")
    public void shutDownListener() {
        System.out.println("关闭监听器..." + new Date());
        registry.getListenerContainer("timingConsumer").pause();
    }

}

消费者:

    @KafkaListener(id="timingConsumer", topics = "SIX_TOPIC", groupId = "SIX_TOPIC_TEST",
            //配置定时启动
            containerFactory = "delayContainerFactory")
    public void topicListener6(ConsumerRecord<String, String> record){
        System.out.println("主题="+record.topic()+ "" +
                "偏移量="+record.offset()+"" +
                "分区="+record.partition()+"" +
                "key="+record.key()+"" +
                "内容="+record.value()+"" +
                "创建消息的时间戳="+record.timestamp()+"");
        System.out.println("消费成功");
    }

 

9、消息事务机制:

Kafka默认情况下是不开启事务的。在Kafka中,事务是一种保证消息的原子性和一致性的机制。通过事务,可以确保一组消息要么全部成功写入主题,要么全部失败。这对于一些需要确保数据完整性和可靠性的场景非常重要。然而,默认情况下,Kafka并没有开启事务功能。这意味着生产者在发送消息时,并没有提供事务支持,也就是说,如果发送过程中出现了错误,消息可能只会部分写入主题,而无法回滚已经写入的消息。

Kafka的事务功能主要是为了保证生产者发送消息的原子性和一致性,而不是针对消费组的事务支持。消费者可以通过设置适当的偏移量提交策略来实现消费的可靠性,但它们无法直接影响生产者的事务操作。

相关YML配置:

spring:
  kafka:
    bootstrap-servers: 192.158.1.130:9092,192.192.158.1.131:9092 # kafka集群信息,多个用逗号间隔
    # 生产者配置
    producer:
      #启用 kafka 事务管理器
      transaction-id-prefix: tx-my-transaction-id //自定义id
      retries: 3  # 消息发送失败时的最大重试次数
      batch-size: 16384 # 每次批量发送消息的数量,produce积累到一定数据,一次发送批量处理大小,16K
      buffer-memory: 33554432 # produce积累数据一次发送,数据缓冲存储大小,32M
      acks: all
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    # 消费者配置
    consumer:
      # 是否自动提交
      enable-auto-commit: false # false手动提交消费位移(offset)
      auto-offset-reset: earliest
      # 指定消息key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    # 监听器配置
    listener:
      # manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种
      ack-mode: manual_immediate
      # 消费监听接口监听的主题不存在时,默认会报错
      missing-topics-fatal: false

生产者:

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 声明事务:后面报错消息不会发出去
     * 消息事务机制方式一
     * @Transactional
     *
     */
    @Transactional
    @GetMapping("/msg7")
    public void msg7() {
        kafkaTemplate.send("SEVEN_TOPIC", "第一个消息").addCallback(
                success  -> {
                    System.out.println("第一个发送消息成功:");
                },
                failure  -> {
                    // 发送失败的回调处理
                    System.err.println("第一个发送消息失败:");
                    failure .printStackTrace();
                }
        );
        Integer i = 1 / 0;  //异常信息
        kafkaTemplate.send("SEVEN_TOPIC", "第二个消息");
    }

    /**
     * 声明事务:后面报错消息不会发出去
     * 消息事务机制方式二
     *
     */
    @GetMapping("/msg8")
    public void msg8() {
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("SEVEN_TOPIC", "第一个消息");
            Integer i = 1 / 0;  //异常信息
            operations.send("SEVEN_TOPIC", "第二个消息");
            return true;
        });
//        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
//            @Override
//            public Object doInOperations(KafkaOperations kafkaOperations) {
//                kafkaOperations.send("SEVEN_TOPIC", "第一个消息");
//                int a = 1 / 0;
//                kafkaOperations.send("SEVEN_TOPIC", "第二个消息");
//                return true;
//            }
//        });
    }

 

五、相关问题解决:

Kafka常见面试题

 

六、参考:

官方文档

kafka3.x详解

SpringBoot整合kafka消费者注解详解

SpringBoot-Kafka

SpringBoot下,Kafka可靠性应用方案