背景
通过命令行向Flink集群提交任务,都经过哪些环节,中间的调用关系是什么。
这里以Yarn模式为例,通过Flink任务提交至Yarn集群,由Yarn的AM开始执行Flink代码作为入口,尝试进行分析。
Flink里的代码调用关系比较复杂,这里只列了部分关键点,太过于琐碎的代码就没有具体深入。待后续再进行深入吧。如果有哪里写的不对的地方请指正。
环境信息
jdk:1.8
scala:2.12
flink:1.12
组件关系
Flink与Yarn组件关系
JobManger组件加载顺序
流程分析
入口类
通过命令行提交flink任务至yarn,在yarn的AM中,调用指定的入口类:org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
main方法
会去调用ClusterEntrypoint.runClusterEntrypoint方法
ClusterEntrypoint.java
runClusterEntrypoint
该方法直接调用内部方法并增加一个回调hook
startCluster
用Lambda回调runCluster方法
runCluster
该方法比较关键,通过调用子类实现的createDispatcherResourceManagerComponentFactory方法,构建一个JobManager组件工厂的实例。在Flink中,还是有非常多工厂模式的影子。
然后通过该工厂的create方法再去构建一个clusterComponent对象实例,在构建该实例时,在其方法内部就完成了各组件的构建和启动工作。
DefaultDispatcherResourceManagerComponentFactory.java
create
该方法是整个JobManager的核心方法,整个JobManger的组件都在这里定义和启动。
按照启动顺序分别是WebMonitorEndpoint、DispatcherRunner和ResourceManager。
这些组件都是高可用的方式,所以有很多调用都是在为高可用服务,我这里就跳过这部分代码,只看和启动相关的。
这里我只分析Dispatcher的相关启动,其它代码分析后续再补充。
createRestEndpoint & start
createDispatcherRunner
而dispatcher的核心是dispatcherRunner,它通过dispatcherRunnerFactory.createDispatcherRunner来构建一个实例,所有的操作都封装到了工厂方法中。dispatcherRunnerFactory的实现类是DefaultDispatcherRunnerFactory。
createResourceManager & start
DefaultDispatcherRunnerFactory.java
createDispatcherRunner
该方法内部再调用DefaultDispatcherRunner.create返回一个DispatcherRunner实例。
DefaultDispatcherRunner.java
create
该方法再调用DispatcherRunnerLeaderElectionLifecycleManager.createFor方法返回DispatcherRunner。
grantLeadership
内部调用startNewDispatcherLeaderProcess
startNewDispatcherLeaderProcess
创建DispatcherLeaderProcess对象启动
JobDispatcherLeaderProcess.java
start
调用startInternal
startInternal
调用onStart
onStart
调用dispatcherGatewayServiceFactory.create方法,完成Dispatcher的初始化和启动工作,
DefaultDispatcherGatewayServiceFactory.java
create
DispatcherRunnerLeaderElectionLifecycleManager.java
createFor
该方法会返回DispatcherRunnerLeaderElectionLifecycleManager实例。
Init
初始化方法时,会调用它的leaderElectionService的start方法,通过该方法完成Dispatcher的启动工作。
leaderElectionService的实现是StandaloneLeaderElectionService。
StandaloneLeaderElectionService.java
start
内部调用DefaultDispatcherRunner.grantLeadership方法来完成启动
总结
JobManager的Dispatcher的构建和启动主要围绕DispatcherRunner这个类来展开,但中间的代码跳转有点乱,需要多读两遍。