美文网首页flink
Flink源码3-task任务提交到graph生成

Flink源码3-task任务提交到graph生成

作者: fat32jin | 来源:发表于2020-12-19 06:56 被阅读0次

1、 上次回顾 0:10 ~0:18

2、 本次大纲 0:18 ~0:22

4.1 Flink 编程套路 0:23 ~ 0:38

4.2 Clifrontend 提交分析 0:38

image.png image.png

src/main/flink-bin/bin/flink
最后一行
exec JAVA_RUNJVM_ARGS FLINK_ENV_JAVA_OPTS "{log_setting[@]}" -classpath "manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"" org.apache.flink.client.cli.CliFrontend "$@"

示例程序:src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java

启动类: org.apache.flink.client.cli.CliFrontend
启动 main 方法

CliFrontend#main()

* 注释: 运行
* 解析命令行并并开始请求操作
* flink run calss1
* args【0】 = run
*/
int retCode = SecurityUtils.getInstalledContext()
.runSecured(() -> cli.parseParameters(args));
——》 CliFrontend#parseParameters()

switch (action) {
case ACTION_RUN:
run(params);
——》 CliFrontend#run()

final PackagedProgram program =
getPackagedProgram(programOptions);
final List<URL> jobJars = program.getJobJarAndDependencies();
final Configuration effectiveConfiguration = getEffectiveConfiguration(
........
executeProgram(effectiveConfiguration, program);
——》 CliFrontend# executeProgram()
——》ClientUtils#executeProgram()
——》 PackagedProgram#invokeInteractiveModeForExecution
——》PackagedProgram#callMainMethod // 反射调用用户程序的main方法

//这时就去到我们自己编写的wordcount应用程序的main方法
mainMethod.invoke(null, (Object) args);
---------------- 到此为止提交任务完成,转入用户编写的 任务解析阶段-------

4 ExecutionEnvironment 源码解析 0:41~ 0:50

image.png

5、 Job 提交流程源码分析 0:50 ~1:50

示例:
src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java

main()

1、source 部分
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
——》
return addSource(new SocketTextStreamFunction(hostname, port, delimiter, maxRetry),
——》 StreamExecutionEnvironment#addSource

final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
..................
return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator,

2、 回到main方法看flatmap
text .flatMap(new FlatMapFunction<String, WordWithCount>()
——》DataStream#flatMap()
return transform("Flat Map", outputType, new StreamFlatMap<>
——》DataStream#doTransform
getExecutionEnvironment().addOperator(resultTransform);
——》DataStream#addOperator
this.transformations.add(transformation);//增加算子
其他的keyby 算子类似

3、回到main方法看 executor 提交代码

env.execute("Socket Window WordCount");
——》 StreamExecutionEnvironment#execute
return 2 execute(1 getStreamGraph(jobName) );

第1步:getStreamGraph(jobName) 生成 StreamGraph 解析
——》 StreamExecutionEnvironment#getStreamGraph
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();

第2步:execute(StreamGraph) 解析

4.6. Flink Graph 演变 0:58 ~ 1:20

Flink Graph介绍 :

Flink 的一个 Job,最终,归根结底,还是构建一个高效率的能用于分布式并行执行的 DAG 执行图。
1、帮我们把上下游两个相邻算子如果能chain到一起,则chain到一起做优化
2、chain到一起的多个Operator就会组成一个OperatorChain,当peratorChain执行的时候,到底要执行多少个 Task,则就需要把 DAG 进行并行化变成实实在在的Task来调度执行

最开始:
dataStream.xx1().xxx2().xxx3().....xxxn();
evn.execute();
到最后:
List<StreamTask> 执行(不同的StreamTask(StreamTask内部逻辑计算操作不一样))

总结要点
相邻两个阶段之间的StreamTask是有关系的。(到底哪些上游StreamTask生产数据给下游消费StreamTask)Shuffle关系!

一个 Flink 流式作业,从 Client 提交到 Flink 集群,到最后执行,总共会经历四种不同的状态。总的来说:

  1、Client 首先根据用户编写的代码生成 StreamGraph,然后把 StreamGraph 构建成 JobGraph 提交给 Flink 集群主节点
2、然后启动的 JobMaster 在接收到 JobGraph 后,会对其进行并行化生成 ExecutionGraph 后调度启动 StreamTask 执行。
3、StreamTask 并行化的运行在 Flink 集群中的,就是最终的物理执行图状态结构。

Flink 中的执行图可以分成四层:
StreamGraph ==> JobGraph ==> ExecutionGraph ==> 物理执行图。

StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
JobGraph:StreamGraph 经过优化后生成了 JobGraph,提交给 jobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化反序列化传输消耗。

ExecutionGraph:JobManager 根据 JobGraph 生成 ExecutionGraph。ExecutionGraph 是JobGraph 的并行化版本,是调度层最核心的数据结构。
物理执行图:
JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个 TaskManager 上部署Task 后形成的图,并不是一个具体的数据结构。

关于这四层之间的演变,请看下图:

![image.png](https://img.haomeiwen.com/i11332520/04e493470a374efb.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
StreamGraph 生成 和 execute 执行 解析 1:20 ~ 1:50
第1步:getStreamGraph(jobName) 生成 StreamGraph 解析

4、再回到main方法看 executor 提交代码
env.execute("Socket Window WordCount");
——0 》 StreamExecutionEnvironment#execute
return 2 execute(1 getStreamGraph(jobName) );
------------------▼▼▼▼第1步: 生成 StreamGraph 开始▼▼▼▼ ------------
——》 StreamExecutionEnvironment#getStreamGraph
StreamGraph streamGraph =
getStreamGraphGenerator().setJobName(jobName).generate();

—— 》 StreamExecutionEnvironment#generate
—— 》 StreamGraphGenerator#transform()
——1 》 StreamGraphGenerator#transformOneInputTransform()

streamGraph.addOperator(transform.getId(),
——》StreamGraph#addOperator
addNode(vertexID, slotSharingGroup, coLocationGroup
——》StreamGraph#addNode
StreamNode vertex = new StreamNode(
——》StreamNode #构造函数
回到 ——1 》 StreamGraphGenerator#transformOneInputTransform

streamGraph.addEdge(inputId, transform.getId(), 0);
——》StreamGraph#addEdge
——》StreamGraph#addEdgeInternal
------------------ ▲▲▲▲第1步: 生成 StreamGraph 结束 ▲▲▲------------

第2步:execute(StreamGraph) 解析

回到 ——0 》 StreamExecutionEnvironment#execute()
return 2 execute(1 getStreamGraph(jobName) );
------------------▼▼▼▼第2步:execute (StreamGraph) 开始▼▼▼▼ ------------
——》 StreamExecutionEnvironment# execute(StreamGraph streamGraph)
final JobClient jobClient = executeAsync(streamGraph);
——》 StreamExecutionEnvironment# executeAsync
CompletableFuture<JobClient> jobClientFuture = executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration);

——2 》AbstractSessionClusterExecutor#execute()

//***** !!从 StreamGraph(pipeline) 得到jobGraph (在下面第2.5 节 详细说明)
2:》 final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline,
.......
return clusterClient .submitJob(jobGraph) 1:38

RestClusterClient#submitJob

image.png
               *  注释: 发送请求
     *  requestFuture.thenCompos 的参数函数的参数,是 requestFuture 的返回结果,就是 Tuple2  *  补充:thenCompose 的参数为一个返回 CompletableFuture 实例的函数,该函数的参数是先前计算步骤的结果。  */
    final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(

requestAndFileUploads -> sendRetriableRequest(JobSubmitHeaders.getInstance()

——》RestClusterClient#sendRetriableRequest
* 注释: restClient = RestClient
return restClient.sendRequest(webMonitorBaseUrl.getHost(),
——》RestClusterClient#sendRequest()
return submitRequest(targetAddress, targetPort, httpRequest, responseType);
——》RestClusterClient#submitRequest()
* 注释: 通过 Netty 客户端发送请求给 Netty 服务端
final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
* 注释: 发送请求 到 JobManager 的 Netty 服务端
httpRequest.writeTo(channel);
------------------ ▲▲▲▲第2步: execute (StreamGraph) 结束 ▲▲▲------------

最终通过 channel 把请求数据,发给 WebMonitorEndpoint 中的 JobSubmitHandler 来执行处理。

第2.5步 由 StreamGraph 生成 jobgraph 分析 2:01~

回到 ——2 》AbstractSessionClusterExecutor#execute()
2:》 final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline,

——》PipelineExecutorUtils#getJobGraph()
final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(pipeline,
——》FlinkPipelineTranslationUtil#getJobGraph()

StreamGraphTranslator#getJobGraph()
——》streamGraph#getJobGraph
——》 StreamingJobGraphGenerator#createJobGraph

image.png
——》 StreamingJobGraphGenerator# setChaining
——》 StreamingJobGraphGenerator# createChain //创建job图的递归方法
2:20
image.png
//判断可否优化job isChainable
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
——》StreamingJobGraphGenerator# isChainable()
//九大条件判断
return downStreamVertex.getInEdges().size() == 1
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled(); image.png
image.png
// 1、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
downStreamVertex.getInEdges().size() == 1;
// 2、上下游节点都在同一个 slot group 中
upStreamVertex.isSameSlotSharingGroup(downStreamVertex);
// 3、前后算子不为空
!(downStreamOperator == null || upStreamOperator == null);
// 4、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source 默认
是 HEAD)
!upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER;
// 5、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter 等默认是
ALWAYS)
!downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS;
// 6、两个节点间物理分区逻辑是 ForwardPartitioner
(edge.getPartitioner() instanceof ForwardPartitioner);
// 7、两个算子间的shuffle方式不等于批处理模式
edge.getShuffleMode() != ShuffleMode.BATCH;
// 8、上下游的并行度一致
upStreamVertex.getParallelism() == downStreamVertex.getParallelism();
// 9、用户没有禁用 chain
streamGraph.isChainingEnabled();

4.7. WebMonitorEndpoint 处理 RestClient 的JobSubmit 请求 2:35~ 2:52

最终处理这个请求: JobSubmitHandler 来处理!
核心入口
——》 JobSubmitHandler.handleRequest();

// TODO_MA 注释: 提交任务
// TODO_MA 注释: gateway = Dispatcher
jobGraph -> gateway.submitJob(jobGraph, timeout)

Dispatcher#submitJob
——》 Dispatcher#internalSubmitJob

  • 注释: 提交执行
    final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager( // TODO_MA 注释: 持久和提交
    jobGraph.getJobID(), jobGraph, this::persistAndRunJob).thenApply(ignored -> Acknowledge.get()

——》 Dispatcher#persistAndRunJob
* 注释: 运行 job
final CompletableFuture<Void> runJobFuture = runJob(jobGraph);

     ——3  》 Dispatcher# runJob  
                                         ▼
             注释: 客户端正常提交一个 job 的时候,
             最终由 集群主节点中的 Dispatcher 接收到来继续提交执行 
          // *   第一  1  注释: 创建 JobManagerRunner 
    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
      // *第二2   提交任务 == start JobManagerRunner
 FunctionUtils.uncheckedFunction(this::startJobManagerRunner)

          ——1 》 Dispatcher#createJobManagerRunner
                                                           ↓
             DefaultJobManagerRunnerFactory#createJobManagerRunner
                                          ▼
               *  注释: 返回 JobManagerRunnerImpl
     *  负责启动 JobMaster
     */
    return new JobManagerRunnerImpl(
                                         ↓
 JobManagerRunnerImpl#构造函数()
             *  注释: 启动 JobMaster
     *  jobMasterService = JobMaster 实例 */
    this.jobMasterService = jobMasterFactory.createJobMasterService
                                         ↓
  DefaultJobManagerRunnerFactory#createJobMasterService
          *  注释: 生成和启动一个 JobMaster
     */
    return new JobMaster(

——2 》 JobMaster#构造函数
// TODO_MA 注释: defaultScheduler
this.schedulerNG = createScheduler(jobManagerJobMetricGroup); 2:49

DefaultJobManagerRunnerFactory#createInstance()
注释: 返回一个 DefaultScheduler
*/
return new DefaultScheduler(log, jobGraph,
—— 》 DefaultScheduler#构造函数
super#构造函数
—— 》 SchedulerBase#构造函数
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup,

————******** 此阶段END ********————--

4.8. ExecutionGraph 构建和提交源码解析 2:52 ~ 3:31

入口
—— 》 SchedulerBase#createAndRestoreExecutionGraph()
—— 》 SchedulerBase#createExecutionGraph()
—— 》 ExecutionGraphBuilder#buildGraph()

//先构建一个空的ExecutionGraph
try { executionGraph = (prior != null) ? prior :
new ExecutionGraph(
//1、先 根据jobGraph 生成JsonPlan
//2、 JsonPlan 设置到 executionGraph
try {executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph
......
executionGraph.attachJobGraph(sortedTopology);
——》ExecutionGraph #attachJobGraph()

ejv.connectToPredecessors(this.intermediateResults);
——》ExecutionJobVertex#connectToPredecessors

/* ExecutionVertex 一个电影真正执行的StremTask
* 一个 StremTask 得到1个slot
for (int i = 0; i < parallelism; i++) {
ExecutionVertex ev = taskVertices[i]; 3:09

回到 ——3 》 Dispatcher# runJob():

// TODO_MA 注释: 提交任务 == start JobManagerRunner FunctionUtils.uncheckedFunction(this::startJobManagerRunner)

——》 Dispatcher# startJobManagerRunner()
* 注释: 启动 jobManagerRunner
*/
jobManagerRunner.start();

JobManagerRunnerImpl#.start();

  • 注释:ZooKeeperLeaderElectionService = leaderElectionService
    leaderElectionService.start(this);
    ↓//开始选举启动同上一章
    ZooKeeperLeaderElectionService.start

    ZooKeeperLeaderElectionService#isLeader()

    leaderContender.grantLeadership(issuedLeaderSessionID);

    JobManagerRunnerImpl.grantLeadership
    • 注释: 调度 并启动 JobManager
      return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
      ——》JobManagerRunnerImpl#verifyJobSchedulingStatusAndStartJobMan

      * 注释: 启动 JobMaster
      return startJobMaster(leaderSessionId);
      ——》JobManagerRunnerImpl#startJobMaster
      * 注释: 启动 JobMaster
      startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));

      JobMaster#start()

      //1 注释: 确保 RPC 工作正常
      start();
      //2 注释: 执行 JobGragh
      return callAsyncWithoutFencing(() -> startJobExecution
      ——》JobMaster#startJobExecution()

      / *1 注释: 初始化一些必要的服务组件 jobmaster的注册和心跳 */
      startJobMasterServices();

        /* 2  注释: 开始调度执行  接下来进入 Slot 管理(申请和释放) 
      

      resetAndStartScheduler(); 3:31

相关文章

网友评论

    本文标题:Flink源码3-task任务提交到graph生成

    本文链接:https://www.haomeiwen.com/subject/bbzggktx.html