美文网首页
flink run任务提交流程<命令行方式>

flink run任务提交流程<命令行方式>

作者: Watchi | 来源:发表于2019-10-24 14:09 被阅读0次

    运行模式是 flink-on-yarn per-job模式,每个任务有独立的yarn session,启动任务的方式是CLI方式。所以我们任务启动命令像是这样:

    flink run -ynm your_jobName -yn 7 -ys 2 -p 14  -ytm 2048m -yjm 2048m  -m yarn-cluster -yD metrics.reporter.influxdb_reporter.db=metrics_flink -c com.xxx.K2kExtractStream  k2k-extractor-1.0-SNAPSHOT_jobName.jar
    

    那么任务具体是如何启动的呢?
    让我们先看下flink启动脚本:


    image2019-10-24 13_42_41.png

    再看下CliFrontend的实现:


    image2019-10-24 12_57_14.png
    获取配置,最终调用:parseParameters方法
    image2019-10-24 12_56_44.png
    这个方法里构建了 PackagedProgram 对象,对象内容如下:
    image2019-10-24 13_0_17.png

    然后把PackagedProgram以及几个参数传入runProgram


    image2019-10-24 12_58_35.png
    这个方法很关键,里面创建了ClusterClient (on-yarn模式即RestClusterClient) 和 jobGraph(通过program构建),并把jobGraph部署到JobCluster。
    image2019-10-24 13_28_27.png
    deploySessionCluster是调用的父类AbstractYarnClusterDescriptor的deploySessionCluster,并返回一个client对象。这里对配置和yarn集群相关的做了一定的检查,最主要调用了两个方法:
    1 startAppMaster里面封装了大量yarn客户端代码,最终yarnClient.submitApplication(appContext);
    和 2 createYarnClusterClient,分别是创建appMaster和启动一个与yarn进行Http 交互的Client(启动了两个LeaderRetrievalService)
    image2019-10-24 13_19_26.png
    至此发生了什么呢? 上面我们提到了返回一个client,其实上面的过程执行后,yarn上已经有了一个flink任务(通过yarn任务list可查),只不过这个flink任务还没有执行env.execute的后续流程。
    client的执行包含了env.execute(),我们继续往下看
    image2019-10-24 13_35_57.png
    这个里面的逻辑是先切换当前classloader为userCodeClassloader。然后开始通过反射去执行任务jar包里的main函数。
    image2019-10-24 13_37_43.png

    最后执行的方法:

    mainMethod.invoke(null, (Object) args);
    

    至此任务提交jar到执行jar内main函数已经分析完,有些流程不那么重点的就忽略了,比如如何启动的appMaster。先到这里,之后env.execute 是如何转化成可执行任务的。后续补充......

    相关文章

      网友评论

          本文标题:flink run任务提交流程<命令行方式>

          本文链接:https://www.haomeiwen.com/subject/gngrvctx.html