Kafka Cluster Active-Active: Data Migration

I. Cluster Setup (Kafka with its bundled ZooKeeper)
Prerequisite: a Java environment must be installed.
1. Download:
http://kafka.apache.org/downloads

2. Install directory: /app/kafka
tar -zxvf kafka_2.12-3.6.1.tgz
 
Create the data and log directories:
mkdir -p /app/kafka/data/zookeeper/
mkdir -p /app/kafka/data/kafka/
mkdir -p /app/kafka/log/zookeeper/
mkdir -p /app/kafka/log/kafka/
 
cd /app/kafka/kafka_2.12-3.6.1
 
vim config/zookeeper.properties
 
Configure it as follows:
dataDir=/app/kafka/data/zookeeper
dataLogDir=/app/kafka/log/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections per IP
maxClientCnxns=100
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080

tickTime=2000
initLimit=10
syncLimit=5

server.1=192.168.43.145:12888:13888
server.2=192.168.43.146:12888:13888
server.3=192.168.43.121:12888:13888
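
The server.N entries above also require a myid file in each node's dataDir whose content matches that node's N; this step is not listed above, so create it on every node. For example, on 192.168.43.145 (server.1), write 2 and 3 on the other two nodes respectively:

echo 1 > /app/kafka/data/zookeeper/myid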
 
vim config/server.properties
 
# use broker.id=2 and broker.id=3 on the other two nodes
broker.id=1
# change the IP to each node's own address
listeners=PLAINTEXT://192.168.43.145:9092
log.dirs=/app/kafka/log/kafka
zookeeper.connect=192.168.43.145:2181,192.168.43.146:2181,192.168.43.121:2181
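
For reference, on the second node (192.168.43.146) the per-node settings would look like this, with everything else unchanged:

broker.id=2
listeners=PLAINTEXT://192.168.43.146:9092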
 
 
Start the services on all nodes (the commands below are run from the bin/ directory):
./zookeeper-server-start.sh -daemon ../config/zookeeper.properties
./kafka-server-start.sh -daemon ../config/server.properties
 
Check the logs for errors. If there are any, a common cause is the firewall; stop it and then start the services again:
systemctl stop firewalld
systemctl disable firewalld
systemctl status firewalld
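
One quick way to check for errors (assuming the default log4j setup, which writes under the logs/ directory of the Kafka install): confirm the ZooKeeper and Kafka JVMs are running and scan the broker log:

jps    # should list QuorumPeerMain and Kafka on every node
grep -i error ../logs/server.log | tail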
 
Verify:
./zookeeper-shell.sh 192.168.43.121:2181
ls /brokers/ids
If the output is
[1, 2, 3]
the cluster is up and running.
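
An optional cross-check from the Kafka side: the broker API versions tool should print an entry for each of the three brokers:

./kafka-broker-api-versions.sh --bootstrap-server 192.168.43.145:9092 | grep id: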
 
 
II. Data Migration and Sync (MirrorMaker 2 / MM2)
vim ../config/connect-mirror-maker.properties
 
# cluster aliases
clusters = A, B

# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
# source cluster
A.bootstrap.servers = A_host1:9092, A_host2:9092, A_host3:9092
# target cluster
B.bootstrap.servers = B_host1:9092, B_host2:9092, B_host3:9092

# enable and configure individual replication flows
# replication direction: cluster A -> cluster B
A->B.enabled = true

# regex which defines which topics gets replicated. For eg "foo-.*"
# topic filter: which topics get replicated
A->B.topics = .*

# Bidirectional real-time sync (B->A) is best left disabled; it can exhaust CPU and memory
#B->A.enabled = true
#B->A.topics = .*

# Replication policy: without this setting, replicated topics are prefixed with the source cluster alias, e.g. A.topic / B.topic
replication.policy.class=org.apache.kafka.connect.mirror.IdentityReplicationPolicy

# Replication factor of newly created remote topics; keep it consistent across both clusters
replication.factor=1
Start command:
./connect-mirror-maker.sh -daemon ../config/connect-mirror-maker.properties
 
 
Verify:
List the topics:
./kafka-topics.sh --bootstrap-server 192.168.43.145:9092 --list
Create a topic named test:
./kafka-topics.sh --bootstrap-server 192.168.43.145:9092 --create --topic test
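
To confirm MM2 is actually copying data, produce a few messages on the source side and read them back from the target side (this assumes, as in the offset checks below, that 192.168.43.145 is a broker in cluster A and 192.168.43.146 a broker in cluster B):

./kafka-console-producer.sh --bootstrap-server 192.168.43.145:9092 --topic test     # type a few messages, then Ctrl+C
./kafka-console-consumer.sh --bootstrap-server 192.168.43.146:9092 --topic test --from-beginning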
 
 
# latest offset
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.43.146:9092 --topic test --time -1
 
# earliest offset
./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.43.146:9092 --topic test --time -2
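
To check that replication has caught up, run the same latest-offset query against the source cluster and compare; the numbers on both sides should match once MM2 is in sync. (On newer Kafka versions, if the kafka.tools.GetOffsetShell class is unavailable, the bundled kafka-get-offsets.sh wrapper provides the same information via --bootstrap-server.)

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.43.145:9092 --topic test --time -1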