25-Flume

发布时间 2023-08-01 21:21:51作者: tree6x7

1. 概述&入门

Flume 是 Cloudera 提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统。

Flume 基于流式架构,灵活简单。其最主要的作用就是实时读取服务器本地磁盘的数据,将数据写入到 HDFS。

1.1 基础架构

a. Agent

Flume 的部署单元,本质是一个 JVM 进程,Agent 内部是以「事件」的形式将数据从源头送至目的。

Agent 主要有 3 个部分组成:Source、Channel、Sink。

b. Source

Source 是负责接收数据到 Flume Agent 的组件。

Source 组件可以处理各种类型、各种格式的日志数据。

Source 组件类型 说明
avro 本质是 RPC 框架,支持跨语言、跨平台的数据传输,avro Source 在 Flume 中多用于 Agent 的连接。
netcat 本质是 Linux 下的端口类工具,netcat Source 在 Flume 中用于采集端口传输的数据。
exec 支持执行命令的,并将命令执行后的标准输出作为数据采集,多用于采集一个可追加文件。
spooling directory 支持对一个目录进行监听,采集目录中一个或多个新生成的文件数据。
taildir 支持对多个目录进行监听,采集一个或多个目录下的一个或多个可追加文件,支持断点续传。

除此之外还有:thrift、jms、sequence generator、syslog、http、自定义 Source。

c. Sink

Sink 是负责发送数据到外部系统的 Flume Agent 的组件。

Sink 组件不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量的、事务的写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Sink 组件类型 说明
logger logger Sink 组件则是将数据写到成 Flume 框架的运行日志中,配合运行参数 -Dflume.root.logger=INFO,console 可以将 Flume 运行日志(其中就包含了采集的数据)输出到控制台,多用于测试环境。
hdfs hdfs Sink 组件是负责将数据传输到 HDFS 分布式文件系统中。
avro avro Sink 组件配合 avro Source 组件可以实现 Agent 的连接。
file file Sink 组件是将采集到的数据直接输出到本地文件系统中,即 Linux 的磁盘上。

除此之外还有:thrift、ipc、HBase、solr、自定义 Sink。

d. Channel

Channel 是负责暂存数据的,是位于 Source 和 Sink 组件之间的缓冲区。

由于 Channel 组件的存在,使得 Source 和 Sink 组件可以运作在不同的速率上。并且,Channel 是线程安全的,可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作。

Flume 自带两种 Channel:

Channel 类型 说明 默认容量
Memory Channel 基于内存的队列存储事件,适用于对数据安全性要求不高的场景。 100 Event
File Channel 基于磁盘存储事件,宕机数据不丢失,适用于对数据安全敏感度高的场景。 100w Event

FileChannel 可以通过配置 dataDirs 指向多个路径,每个路径对应不同的硬盘,增大 Flume 吞吐量。

除此之外还可以自定义 Channel。

e. Event

Event 是 Flume 数据传输的基本单元,以 Event 的形式将数据从源头送至目的地。

Event 由 Header 和 Body 两部分组成:

  • Header:用来存放该 Event 的一些属性,为 KV 结构;
  • Body:用来存放该条数据,形式为字节数组。

1.2 安装部署

http://flume.apache.org/

(1)将 apache-flume-1.9.0-bin.tar.gz 上传到 /opt/software 目录下

(2)解压 apache-flume-1.9.0-bin.tar.gz 到 /opt/module/ 目录下

(3)修改 apache-flume-1.9.0-bin 的名称为 flume

(4)将 flume/lib 下的 guava-11.0.2.jar 删除以兼容 Hadoop-3.1.3

[liujiaqi@hadoop102 ~]$ mv apache-flume-1.9.0-bin.tar.gz /opt/software/
[liujiaqi@hadoop102 ~]$ tar -zxvf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/
[liujiaqi@hadoop102 ~]$ cd /opt/module/
[liujiaqi@hadoop102 module]$ mv apache-flume-1.9.0-bin flume
[liujiaqi@hadoop102 module]$ rm -rf flume/lib/guava-11.0.2.jar 

1.3 入门案例

a. 监控端口数据

使用 Flume 监听一个端口,收集该端口数据,并打印到控制台。

需求分析:

  1. 通过 netcat 工具向本机的 44444 端口发送数据;
  2. Flume 监控本机的 44444 端口,通过 Flume 的 Source 端读取数据;
  3. Flume 将获取的数据通过 Sink 写出到控制台。

