Kafka Connect用法

发布时间 2024-01-03 15:34:30作者: Marydon

1.情景展示

传统的消息中间件,如果我们想要应用到自己的系统当中,就必须在框架里面进行集成。

也就是说,必须将连接消息中间件的代码(如:生产者和消费者)嵌入到我们的web系统中,这就是所谓的硬编码。

这种硬编码的方式,侵入性强,开发成本相对较高,它要求开发人员不仅要关注业务,还要兼顾技术实现。

这也与我们开发降低系统耦合性(解耦)的原则相悖。

那能不能使消息的生产和消费脱离系统而存在呢?

2.具体分析

我们知道:kafka是可以当作消息中间件来使用的。

而kafka connect就能很好的满足我们的需求。

我们可以根据kafka connect自定义开发组件来实现数据的同步。

3.kafka connect

Kafka Connect是一个高伸缩性、高可靠性的数据集成工具,用于在Apache Kafka与其他系统间进行数据搬运以及执行ETL操作。

Kafka Connect主要由source connector和sink connector组成。事实上,几乎大部分的ETL框架都是由这两大类逻辑组件组成的,如Apache Flume、Kettle等。

source connector负责把输入数据从外部系统中导入到Kafka中,而sink connector则负责把输出数据导出到其他外部系统。

接口文档:https://kafka.apache.org/35/generated/connect_rest.yaml

启动

运行前提:

运行zookeeper和kafka。

启动kafka connect。

kafka connect的启动方式有两种:单独部署和集群部署。

启动文件说明:

windows版

单独部署:KAFKA_HOME/bin/windows/connect-standalone.bat

集群部署:KAFKA_HOME/bin/windows/connect-distributed.bat

linux版

单独部署:KAFKA_HOME/bin/connect-standalone.sh

集群部署:KAFKA_HOME/bin/connect-distributed.sh

配置文件说明:

单独部署配置文件:KAFKA_HOME/config/connect-standalone.properties

集群部署配置文件:KAFKA_HOME/config/connect-distributed.properties

以windows版的单独部署为例。

 

kafka connect配置文件说明

找到KAFKA_HOME/config目录下的connect-standalone.properties和connect-distributed.properties文件。

connect-standalone.properties:当只有一个kafka服务器时,我们需要使用这个配置文件。

connect-distributed.properties:当存在多个kafka服务器时(集群部署),我们需要使用这个配置文件。

以connect-standalone.properties为例

说明:如果是connect-distributed.properties,这里可以配置多个broker地址,中间使用逗号隔开。

bootstrap.servers属性:指定broker服务器的地址,默认值为:localhost:9092。

offset.storage.file.filename属性:设置偏移量文件connect.offsets的存放地址,默认路径为:/tmp。

linux:connect.offsets存储在:/tmp目录下,windows:connect.offsets存在KAFKA_HOME所在磁盘的/tmp目录下。

当目录不存在时,会自动被创建。

offset.flush.interval.ms属性:偏移量刷新时间间隔,单位:毫秒,默认值为:10000毫秒(10秒)。

plugin.path属性:设置connect插件的存放路径,多个路径之间使用逗号隔开。

在启动kafka connect时,会自动从上面配置的路径,读取connect插件。

一般情况下,我们不在这里配置,而是将插件直接放在:KAFKA_HOME/plugins目录下。

plugin.path=/kfk/kafka_2.13-3.6.1/plugins

这样一来,当你使用connect-standalone.bat并且指定配置文件connect-standalone.properties时,kafka connect会自动读取此路径下的插件(Source Connector和Sink Connector)。

验证插件是否生效

http://localhost:8083/connector-plugins

 

listeners属性:设置的是kafka connect服务器的访问端口号,默认值是8083。

多个地址之间使用逗号隔开。

如何更改它的端口号呢?

listeners=HTTP://:8093

在此配置文件当中增加如上代码, 我们就把kafka connect的端口号改成8093啦。

启动kafka connect

前提:已经启动好了kafka server。

以只运行一个broker进行举例说明

windows操作步骤

broker的启动命令是:connect-standalone.bat。

文件所在路径:KAFKA_HOME/bin/windows

所需配置文件:connect-standalone.properties。

文件所在路径:KAFKA_HOME/config

同样打开黑窗口,运行以下指令:

connect-standalone.bat ../../config/connect-standalone.properties

当出现如上字样时,就说明:kafka connect服务启动成功了。

访问地址:

http://localhost:8083/

3.6.1是kafka服务器的版本号。 

linux操作步骤

broker的启动命令是:connect-standalone.sh。

文件所在路径:KAFKA_HOME/bin

所需配置文件:connect-standalone.properties。

文件所在路径:KAFKA_HOME/config

 

 

 

 

切换到KAFKA_HOME/bin/windows目录下,输入cmd。

并按回车键打开黑窗口。

运行以下命令

connect-standalone.bat ../../config/connect-standalone.properties

 

当出现如上字样时,说明kafka connect服务启动成功了。

 

运行connect-standalone.bat

 

 

写在最后

  哪位大佬如若发现文章存在纰漏之处或需要补充更多内容,欢迎留言!!!

 相关推荐: