Flink的JobManger-Dispatcher执行流程

发布时间 2023-06-30 15:12:16作者: 畔山陆仁贾

背景

通过命令行向Flink集群提交任务,都经过哪些环节,中间的调用关系是什么。

这里以Yarn模式为例,通过Flink任务提交至Yarn集群,由Yarn的AM开始执行Flink代码作为入口,尝试进行分析。

Flink里的代码调用关系比较复杂,这里只列了部分关键点,太过于琐碎的代码就没有具体深入。待后续再进行深入吧。如果有哪里写的不对的地方请指正。

环境信息

jdk:1.8

scala:2.12

flink:1.12

组件关系

Flink与Yarn组件关系

JobManger组件加载顺序

流程分析

入口类

通过命令行提交flink任务至yarn,在yarn的AM中,调用指定的入口类:org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint

main方法

会去调用ClusterEntrypoint.runClusterEntrypoint方法

ClusterEntrypoint.java

runClusterEntrypoint

该方法直接调用内部方法并增加一个回调hook

startCluster

用Lambda回调runCluster方法

runCluster

该方法比较关键,通过调用子类实现的createDispatcherResourceManagerComponentFactory方法,构建一个JobManager组件工厂的实例。在Flink中,还是有非常多工厂模式的影子。

然后通过该工厂的create方法再去构建一个clusterComponent对象实例,在构建该实例时,在其方法内部就完成了各组件的构建和启动工作。

DefaultDispatcherResourceManagerComponentFactory.java

create

该方法是整个JobManager的核心方法,整个JobManger的组件都在这里定义和启动。

按照启动顺序分别是WebMonitorEndpoint、DispatcherRunner和ResourceManager。

这些组件都是高可用的方式,所以有很多调用都是在为高可用服务,我这里就跳过这部分代码,只看和启动相关的。

这里我只分析Dispatcher的相关启动,其它代码分析后续再补充。

createRestEndpoint & start

createDispatcherRunner

而dispatcher的核心是dispatcherRunner,它通过dispatcherRunnerFactory.createDispatcherRunner来构建一个实例,所有的操作都封装到了工厂方法中。dispatcherRunnerFactory的实现类是DefaultDispatcherRunnerFactory。

createResourceManager & start

DefaultDispatcherRunnerFactory.java

createDispatcherRunner

该方法内部再调用DefaultDispatcherRunner.create返回一个DispatcherRunner实例。

DefaultDispatcherRunner.java

create

该方法再调用DispatcherRunnerLeaderElectionLifecycleManager.createFor方法返回DispatcherRunner。

grantLeadership

内部调用startNewDispatcherLeaderProcess

startNewDispatcherLeaderProcess

创建DispatcherLeaderProcess对象启动

JobDispatcherLeaderProcess.java

start

调用startInternal

startInternal

调用onStart

onStart

调用dispatcherGatewayServiceFactory.create方法,完成Dispatcher的初始化和启动工作,

DefaultDispatcherGatewayServiceFactory.java

create

DispatcherRunnerLeaderElectionLifecycleManager.java

createFor

该方法会返回DispatcherRunnerLeaderElectionLifecycleManager实例。

Init

初始化方法时,会调用它的leaderElectionService的start方法,通过该方法完成Dispatcher的启动工作。

leaderElectionService的实现是StandaloneLeaderElectionService。

StandaloneLeaderElectionService.java

start

内部调用DefaultDispatcherRunner.grantLeadership方法来完成启动

总结

JobManager的Dispatcher的构建和启动主要围绕DispatcherRunner这个类来展开,但中间的代码跳转有点乱,需要多读两遍。