实现步骤:

(1)安装 netcat/net-tools 工具

$ sudo yum install -y net-tools
$ sudo yum install -y nc

(2)判断 44444 端口是否被占用

$ sudo netstat -nlp | grep 44444
# nc -lk <PORT>   开的是服务端
# nc <IP> <PORT>  开的是客户端

(3)在 flume 目录下创建 job 文件夹并进入 job 文件夹

$ mkdir -p job/simple
$ cd job/simple

(4)在 simple 目录下创建 Flume Agent 配置文件:netcat-flume-logger.conf

# Name the components on this agent
a1.sources = r1											# 为a1的Source组件命名为r1(多个组件用空格间隔)
a1.sinks = k1                                       	# 为a1的Sink组件命名为k1(多个组件用空格间隔)
a1.channels = c1                                     	# 为a1的Channel组件命名为c1(多个组件用空格间隔)

# Describe/configure the source
a1.sources.r1.type = netcat                          	# 配置r1的类型
a1.sources.r1.bind = localhost                       	# 配置r1的绑定地址(注意localhost和hadoop102的区别)
a1.sources.r1.port = 44444                           	# 配置r1的监听端口

# Describe the sink
a1.sinks.k1.type = logger                          		# 配置k1的类型为logger-输出到控制台

# Use a channel which buffers events in memory
a1.channels.c1.type = memory                    			# 配置c1的类型为memory
a1.channels.c1.capacity = 1000                   			# 配置c1的「容量」为1000个事件
a1.channels.c1.transactionCapacity = 100         			# 配置c1的「事务容量」为100个事件

# Bind the source and sink to the channel
a1.sources.r1.channels = c1                     			# 配置r1的channel属性,指定r1连接到那个channel
a1.sinks.k1.channel = c1                        			# 配置k1的channel属性,指定k1连接到那个channel

(5)部署运行 Flume 监听端口数据

# 写法一
$ bin/flume-ng agent --conf conf/ --name a1 --conf-file job/simple/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
# 写法二
$ bin/flume-ng agent -c conf/ -n a1 -f job/simple/netcat-flume-logger.conf -Dflume.root.logger=INFO,console
参数 说明
--conf / -c 表示配置文件存储在 conf 目录
--name / -n 表示给 agent 起名为 a1
--conf-file / -f 指定读取的配置文件是在 job/simple 下的 netcat-flume-logger.conf 文件
-Dflume.root.logger=INFO,console -D 表示 flume 运行时动态修改 flume.root.logger 参数属性值,并将控制台日志打印级别设置为 INFO 级别。console 表示将日志输出到控制台。

(6)测试

使用 netcat 工具向本机的 44444 端口发送内容

$ nc localhost 44444
hello,flume!
OK

在 Flume 观察接收数据情况:

b. 实时监控单个追加文件

实时监控 Hive 日志,并上传到 HDFS 中。

需求分析:

  1. 需要先启动 Hadoop、Hive
  2. 要想读取 Linux 系统中的文件,就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中,所以读取文件的类型选择 exec,即 execute 执行的意思,表示执行 Linux 命令来读取文件;

实现步骤:

(1)启动 Hadoop、Hive

(2)在 job/simple 目录下创建 file-flume-hdfs.conf

# Name the components on this agent
a2.sources = r2
a2.sinks = k2
a2.channels = c2

# Describe/configure the source
a2.sources.r2.type = exec
a2.sources.r2.command = tail -F /opt/module/hive/logs/hive.log

# Describe the sink
a2.sinks.k2.type = hdfs
a2.sinks.k2.hdfs.path = hdfs://hadoop102:8020/flume/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k2.hdfs.filePrefix = logs-
# round1 是否按照时间滚动文件夹
a2.sinks.k2.hdfs.round = true
# round2 多少时间单位创建一个新的文件夹
a2.sinks.k2.hdfs.roundValue = 1
# round3 重新定义时间单位
a2.sinks.k2.hdfs.roundUnit = hour
# 是否使用本地时间戳
a2.sinks.k2.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a2.sinks.k2.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a2.sinks.k2.hdfs.fileType = DataStream
# 多久生成一个新的文件
a2.sinks.k2.hdfs.rollInterval = 60
# 设置每个文件的滚动大小
a2.sinks.k2.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a2.sinks.k2.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2

