kafka3.6单点部署使用

发布时间 2024-01-04 10:45:20作者: 村尚chun叔

一、kafka基本概念

kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能,但是确有着独特的设计。可以这样来说,Kafka借鉴了JMS规范的思想,但是并没有完全遵循JMS规范。
名称 解释
Broker 消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker可以组成一个Kafka集群
Topic Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic
Producer 消息生产者,向Broker发送消息的客户端
Consumer 消息消费者,从Broker读取消息的客户端
ConsumerGroup 每个Consumer属于一个特定的Consumer Group,一条消息可以被多个不同的Consumer Group消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息
Partition 物理上的概念,一个topic可以分为多个partition,每个partition内部消息是有序的
Replica(副本) 一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个 Follower
Leader 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader
Follower 每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和 Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

image

服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。

二、Topic与Partition

在Kafka中,Topic就是一个主题,生产者往topic里面发送消息,消费者从topic里面捞数据进行消费。

假设现在有一个场景,如果我们现在有100T的数据需要进行消费,但是现在我们一台主机上面并不能存储这么多数据该怎么办呢?

image

其实做法很简单,就是将海量的数据进行切割,并且在Topic中添加分区的概念,每一个分区都对应一台主机,并且存储切分到的数据。

image

这样做的好处是:

  • 分区存储,可以解决一个topic中文件过大无法存储的问题
  • 提高了读写的吞吐量,读写可以在多个分区中同时进行

三、Kafka安装部署

3.1 安装jdk

yum install -y java-1.8.0-openjdk-devel.x86_64 \
&& (
cat <<EOF
#set java environment
JAVA_HOME=/usr/lib/jvm/jre-1.8.0-openjdk
PATH=$PATH:$JAVA_HOME/bin
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME CLASSPATH PATH
EOF
) >> /etc/profile && source /etc/profile && java -version

3.2 安装kafka

kafka3.0之后自带zookeeper

官网下载kafka的压缩包:http://kafka.apache.org/downloads

这里使用 清华大学开源软件镜像站下载
mkdir /usr/local/kafka \
&& cd /usr/local/kafka \
&& wget --no-check-certificate https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/3.6.0/kafka_2.13-3.6.0.tgz  \
&& tar -zvxf kafka_2.13-3.6.0.tgz \
&& rm -rf kafka_2.13-3.6.0.tg

3.3 修改配置文件

vim /usr/local/kafka/kafka_2.13-3.6.0/config/server.properties

#broker.id属性在kafka集群中必须要是唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口号
#注意这里请不要填localhost:9092 ,localhost表示只能通过本机连接,可以设置为0.0.0.0或本地局域网地址
listeners=PLAINTEXT://10.0.0.101:9092   
#kafka的消息存储文件
log.dir=/usr/local/kafka/kafka-logs
#kafka连接zookeeper的地址,/kafka表示所有文件创建在/kafka下,便于管理
zookeeper.connect=localhost:2181/kafka

3.4 添加kafka环境变量

vim /etc/profile

export KAFKA_HOME=/usr/local/kafka/kafka_2.13-3.6.0
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

3.5 启动kafka服务器

在启动Kafka服务器之前成功启动Zookeeper服务器,并且它正在监听默认端口(2181)

启动zookeeper
/usr/local/kafka/kafka_2.13-3.6.0/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/kafka_2.13-3.6.0/config/zookeeper.properties

启动kafka
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.13-3.6.0/config/server.properties

验证是否启动成功
netstat -ntpl|grep -E '2181|9092'
tcp6       0      0 10.0.0.101:9092         :::*                    LISTEN      3025/java           
tcp6       0      0 :::2181                 :::*                    LISTEN      2617/java

四、使用kafka

在Kafk中,Topic是一个非常重要的概念,topic可以实现消息的分类,不同消费者订阅不同的topic

image

partition(分区)是kafka的一个核心概念,kafka将1个topic分成了一个或多个分区,每个分区在物理上对应一个目录
分区目录下存储的是该分区的日志段(segment),包括日志的数据文件和两个索引文件

4.1 创建topic

