Flink

发布时间 2023-09-19 21:24:00作者: CHEN_zu_he

Flink

概念

Flink运行时由两种类型的进程组成:JobManager和TaskManager。

Flink Program可以理解为自己提交的jar包。构建出Dataflow(数据流),Optimizer Graph Builder(图构造优化器),Client(客户端)。

时域

事件时间

事件发生的时间。

到达时间

数据到达Flink时间。

处理时间

Flink开始处理时间。

很多时候等于到达时间。

窗口

Aggregating events (e.g., counts, sums) works differently on streams than in batch processing. For example, it is impossible to count all elements in a stream, because streams are in general infinite (unbounded). Instead, aggregates on streams (counts, sums, etc), are scoped by windows, such as “count over the last 5 minutes”, or “sum of the last 100 elements”.
聚合事件(例如,计数、总和)在流上的工作方式不同于在批处理中。例如,不可能统计流中的所有元素,因为流通常是无限的(无界的)。相反,流上的聚合(计数、和等)由窗口确定作用域,如“在过去5分钟内计数”或“最后100个元素的总和”。

Windows can be time driven (example: every 30 seconds) or data driven (example: every 100 elements). One typically distinguishes different types of windows, such as tumbling windows (no overlap), sliding windows (with overlap), and session windows (punctuated by a gap of inactivity).
Windows 可以是时间驱动的(例如: 每30秒)或数据驱动的(例如: 每100个元素)。通常区分不同类型的窗口,例如滚动窗口(无重叠)、滑动窗口(有重叠)和会话窗口(不活动间隔)。

分为:固定窗口、滑动窗口、会话窗口

固定窗口:通常按照固定的时间片进行划分;各个窗口之间不会有重叠也不会有间隙;

滑动窗口

在固定的窗口上增加了滑动的步长;

比如:每五分钟统计过去十分钟内的数据,即:窗口为10 ,滑动步长为5 ,则每次都有5分钟的窗口数据与上次统计的重叠;

窗口=步长:等同于固定窗口;

窗口>步长:有重叠;

窗口<步长:窗口之间出现空隙;

会话窗口

没有固定的窗口大小,由时间频率决定;

当一个窗口在大于Session gap的时间内没有接收到新数据时,窗口将关闭。在这种模式下,窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的;

Watermark

进程

JobManager

至少有一个,高可用模式下由多个,一个active,其余的standby。协调Flink应用程序的分布式执行,决定何时调度下一个task,对完成、失败的task做出反应,协调checkPoint,故障恢复等。

ResourceManager

负责Flink集群中资源提供、回收、分配,管理task solts。

Dispatcher

提供REST接口,用来提交Flink程序,并未每个提交的启动一个新的JobMaster。提供webui。

JobMaster

负责管理单个JobGraph(生成逻辑图有向图)的执行,Flink集群中可以同时运行多个作业,每个作业都有自己的JobMaster。

TaskManager

执行作业流的task,缓存、交换数据。

能调度的最小单位是task solt。

一个taskmanager被分配1000m资源,5个solt,那么每个solt占用1000/5=200m资源;

tasks和算子链

Flink将算子的subtasks链接成tasks,每个task由一个线程执行。(减少线程切换、缓冲开销、件事时延和增加吞吐量)

如下图示例:source-->map()-->keyby()wondow()apply()-->sink
source和map为一一对应关系,可以优化一个算子链,一个task,并行度为2,有2个subtask(线程);
keyby()会对数据进行分区,所以不能和前面整合为一个算子链,并行度为2,有两个线程;
sink一个task,一个并行度,一个线程;
共三个task,五个线程。

本地配置30个solt,运行两个job,并行度默认为2。

算子链优化

Task Slots 和资源

每个TaskManager(worker节点)都是一个jvm进程。可以在单独的线程中的执行一个或多个subtask。为了控制一个TaskManager能执行多少个task/subtask,就有了task solt。

具有3个solt的TaskManager,每个solt占据1/3的内存,没有CPU隔离。

一个TaskManager有多个solt时,solt共享同一jvm,tcp连接数,心跳信息,数据集,数据结构等;

默认情况下运训subtask共享solt,即便不是同一个task的subtask,只要是来自同一个作业即可。

例子:将上边的作业的五个线程分配到solt

例子:将上边作业的source()-map()和keyby()-window()-apply()的并行度增加到6,共2*6+1(sink)=13个线程

运行模式

local模式原理

1. flink 程序由JobClient提交;
2. JobClient将作业提交给JobManager
3. JobManager负责协调资源分配和作业执行。资源分配完成,将任务分配给相应的TaskManager。
4. TaskManager启动一个线程开始执行,同时向JobManager报告执行状态。
5. 执行完成将结果反馈客户端JobClient。

slot在 Flink 里面可以认为是资源组, Flink 是通过将任务分成子任务并且将这些子任务分配到 slot 来并行执行程序。

on yarn模式

1. client上传jar包和配置文件到hdfs集群;
2. clinet向yarn 的ResourceManager申请资源;
3. ResourceManager分配container资源并启动AppliactionMaster,然后AP加载flink的jar包和配置构建环境,启动JobManager;
JobManager和ApplicationMaster运行在同一个container上。
ApplicationMatser也提供了flink的web服务接口。
yarn所分配的所有端口都是临时端口,允许用户并行
4. ApplicationMaster向ResourceManager申请工作资源,nodemanager记载flink程序和配置文件构建TaskManager。
5. TaskManager启动后,向JobManager发送心跳包,等待分配任务。

关闭yarn的内存检查

是否启动线程检查每个任务使用的虚拟内存量,超出直接杀掉,默认是true;

yarn-site.xml

<!-- 关闭yarn内存检查 -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

复制到各个节点上,重启;

session模式

特点:需要实现申请资源,启动JobManager和TaskManager,每次作业执行完成不是放flink集群资源;
有点:不需要每次递交作业申请资源,二十使用已经申请好的资源,从而提升执行效率;
缺点:作业执行完以后,资源不会释放,因此会一直占用系统资源;
适用场景:适合作业递交比较频繁的场景,小而多的作业;

提交任务

/flink/bin/yarn-session.sh -n 2 -tm 800 -s 1 -d

# -n 申请两个容器,这里指的是多少个taskmanager;
# -tm 表示每个taskmanager的内存大小;
# -s 表示每个taskmanager的solts数量;
# -d 表示后台运行;

per-job模式

特点:每次递交作业都需要申请一次资源;
优点:作业运行完成,资源会立刻释放,不会一直占用资源;
缺点:每次递交作业申请资源会影响执行效率;
适用场景:适合作业比较少的场景,大作业的场景

提交job

/flink/bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 /flink/server/flink/examples/batch/wordCount.jar

# -m jobmananger地址;
# -yjm 1024指定jobmananger的内存信息;
# -ytm 1024指定taskmanager的内存信息;

可在yarn的application界面查看;

如果使用yarn方式执行任务,想要切换回standalone模式有报错,可以删除/tmp/.yarn-properties-root,因为默认查找当前yarn集群中已经有的yarn-session信息中的jobmanager

application模式