【注】对于所有与时间相关的转义序列,Event Header 中必须存在以 “timestamp”的 key(除非 hdfs.useLocalTimeStamp 设置为 true,此方法会使用 TimestampInterceptor 自动添加 timestamp)。

a3.sinks.k3.hdfs.useLocalTimeStamp = true

(3)运行 Flume

$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/simple/file-flume-hdfs.conf

(4)在 HDFS 上查看文件

c. 实时监控目录下多个新文件

使用 Flume 监听整个目录的文件,并上传至 HDFS。

Source 选用 Spooling Directory Source。

创建配置文件 dir-flume-hdfs.conf:

# Name the components on this agent
a3.sources = r3
a3.sinks = k3
a3.channels = c3

# Describe/configure the source
a3.sources.r3.type = spooldir
a3.sources.r3.spoolDir = /opt/module/flume/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true
# 忽略所有以 .tmp 结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:8020/flume/upload/%Y%m%d/%H
# 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
# 是否按照时间滚动文件夹
a3.sinks.k3.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a3.sinks.k3.hdfs.roundValue = 1
# 重新定义时间单位
a3.sinks.k3.hdfs.roundUnit = hour
# 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 积攒多少个 Event 才 flush 到 HDFS 一次
a3.sinks.k3.hdfs.batchSize = 100
# 设置文件类型,可支持压缩
a3.sinks.k3.hdfs.fileType = DataStream
# 多久生成一个新的文件
a3.sinks.k3.hdfs.rollInterval = 60
# 设置每个文件的滚动大小大概是 128M
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件的滚动与 Event 数量无关
a3.sinks.k3.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 1000
a3.channels.c3.transactionCapacity = 100

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

【注】在使用 Spooling Directory Source 时,不要在监控目录中创建并持续修改文件;上传完成的文件会以 .COMPLETED 结尾;被监控文件夹每 500ms 扫描一次文件变动。

d. 实时监控目录下的多个追加文件

使用 Flume 监听整个目录的实时追加文件,并上传至 HDFS。

(1)在 flume 根目录下创建目录 datas/tailCase/files 和 datas/tailCase/logs 用于存放数据文件

(2)在 job/simple 目录下创建配置文件 taildir-flume-hdfs.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = TAILDIR
a2.sources.r1.positionFile = /opt/module/flume/tail_dir.json
a2.sources.r1.filegroups = f1 f2
a2.sources.r1.filegroups.f1 = /opt/module/flume/datas/tailCase/files/.*file.*
a2.sources.r1.filegroups.f2 = /opt/module/flume/datas/tailCase/logs/.*log.*

# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://hadoop102:8020/flume/tailDir/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = tail-
# 是否按照时间滚动文件夹
a2.sinks.k1.hdfs.round = true
# 多少时间单位创建一个新的文件夹
a2.sinks.k1.hdfs.roundValue = 1
# 重新定义时间单位
a2.sinks.k1.hdfs.roundUnit = hour
# 是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# 积攒多少个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 100
# 设置文件类型,(可选择设置支持压缩的CompressedStream或者不支持压缩的DataStream) 
a2.sinks.k1.hdfs.fileType = DataStream
# 多久生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
# 设置每个文件的滚动大小大概是128M
a2.sinks.k1.hdfs.rollSize = 134217700
# 文件的滚动与Event数量无关
a2.sinks.k1.hdfs.rollCount = 0

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

(3)启动 flume,监控文件夹

$ bin/flume-ng agent --conf conf/ --name a2 --conf-file job/simple/taildir-flume-hdfs.conf

(4)测试