执行以下命令创建名为test的topic,这个topic只有一个partition,并且备份因子也设置为1:
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-topics.sh --bootstrap-server 10.0.0.101:9092 --create --topic test --partitions 1

查看当前kafka内有哪些topic
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-topics.sh --bootstrap-server 10.0.0.101:9092 --list 

#删除topic
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-topics.sh --bootstrap-server 10.0.0.101:9092 --delete --topic test

4.2 生产消息

/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-console-producer.sh --broker-list 10.0.0.101:9092 --topic test

4.3 消费消息

从消费者连接时开始消费
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092 --topic test

从头开始消费
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092 --from-beginning --topic test

几个注意点:

  • 消息会被存储
  • 消息是顺序存储
  • 消息是有偏移量的
  • 消费时可以指明偏移量进行消费

4.4 消费者偏移量

在上面我们展示了两种不同的消费方式,根据偏移量消费和从头开始消费,其实这个偏移量可以我们自己进行维护

我们进入我们在server.properties里面配置的日志文件地址/usr/local/kafka/kafka-logs

我们可以看到默认一共有五十个偏移量地址,里面就记录了当前消费的偏移量。

我们先关注test-0这个文件

image

我们进入这个文件,可以看到其中有个log文件,里面就保存了Topic发送的数据

image

4.5 单播消息

我们现在假设有一个场景,有一个生产者,两个消费者,问:生产者发送消息,是否会同时被两个消费者消费?

创建一个topic
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-topics.sh --bootstrap-server 10.0.0.101:9092 --create --topic test2 --partitions 1

创建一个生产者
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-console-producer.sh --broker-list 10.0.0.101:9092 --topic test2

分别在两个终端上面创建两个消费者
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092 --topic test2

image

这里就要引申出一个概念:消费组,当我们配置多个消费者在一个消费组里面的时候,其实只会有一个消费者进行消费

这样其实才符合常理,毕竟一条消息被消费一次就够了

我们可以通过命令--consumer-property group.id=testGroup在设置消费者时将其划分到一个消费组里面

/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092  --consumer-property group.id=testGroup --topic test2

image

这个时候,如果消费里面有一个消费者挂掉了,就会由其他消费者来进行消费

小结一下:两个消费者在同一个组,只有一个能接到消息,两个在不同组或者未指定组则都能收到

4.6 多播消息

当多个消费组同时订阅一个Topic时,那么不同的消费组中只有一个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同一个消息

消费组1
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092  --consumer-property group.id=testGroup1 --topic test2
消费组2
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-console-consumer.sh --bootstrap-server 10.0.0.101:9092  --consumer-property group.id=testGroup2 --topic test2

4.7 查看消费组信息

# 查看当前所有的消费组
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092 --list
# 查看指定消费组具体信息,比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.0.0.101:9092 --describe --group testGroup

image

4.8 创建多个分区

我们在上面已经了解了Topic与Partition的概念,现在我们可以通过以下命令给一个topic创建多个分区

# 创建两个分区的主题
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-topics.sh --bootstrap-server 10.0.0.101:9092 --create --topic test3 --partitions 2
# 查看下创建的topic
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-topics.sh --bootstrap-server 10.0.0.101:9092 --list 

五、副本的概念

在创建主题时,除了指明了主题的分区数以外,还指明了副本数,那么副本是一个什么概念呢?

我们现在创建一个主题、两个分区、三个副本的topic(注意:副本只有在集群下才有意义)
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-topics.sh \
--bootstrap-server 10.0.0.101:9092 \
--create --topic my-replicated-topic \
--partitions 2 \
--replication-factor 3   

查看分区的详细信息
# 查看topic情况
/usr/local/kafka/kafka_2.13-3.6.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

image

leader
kafka的写和读的操作,都发生在leader上。leader负责把数据同步给follower。当leader挂了,经过主从选举,从多个follower中选举产生一个新的leader

follower
接收leader的同步的数据

isr
可以同步和已同步的节点会被存入到isr集合中。这里有一个细节:如果isr中的节点性能较差,会被提出isr集合。