Analysis of the Flink Job Submission Process

Published: 2023-06-21 18:28:59  Author: 畔山陆仁贾

Background

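Back in the Flink 1.9 days, I analyzed the Flink job submission process in order to build deployment and management tooling for Flink jobs. The images in that old blog post have since broken, so I am reading through the relevant source code again, this time against Flink 1.13.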

Job Submission

Submitting a Flink job starts with the flink script. Taking submission to YARN as an example, we run the WordCount example like this:

bin/flink run -t yarn-per-job examples/batch/WordCount.jar

The flink script

The last line of the script runs the java command:

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

Here, org.apache.flink.client.cli.CliFrontend is the entry class for Flink job submission.

After importing the Flink source code, you will find that the CliFrontend class lives in the flink-clients module.

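From submission to execution, a Flink job goes through three main stages:

  1. Initializing the configuration
  2. Packaging the user code and the job configuration
  3. Invoking and executing the user code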

CliFrontend.java

Code (the main method of CliFrontend):

EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

// 1. find the configuration directory
// Initialize the Flink configuration
final String configurationDirectory = getConfigurationDirectoryFromEnv();

// 2. load the global configuration
final Configuration configuration =
        GlobalConfiguration.loadConfiguration(configurationDirectory);

// 3. load the custom command lines
// Load and wrap the configuration parameters
final List<CustomCommandLine> customCommandLines =
        loadCustomCommandLines(configuration, configurationDirectory);

int retCode = 31;
try {
    final CliFrontend cli = new CliFrontend(configuration, customCommandLines);

    SecurityUtils.install(new SecurityConfiguration(cli.configuration));
// Execute the job code
    retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
    final Throwable strippedThrowable =
            ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
    LOG.error("Fatal error while running command line interface.", strippedThrowable);
    strippedThrowable.printStackTrace();
} finally {
    System.exit(retCode);
}
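The first two steps above resolve the configuration directory and then load flink-conf.yaml from it. In Flink 1.13, getConfigurationDirectoryFromEnv checks the FLINK_CONF_DIR environment variable first and otherwise falls back to ../conf and then conf. GlobalConfiguration does not use a full YAML parser. It reads flink-conf.yaml line by line as simple `key: value` pairs.

The sketch below is a simplified version of both steps under those assumptions. The class ConfLoaderSketch and its methods are hypothetical names, not Flink API, and the environment value is passed in as a parameter so it can be tested.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified sketch of the configuration loading steps; not Flink's code.
class ConfLoaderSketch {

    // Resolve the config directory: the FLINK_CONF_DIR value first, then "../conf", then "conf".
    static String resolveConfDir(String envValue) {
        if (envValue != null) {
            if (new File(envValue).exists()) {
                return envValue;
            }
            throw new IllegalStateException("FLINK_CONF_DIR does not exist: " + envValue);
        }
        for (String fallback : new String[] {"../conf", "conf"}) {
            if (new File(fallback).exists()) {
                return fallback;
            }
        }
        throw new IllegalStateException("Cannot find a configuration directory");
    }

    // Parse flink-conf.yaml-style lines: drop "#" comments, split on the first ": ".
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : lines) {
            String content = line.split("#", 2)[0].trim();
            if (content.isEmpty()) {
                continue;
            }
            String[] kv = content.split(": ", 2);
            if (kv.length < 2 || kv[0].trim().isEmpty() || kv[1].trim().isEmpty()) {
                continue; // malformed line: skipped in this sketch
            }
            conf.put(kv[0].trim(), kv[1].trim());
        }
        return conf;
    }

    // Read <confDir>/flink-conf.yaml and parse it.
    static Map<String, String> load(String confDir) throws IOException {
        Path yaml = Paths.get(confDir, "flink-conf.yaml");
        return parse(Files.readAllLines(yaml, StandardCharsets.UTF_8));
    }
}
```

A typical call would be `ConfLoaderSketch.load(ConfLoaderSketch.resolveConfDir(System.getenv("FLINK_CONF_DIR")))`.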

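Overall, Flink's code is fairly readable. It does have one painful aspect: calls are wrapped in layer upon layer of nesting, so it is easy to drift off course while reading.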
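In the main method, everything after setup funnels into cli.parseAndRun(args), which is the first of those nested layers. It treats the first argument as the action (run, cancel, list and so on) and hands the remaining arguments to the matching method.

The sketch below shows that dispatch in simplified form. ActionDispatcherSketch is a hypothetical stand-in for CliFrontend. The real code uses a switch over action constants rather than a map of handlers.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of dispatching on the first CLI argument; not Flink's actual class.
class ActionDispatcherSketch {
    private final Map<String, Consumer<String[]>> actions = new LinkedHashMap<>();

    void register(String action, Consumer<String[]> handler) {
        actions.put(action, handler);
    }

    // Returns a process exit code: 0 on success, 1 for a missing or unknown action.
    int parseAndRun(String[] args) {
        if (args.length < 1) {
            System.out.println("Please specify an action.");
            return 1;
        }
        String action = args[0];
        // Everything after the action word is passed on to the handler.
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        Consumer<String[]> handler = actions.get(action);
        if (handler == null) {
            System.out.printf("\"%s\" is not a valid action.%n", action);
            return 1;
        }
        handler.accept(params);
        return 0;
    }
}
```

Take `bin/flink run -t yarn-per-job examples/batch/WordCount.jar` as an example. The action is "run", and the run handler receives `-t yarn-per-job examples/batch/WordCount.jar`.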

The CliFrontend class has a run method that corresponds to the run action on the command line:

final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);

// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
    CliFrontendParser.printHelpForRun(customCommandLines);
    return;
}

final CustomCommandLine activeCommandLine =
        validateAndGetActiveCommandLine(checkNotNull(commandLine));

final ProgramOptions programOptions = ProgramOptions.create(commandLine);

final List<URL> jobJars = getJobJarAndDependencies(programOptions);

final Configuration effectiveConfiguration =
        getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);

LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
    
    executeProgram(effectiveConfiguration, program);
}
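The call to getEffectiveConfiguration above works roughly as a series of layers:

  1. It starts from a copy of the global configuration loaded from flink-conf.yaml.
  2. It adds the options the active command line derives from the arguments. For example, `-t yarn-per-job` becomes `execution.target`.
  3. It applies program options such as the job jars, which end up under `pipeline.jars`.

The sketch below shows that layering with plain maps. EffectiveConfSketch is a hypothetical name, Map<String, String> stands in for Flink's Configuration class, and joining the jars with ";" is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of configuration layering; later layers override earlier ones.
class EffectiveConfSketch {
    static Map<String, String> effective(
            Map<String, String> global,
            Map<String, String> fromCommandLine,
            List<String> jobJars) {
        // 1. Start from a copy so the global configuration itself is left untouched.
        Map<String, String> result = new LinkedHashMap<>(global);
        // 2. Options derived from the active command line win over flink-conf.yaml.
        result.putAll(fromCommandLine);
        // 3. Program options (here only the job jars) are applied last.
        if (!jobJars.isEmpty()) {
            result.put("pipeline.jars", String.join(";", jobJars));
        }
        return result;
    }
}
```

Working on a copy matters here. The global configuration is still referenced by the CliFrontend instance, so a per-run override must not leak back into it.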

ClientUtils.java

The run method eventually calls ClientUtils.executeProgram, which invokes the job class we wrote ourselves:

// ...
try {
    program.invokeInteractiveModeForExecution();
} finally {
    ContextEnvironment.unsetAsContext();
    StreamContextEnvironment.unsetAsContext();
}
// ...
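The finally block hints at a pattern worth spelling out. Before invoking the user's main method, ClientUtils installs ContextEnvironment and StreamContextEnvironment as the "context". When the user code then calls ExecutionEnvironment.getExecutionEnvironment() or StreamExecutionEnvironment.getExecutionEnvironment(), it gets an environment bound to the effective configuration rather than a local one. The finally block removes that context again.

The sketch below is a minimal version of the pattern. ContextEnvSketch and its members are hypothetical names, and a plain static field replaces Flink's factory plumbing.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the "context environment" pattern; not Flink's actual classes.
class ContextEnvSketch {

    // Stand-in for an execution environment; it only describes where jobs would run.
    interface Env {
        String describe();
    }

    // Factory installed by the client before user code runs; null means none is installed.
    // A plain static field keeps the sketch short; it is not safe for concurrent submissions.
    private static Supplier<Env> contextFactory;

    static void setAsContext(Supplier<Env> factory) {
        contextFactory = factory;
    }

    static void unsetAsContext() {
        contextFactory = null;
    }

    // What user code calls: the installed context environment if present, else a local one.
    static Env getExecutionEnvironment() {
        Supplier<Env> factory = contextFactory;
        if (factory != null) {
            return factory.get();
        }
        return () -> "local";
    }

    // Client side: install the context, run the user code, and always clean up afterwards.
    static String runWithContext(String target, Supplier<String> userMain) {
        setAsContext(() -> () -> "context(" + target + ")");
        try {
            return userMain.get();
        } finally {
            unsetAsContext();
        }
    }
}
```

This is why the same WordCount jar runs locally when started from an IDE but is submitted to the cluster when started through bin/flink. The user code never changes; only the installed context does.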

PackagedProgram.java

After several more layers of calls, PackagedProgram.callMainMethod finally executes the user code via reflection:

Method mainMethod;
if (!Modifier.isPublic(entryClass.getModifiers())) {
    throw new ProgramInvocationException(
            "The class " + entryClass.getName() + " must be public.");
}

try {
    mainMethod = entryClass.getMethod("main", String[].class);
} catch (NoSuchMethodException e) {
    throw new ProgramInvocationException(
            "The class " + entryClass.getName() + " has no main(String[]) method.");
} catch (Throwable t) {
    throw new ProgramInvocationException(
            "Could not look up the main(String[]) method from the class "
                    + entryClass.getName()
                    + ": "
                    + t.getMessage(),
            t);
}

if (!Modifier.isStatic(mainMethod.getModifiers())) {
    throw new ProgramInvocationException(
            "The class " + entryClass.getName() + " declares a non-static main method.");
}
if (!Modifier.isPublic(mainMethod.getModifiers())) {
    throw new ProgramInvocationException(
            "The class " + entryClass.getName() + " declares a non-public main method.");
}

try {
    mainMethod.invoke(null, (Object) args);
} catch (IllegalArgumentException e) {
    throw new ProgramInvocationException(
            "Could not invoke the main method, arguments are not matching.", e);
} catch (IllegalAccessException e) {
    throw new ProgramInvocationException(
            "Access to the main method was denied: " + e.getMessage(), e);
} catch (InvocationTargetException e) {
    Throwable exceptionInMethod = e.getTargetException();
    if (exceptionInMethod instanceof Error) {
        throw (Error) exceptionInMethod;
    } else if (exceptionInMethod instanceof ProgramParametrizationException) {
        throw (ProgramParametrizationException) exceptionInMethod;
    } else if (exceptionInMethod instanceof ProgramInvocationException) {
        throw (ProgramInvocationException) exceptionInMethod;
    } else {
        throw new ProgramInvocationException(
                "The main method caused an error: " + exceptionInMethod.getMessage(),
                exceptionInMethod);
    }
} catch (Throwable t) {
    throw new ProgramInvocationException(
            "An error occurred while invoking the program's main method: " + t.getMessage(),
            t);
}
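The sketch below is a trimmed-down, self-contained version of the reflection steps above. It checks the class, looks up a public static main(String[]), invokes it, and unwraps InvocationTargetException so the caller sees the user's own exception.

MainInvokerSketch, DemoJob and FailingJob are hypothetical names. Where Flink throws ProgramInvocationException, this sketch throws IllegalArgumentException.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

// Hypothetical, trimmed-down sketch of the main-method checks; not Flink's actual code.
class MainInvokerSketch {

    // Stand-in user job with a standard entry point; records the arguments it received.
    public static class DemoJob {
        static String[] lastArgs;

        public static void main(String[] args) {
            lastArgs = args;
        }
    }

    // Stand-in user job whose main method fails.
    public static class FailingJob {
        public static void main(String[] args) {
            throw new IllegalStateException("boom");
        }
    }

    static void callMain(Class<?> entryClass, String[] args) throws Exception {
        if (!Modifier.isPublic(entryClass.getModifiers())) {
            throw new IllegalArgumentException("The class " + entryClass.getName() + " must be public.");
        }
        // getMethod only sees public methods and throws NoSuchMethodException if main is missing.
        Method mainMethod = entryClass.getMethod("main", String[].class);
        if (!Modifier.isStatic(mainMethod.getModifiers())) {
            throw new IllegalArgumentException(
                    "The class " + entryClass.getName() + " declares a non-static main method.");
        }
        try {
            // Static method, so the receiver is null; the (Object) cast keeps args as ONE argument
            // instead of letting varargs spread the String[] into separate parameters.
            mainMethod.invoke(null, (Object) args);
        } catch (InvocationTargetException e) {
            // Surface what the user's main actually threw, not the reflection wrapper.
            Throwable cause = e.getTargetException();
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw new RuntimeException(cause);
        }
    }
}
```

The `(Object) args` cast mirrors the original code. Without it, the String[] would be interpreted as the varargs Object[] itself, and the call would fail with a wrong-number-of-arguments error.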

Summary

Along the way, I also drew a simple call-flow diagram.

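If all you care about is how a Flink job is submitted to the cluster and executed, you can actually stop here. The process is relatively simple, but it involves a lot of nested calls. Once you trace out the main path first, the rest becomes much easier to follow.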

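Of course, there are also plenty of smaller details. For example, when the custom command-line objects are loaded, the order in which they are loaded matters.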
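The order matters because CliFrontend's validateAndGetActiveCommandLine returns the first CustomCommandLine whose isActive(...) check passes. In Flink 1.13, loadCustomCommandLines adds GenericCLI first, then FlinkYarnSessionCli, and DefaultCLI last. DefaultCLI always reports itself as active, so it has to be the catch-all at the end.

The sketch below models that first-match selection. ActiveCliSketch and Cli are hypothetical names. The activation predicates are crude stand-ins: the real isActive methods take a parsed commons-cli CommandLine and also consult the configuration, not a raw argument list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of first-match selection over an ordered list of command lines.
class ActiveCliSketch {

    // Stand-in for Flink's CustomCommandLine: a name plus an "is this CLI responsible?" check.
    static final class Cli {
        final String name;
        final Predicate<List<String>> isActive;

        Cli(String name, Predicate<List<String>> isActive) {
            this.name = name;
            this.isActive = isActive;
        }
    }

    // Order modeled on the load order described above; the predicates are simplified guesses.
    static List<Cli> loadInOrder() {
        List<Cli> clis = new ArrayList<>();
        clis.add(new Cli("generic", args -> args.contains("-t") || args.contains("--target")));
        clis.add(new Cli("yarn-session", args -> args.contains("-m") && args.contains("yarn-cluster")));
        clis.add(new Cli("default", args -> true)); // always active, so it must come last
        return clis;
    }

    // First match wins, so an always-active entry placed earlier would shadow everything after it.
    static Cli selectActive(List<Cli> clis, List<String> args) {
        for (Cli cli : clis) {
            if (cli.isActive.test(args)) {
                return cli;
            }
        }
        throw new IllegalStateException("No valid command-line found.");
    }
}
```

If the list is reversed so that the default entry comes first, selectActive returns it for every input, including `-t yarn-per-job`. That is exactly the situation the fixed load order prevents.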