# 1. 在/opt/module/flume/datas/目录下创建tailCase/files文件夹向files文件夹下文件追加内容
[liujiaqi@hadoop102 files]$ touch file1.txt
[liujiaqi@hadoop102 files]$ echo I am file1 >> file1.txt
[liujiaqi@hadoop102 files]$ touch log1.txt
[liujiaqi@hadoop102 files]$ echo I am log1 >> log1.txt
# 2. 在/opt/module/flume/datas/目录下创建tailCase/logs文件夹向logs文件夹下文件追加内容
[liujiaqi@hadoop102 logs]$ touch file2.txt
[liujiaqi@hadoop102 logs]$ echo I am file2 >> file2.txt
[liujiaqi@hadoop102 logs]$ touch log2.txt
[liujiaqi@hadoop102 logs]$ echo I am log2 >> log2.txt
# 3. 查看HDFS上的数据,验证flume对多目录下文件的实时采集
# 4. 关掉flume采集程序,对logs/和files/下文件追加,再开启flume采集程序,验证flume的断点续传
[liujiaqi@hadoop102 flume]$ cat /opt/module/flume/tail_dir.json
[liujiaqi@hadoop102 flume]$ cd datas/tailCase/files
[liujiaqi@hadoop102 files]$ echo I am file1 duandian >> file1.txt
[liujiaqi@hadoop102 files]$ cd /opt/module/flume/datas/tailCase/logs
[liujiaqi@hadoop102 logs]$ echo I am log2 xuchuan>> log2.txt

Taildir Source 维护了一个 json 格式的 Position File,其会定期的往 Position File 中更新每个文件读取到的最新的位置,因此能够实现断点续传。

Position File 的格式如下:

{"inode":2496272,"pos":12,"file":"/opt/module/flume/datas/tailCase/files/file1.txt"}
{"inode":2496275,"pos":12,"file":"/opt/module/flume/datas/tailCase/logs/log2.txt"}

【注】Linux 中储存文件元数据的区域就叫做 inode,每个 inode 都有一个号码,操作系统用 inode 来识别不同的文件,Unix/Linux 系统内部不使用文件名,而使用 inode 号码来识别文件。

Taildir 是使用 inode 和 fileAbsPath 联合确定一个文件。一旦文件更名,就会重新上传全量数据。如果想要禁止这种情况,需要修改 flume-taildir 源码:

// ReliableTaildirEventReader

// 1. updatePos
// if (this.inode == inode && this.path.equals(path)) {
if (this.inode == inode)

// 2. updateTailFiles
// if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
if (tf == null)

小结:

  • Exec Source:适用于监控一个实时追加的文件,不能实现断点续传;
  • Spooldir Source:适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;
  • Taildir Source:适合用于监听多个实时追加的文件,并且能够实现断点续传,但可能会造成数据重复

2. Flume 进阶

2.1 Flume 事务

Flume 中一共有两种事务:

  • Put 事务:在 Source 组件和 Channel 组件之间,保证 Source 组件到 Channel 组件之间数据传递的可靠性。
  • Take 事务:在 Channel 组件和 Sink 组件之间,保证 Channel 组件到 Sink 组件之间数据传输的可靠性。

a. Put 事务流程

(1)Source 组件采集外部数据到 Agent 内部,并且将数据包装为 Event;

(2)Source 组件开始将事件传输到 Channel 组件中;

(3)先开启事务,在事务内部,通过 doPut 方法将一批数据放入到 putList 中存储(数据批的大小取决于 Source 组件的配置参数 batchSize 的值);

(4)调用 doCommit 方法,把 putList 中的所有 Event 放到 Channel 中,成功之后就清空 putList(putList 的大小取决于 Channel 组件的配置参数 transactionCapacity 的值);

(5)putList 在向 Channel 中发送数据前会先检查 Channel 中的容量是否放得下,放不下则一个都不会放并调用 doRollback 方法;

(6)调用 doRollback 方法后,doRollback 方法会进行两步操作:将 putList 清空;抛出 ChannelException 异常。

(7)Source 组件捕捉到 doRollback 方法抛出的异常后,会将刚才的一批数据重新采集,然后重新开启一个事务。

【思考】Put 事务能否保证采集数据不丢失?

b. Take 事务流程

(1)Sink 组件不断的轮询 Channel,当其中有新的事件到达时,开启 Take 事务;

(2)Take 事务开启后,会调用 doTake 方法将 Channel 组件中的 Event 剪切到 takeList 中;

(3)当 takeList 中存放了 batch size 数量的 Event 之后,就会调用 doCommit 方法;

(4)doCommit 方法中,首先会将数据写出到外部系统,成功后就会清空 takeList;

(5)当事务失败时,就会调用 doRollback 方法来进行回滚,就是将 takeList 中的数据原封不动的还给 Channel。

(6)当 Take 事务失败时,可能向外部写了一半的数据了,但是回滚时,是将 takeList 中的全部数返给 Channel,当开启新的 Take 事务时,又会将这批数据再次写出到外部,就造成了数据重复。

【思考】Take 事务可能造成数据重复,如何避免呢?

【注】Source 丢失和 Sink 重复是外部的事情,Flume 的事务是说 Agent 内部组件之间的可靠性。

2.2 Agent 内部原理

(1)Source 组件采集外部数据到 Agent 内部,并包装为 Event;

(2)将 Event 发送到 ChannelProcessor 中:

  • 先通过 InterceptorChain(拦截器链)中每个 Interceptor 的拦截过滤,符合要求的 Event 会返回到 ChannelProcessor 中;
  • 再根据不同的 ChannelSelector(选择器)来决定 Event 去往哪个 Channel,然后返回到 ChannelProcessor;

(3)开启 Put 事务,将批量的 Event 发送到 Channel 中;

(4)根据 SinkProcessor 组件配置的类型不同,实现相应的功能(负载均衡或故障转移),最终都会且同一时刻只能有一个 Sink 去拉取数据。

(5)Sink 组件不断的轮询 Channel,当有新的 Event 到达 Channel 时,拉取 Event。

2.3 Flume 拓扑结构

(1)简单串联

这种模式是将多个 Flume 顺序连接起来了,从最初的 Source 开始到最终 Sink 传送的目的存储系统。此模式不建议桥接过多的 Flume 数量,Flume 数量过多不仅会影响传输速率,而且一旦传输过程中某个节点 Flume 宕机,会影响整个传输系统。

(2)复制和多路复用

Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个 Channel 中,或者将不同数据分发到不同的 Channel 中,Sink 可以选择传送到不同的目的地。

(3)负载均衡和故障转移

Flume 支持使用将多个 Sink 逻辑上分到一个 Sink 组,Sink 组配合不同的 SinkProcessor 可以实现负载均衡和错误恢复的功能。

(4)聚合

这种模式是我们最常见的,也非常实用,日常 web 应用通常分布在上百个服务器,大者甚至上千个、上万个服务器。产生的日志,处理起来也非常麻烦。用 Flume 的这种组合方式能很好的解决这一问题,每台服务器部署一个 Flume 采集日志,传送到一个集中收集日志的 Flume,再由此 Flume 上传到 hdfs、hive、hbase 等进行日志分析。

2.4 进阶案例

a. 复制

需求说明

使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责输出到 Local FileSystem。

需求分析

实现步骤

测试步骤

b. 负载均衡和故障转移

案例需求

使用 Flume1 监控一个端口,其 Sink Group 中的 Sink 分别对接 Flume2 和 Flume3,采用 FailoverSinkProcessor 实现故障转移的功能。

需求分析

实现步骤

测试步骤

  1. 使用 netcat 工具向本机的 44444 端口发送内容
  2. 查看 Flume2 及 Flume3 的控制台打印日志
  3. 将 Flume2 kill,观察 Flume3 的控制台打印情况

c. 多路复用和拦截器的使用

案例需求

使用 Flume 采集服务器端口日志数据,需要按照日志类型的不同,将不同种类的日志发往不同分析系统。

需求分析

在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。

此时会用到 Flume 的 ChannelSelecter 中的 Multiplexing 结构,其原理是:根据 Event#Header 的某个 key 的值,将不同的 Event 发送到不同的 Channel 中。所以我们需要自定义一个 Interceptor,实现为不同类型的 Event#Header 中的 key 赋予不同的值。

在该案例中,我们以端口数据模拟日志,以数字和字母模拟不同类型的日志,我们需要自定义 Interceptor 区分数字和字母,将其分别发往不同的分析系统(Channel)。

实现步骤

(1)创建 Maven 项目并引入如下依赖

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

(2)定义 CustomInterceptor 类并实现 Interceptor 接口

package io.tree6x7.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;

public class CustomInterceptor implements Interceptor {


    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // 1. 从事件中获取数据
        byte[] body = event.getBody();
        // 2. 判断数据开头的字符是字母还是数据
        if (body[0] >= 'a' && body[0] <= 'z') {
            // 是字母就在事件头部设置type类型为letter
            event.getHeaders().put("type", "letter");         
        } else if (body[0] >= '0' && body[0] <= '9') {
            // 是数字就在事件头部设置type类型为number
            event.getHeaders().put("type", "number");
        }
        // 3. 返回事件
        return event;

    }

	  /**
     * 对批量事件进行拦截
     */
    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    @Override
    public void close() {
    }
  

    /**
     * 拦截器对象的构造对象
     */
    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new CustomInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

(3)将项目打包,并导入到 Flume 的 lib 目录下

(4)为 hadoop102 上的 Flume1 配置 1 个 netcat Source,1 个 Sink Group(2 个 avro Sink),并配置相应的 ChannelSelector 和 Interceptor。再分别为 hadoop103 上的 Flume2 和 hadoop104 上的 Flume3 配置一个 avro Source 和一个 logger Sink。

(5)分别在 hadoop102、hadoop103、hadoop104 上启动 Flume 进程,注意先后顺序。

测试步骤

  1. 在 hadoop102 使用 netcat 向 localhost:44444 发送字母和数字
  2. 观察 hadoop103 和 hadoop104 打印的日志。

d. 聚合

案例需求

  • hadoop102 上的 flume-1 监控文件 /opt/module/flume/datas/.*file*.
  • hadoop103 上的 flume-2 监控某一个端口的数据流
  • hadoop104 上的 flume-3 接收 flume-1 和 flume-2 的数据并打印到控制台

需求分析

实现步骤

部署运行 Flume 采集程序

# 在hadoop104节点上运行flume3
$ /opt/module/flume/bin/flume-ng agent –c conf/ -n a3 -f /opt/module/flume/job/enterprise/juhe/flume-3-avro-logger.conf -Dflume.root.logger=INFO,console
# 在hadoop103节点上运行flume2
$ /opt/module/flume/bin/flume-ng agent –c conf/ -n a2 -f /opt/module/flume/job/enterprise/juhe/flume-2-netcat-avro.conf
# 在hadoop102节点上运行flume1
$ /opt/module/flume/bin/flume-ng agent –c conf/ -n a1 -f /opt/module/flume/job/enterprise/juhe/flume-1-exec-avro.conf

测试步骤

  1. 在 hadoop102 上向 /opt/module/flume/datas/ 目录下的 realtime.log 追加内容
  2. 在 hadoop103 上向 44444 端口发送数据
  3. 检查 hadoop104 上数据

3. 自定义组件

3.1 自定义 Source

a. 说明

Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。

https://flume.apache.org/FlumeDeveloperGuide.html#source

根据官方说明自定义 MySource 需要完成以下两点:

  • 继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口

  • 实现相应方法

    getBackOffSleepIncrement()         // backoff 步长
    getMaxBackOffSleepInterval()       // backoff 最长时间
    configure(Context context)         // 初始化 context(读取配置文件内容)
    process()	                       // 获取数据封装成Event并写入Channel(该方法将被循环调用)
    

使用场景:读取 MySQL 数据或者其他文件系统。

b. 案例

(1)导入依赖

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>

(2)编码自定义 Source

package io.tree6x7;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

public class MySource extends AbstractSource implements
        Configurable, PollableSource {
    
    /**
     * 定义配置文件将来要读取的字段
     */
    private Long delay;
    private String field;

    /**
     * 初始化配置信息
     */
    @Override
    public void configure(Context context) {
        delay = context.getLong("delay");
        field = context.getString("field", "Hello!");
    }

    @Override
    public Status process() throws EventDeliveryException {
        try {
            // 创建 Header
            HashMap<String, String> hearderMap = new HashMap<>();
            // 创建 Event
            SimpleEvent event = new SimpleEvent();
            // 循环封装 Event
            for (int i = 0; i < 5; i++) {
                // 给事件设置头信息
                event.setHeaders(hearderMap);
                // 给事件设置内容
                event.setBody((field + i).getBytes());
                // 将事件写入 Channel
                getChannelProcessor().processEvent(event);
                Thread.sleep(delay);
            }
        } catch (Exception e) {
            e.printStackTrace();
            return Status.BACKOFF;
        }
        return Status.READY;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }
}

(3)将写好的代码打包并放到 Flume 的 lib 目录下

(4)编写 Flume 配置文件 mysource.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = io.tree6x7.MySource
a1.sources.r1.delay = 1000
a1.sources.r1.field = liujiaqi

# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(5)开启任务

$  bin/flume-ng agent -c conf/ -f job/mysource.conf -n a1 -Dflume.root.logger=INFO,console

(6)查看控制台打印

3.2 自定义 Sink

a. 说明

Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。

Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。

Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。

https://flume.apache.org/FlumeDeveloperGuide.html#sink

根据官方说明自定义 MySink 需要完成以下两点:

  • 继承 AbstractSink 类并实现 Configurable 接口

  • 实现相应方法

    configure(Context context)	  // 初始化Context(读取配置文件内容)
    process()							  // 从Channel读取获取数据Event(该方法将被循环调用)
    

使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。

b. 案例

使用 Flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。要求前后缀可在 flume 任务配置文件中配置。

(1)编码自定义 Sink

package io.tree6x7;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySink extends AbstractSink implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSink.class);
    private String prefix;
    private String suffix;

    @Override
    public Status process() throws EventDeliveryException {
        // 声明返回值状态信息
        Status status;
        // 获取当前 Sink 绑定的 Channel
        Channel ch = getChannel();
        // 获取事务
        Transaction txn = ch.getTransaction();
        // 声明事件
        Event event;
        // 开启事务
        txn.begin();
        // 读取 Channel 中的事件,直到读取到事件结束循环
        while (true) {
            event = ch.take();
            if (event != null) {
                break;
            }
        }
        try {
            // 处理事件(打印)
            LOG.info(prefix + new String(event.getBody()) + suffix);
            // 事务提交
            txn.commit();
			// 更新返回值状态信息
            status = Status.READY;
        } catch (Exception e) {
            // 遇到异常,事务回滚
            txn.rollback();
            // 更新返回值状态信息
            status = Status.BACKOFF;
        } finally {
            // 关闭事务
            txn.close();
        }
        return status;
    }

    @Override
    public void configure(Context context) {
        // 读取配置文件内容,有默认值
        prefix = context.getString("prefix", "hello:");
        // 读取配置文件内容,无默认值
        suffix = context.getString("suffix");
    }
}

(2)打包放到 Flume 的 lib 目录下

(3)编写 Flume 配置文件 mysink.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = io.tree6x7.MySink
# a1.sinks.k1.prefix = halo:
a1.sinks.k1.suffix = :bye
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

(4)开启任务

$ bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.logger=INFO,console

(5)使用 netcat 工具向本机的 44444 端口发送内容

$ nc localhost 44444

(6)查看控制台打印

4. Ganglia 数据流监控

Ganglia 由 gmond、gmetad 和 gweb 三部分组成。

组成 说明
gmond(Ganglia Monitoring Daemon) gmond 是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用 gmond 可以很容易收集很多系统指标数据,如 CPU、内存、磁盘、网络和活跃进程的数据等。
gmetad(Ganglia Meta Daemon) 整合所有信息,并将其以 RRD 格式存储至磁盘的服务。
gweb(Ganglia Web) gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。

安装规划:

gweb gmetad gmod
hadoop102 ture true true
hadoop103 true
hadoop104 true

安装步骤:

(1)基本安装

# 1. 在 102 103 104 分别安装 epel-release
$ sudo yum -y install epel-release
$ sudo yum -y install epel-release
$ sudo yum -y install epel-release
# 2. 在 102 安装
$ sudo yum -y install ganglia-gmetad 
$ sudo yum -y install ganglia-web
$ sudo yum -y install ganglia-gmond
# 3. 在 103 和 104 安装
$ sudo yum -y install ganglia-gmond
$ sudo yum -y install ganglia-gmond

(2)在 102 修改 /etc/httpd/conf.d/ganglia.conf 和 /etc/ganglia/gmetad.conf

(3)在 102 103 104 修改 /etc/ganglia/gmond.conf

(4)在 102 修改 /etc/selinux/config

提示:selinux 本次生效关闭必须重启,如果此时不想重启,可以用 sudo setenforce 0 临时生效。

(5)启动 Ganglia

# 在 102 103 104 启动
$ sudo systemctl start gmond
$ sudo systemctl start gmond
$ sudo systemctl start gmond
# 在 102 启动
$ sudo systemctl start httpd
$ sudo systemctl start gmetad

(6)GangliaWeb http://hadoop102/ganglia

(7)启动 Flume 测试监控

[liujiaqi@hadoop102 flume]$ bin/flume-ng agent \
-c conf/ \
-n a1 \
-f job/simple/netcat-flume-logger.conf \
-Dflume.root.logger=INFO,console \
-Dflume.monitoring.type=ganglia \
-Dflume.monitoring.hosts=hadoop102:8649

使用 nc localhost 44444 发送数据观察 Ganglia 监测图。

5. 总结

5.1 组成与事务

(1)组成

  • Source 组件是专门用来收集数据的(各种类型、各种格式的日志数据);
  • Channel 组件对采集到的数据进行缓存
  • Sink 组件是用于把数据发送到目的地的组件。

(2)Flume 使用两个独立的事务分别负责从 Source 到 Channel,以及从 Channel 到 Sink 的事件传递。

  • Source 到 Channel 是 Put 事务
  • Channel 到 Sink 是 Take 事务

(3)Flume 采集数据会丢失吗?

根据 Flume 的架构原理,Flume 是不可能丢失数据的,其内部有完善的事务机制,Source 到 Channel 是事务性的,Channel 到 Sink 是事务性的,因此这两个环节不会出现数据的丢失。

唯一可能丢失数据的情况是 Channel 采用 MemoryChannel 时 Agent 宕机导致数据丢失,或者 Channel 存储数据已满,导致 Source 不再写入,未写入的数据丢失。

Flume 不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由 Sink 发出,但是没有接收到响应,Sink 会再次发送数据,此时可能会导致数据的重复。

(4)Flume 的事务能保证数据采集传输过程中数据安全可靠吗?

  • Flume 能保证在其内部数据不会丢失(Source 丢失和 Sink 重复是外部的事情,Flume 事务说的是 Agent 内部组件之间的可靠性)
  • Source 类型如果不是回溯型的极端情况下会造成数据丢失
  • Sink 输出时可能会造成数据重复(外部系统支持事务可以达到去重效果)

5.2 拦截器

采用拦截器的优缺点:

  • 优点:模块化开发和可移植性
  • 缺点:性能会低一些

自定义拦截器步骤:

  1. 实现 Interceptor
  2. 重写 4 个方法
    • initialize 初始化方法
    • intercept 处理单个 Event 方法
    • intercept 处理多个 Event 方法
    • close 方法
    • 静态内部类 实现Interceptor.Builder

拦截器可以不用吗?

可以不用,需要在下一级 Hive 的 dwd 层和 SparkSteaming 里面处理。

  • 优势:只处理一次,轻度处理;
  • 劣势:影响性能,不适合做实时推荐这种对实时要求比较高的场景。

5.3 Channel 选择器

  • Replication Channel Selector 会将从 Source 过来的 Events 发往所有 Channel
  • Multiplexing channel Selector 通过配置选择法网哪些 Channel

5.4 参数调优

(1)Source

增加 Source 个数(使用 Tair Dir Source 时可增加 FileGroups 个数)可以增大 Source 的读取数据的能力。例如:当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录,同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。

batchSize 参数决定 Source 一次批量运输到 Channel 的 Event 条数,适当调大这个参数可以提高 Source 搬运 Event 到 Channel 时的性能。

(2)Channel

type 选择 Memory 时 Channel 的性能最好,但是如果 Flume 进程意外挂掉可能会丢失数据。type 选择 File 时 Channel 的容错性更好,但是性能上会比 Memory 差。

使用 File Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。Capacity 参数决定 Channel 可容纳最大的 Event 条数。transactionCapacity 参数决定每次 Source 往 Channel 里面写的最大 Event 条数和每次 Sink 从 Channel 里面读的最大 Event 条数。transactionCapacity 需要大于 Source 和 Sink 的 batchSize 参数。

(3)Sink

增加 Sink 的个数可以增加 Sink 消费 Event 的能力。Sink 也不是越多越好够用就行,过多的 Sink 会占用系统资源,造成系统资源不必要的浪费。batchSize 参数决定 Sink 一次批量从 Channel 读取的 Event 条数,适当调大这个参数可以提高 Sink 从 Channel 搬出 Event 的性能。

5.5 Ganglia 监控器

采用 Ganglia 监控器,若监控到 Flume 尝试提交的次数远远大于最终成功的次数,说明 Flume 运行比较差。通常是因为内存不充足导致,所有提高内存是比较好的办法。

解决办法?

(1)增加内存:修改配置文件 flume-env.sh,其中 -Xmx 与 -Xms 最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁 FullGC。

(2)增加服务器台数

(3)提高 Flume 吞吐量