Flink StreamGraph JobGraph Execu

Author: kaiker | Published 2022-04-02 14:18

Very rough notes, for my own reference only.

Stream Graph

stream graph https://izualzhy.cn/flink-source-stream-graph

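  • Split, partition, union and select do not produce real nodes; they become virtual nodes instead. Although no real node is created, their information ends up as properties on the edges: they "set how two nodes are connected, i.e. the properties of the edge (how the upstream node's data is sent to the downstream node)".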
 * <p>Partitioning, split/select and union don't create actual nodes in the {@code StreamGraph}. For
 * these, we create a virtual node in the {@code StreamGraph} that holds the specific property, i.e.
 * partitioning, selector and so on. When an edge is created from a virtual node to a downstream
 * node, the {@code StreamGraph} resolves the id of the original node and creates an edge in the
 * graph with the desired property. For example, if you have this graph:
 *
 * <pre>
 *     Map-1 -&gt; HashPartition-2 -&gt; Map-3
 * </pre>
 *
 * <p>where the numbers represent transformation IDs. We first recurse all the way down. {@code
 * Map-1} is transformed, i.e. we create a {@code StreamNode} with ID 1. Then we transform the
 * {@code HashPartition}; for this, we create a virtual node of ID 4 that holds the property {@code
 * HashPartition}. This transformation returns the ID 4. Then we transform the {@code Map-3}. We add
 * the edge {@code 4 -> 3}. The {@code StreamGraph} resolves the actual node with ID 1 and creates
 * an edge {@code 1 -> 3} with the property HashPartition.
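To see the resolution step from this comment in isolation, here is a minimal Java sketch. `VirtualNodeDemo`, `VirtualNode`, `addVirtualPartitionNode` and the string partitioner names are hypothetical stand-ins, not Flink's StreamGraph API. It only assumes what the comment describes: a virtual node records the original node id plus a property, and an edge leaving a virtual node is re-attached to the original node with that property.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of virtual partition nodes in a StreamGraph (hypothetical names, not Flink's API). */
public class VirtualNodeDemo {
    /** A physical edge between two real nodes, carrying the partitioner it should use. */
    record Edge(int source, int target, String partitioner) {}

    /** A virtual node only remembers the real node it stands for and the property it holds. */
    record VirtualNode(int originalId, String partitioner) {}

    final Map<Integer, VirtualNode> virtualNodes = new HashMap<>();
    final List<Edge> edges = new ArrayList<>();

    void addVirtualPartitionNode(int originalId, int virtualId, String partitioner) {
        virtualNodes.put(virtualId, new VirtualNode(originalId, partitioner));
    }

    /**
     * Adds an edge. If the source is virtual, resolve it back to the real node and move the
     * virtual node's property onto the edge. Virtual nodes may be stacked, so this recurses;
     * the property closest to the target wins.
     */
    void addEdge(int sourceId, int targetId, String partitioner) {
        VirtualNode virtual = virtualNodes.get(sourceId);
        if (virtual != null) {
            String p = partitioner != null ? partitioner : virtual.partitioner();
            addEdge(virtual.originalId(), targetId, p);
        } else {
            // Simplification: Flink picks forward or rebalance here depending on parallelism.
            edges.add(new Edge(sourceId, targetId, partitioner != null ? partitioner : "FORWARD"));
        }
    }

    public static void main(String[] args) {
        VirtualNodeDemo graph = new VirtualNodeDemo();
        // Map-1 is a real node; HashPartition-2 becomes virtual node 4 that points at node 1.
        graph.addVirtualPartitionNode(1, 4, "HASH");
        // Transforming Map-3 adds the edge 4 -> 3, which is stored as 1 -> 3 with HASH.
        graph.addEdge(4, 3, null);
        System.out.println(graph.edges);
    }
}
```

Running `main` leaves exactly one edge, from node 1 to node 3 with the HASH property, which is the `1 -> 3` edge from the example in the comment; node 4 never appears in an edge.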
  • How each transformation is handled: the static initializer in StreamGraphGenerator maps every Transformation class to its translator
static {
        @SuppressWarnings("rawtypes")
        Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
                tmp = new HashMap<>();
        tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
        tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
        tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
        tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
        tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
        tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
        tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
        tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
        tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
        tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
        tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
        tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
        tmp.put(
                TimestampsAndWatermarksTransformation.class,
                new TimestampsAndWatermarksTransformationTranslator<>());
        tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
        tmp.put(
                KeyedBroadcastStateTransformation.class,
                new KeyedBroadcastStateTransformationTranslator<>());
        translatorMap = Collections.unmodifiableMap(tmp);
    }
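The static block is essentially a lookup table from a Transformation's class to its translator. A minimal sketch of the same dispatch pattern, where the record types `OneInput` and `Union` and the `Translator` interface are hypothetical stand-ins for Flink's classes. The union translator here creates no node and simply returns its inputs' ids, in the spirit of the virtual-node behaviour noted above; treat it as an illustration, not Flink's UnionTransformationTranslator.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy version of the class-keyed translator lookup (hypothetical types, not Flink's). */
public class TranslatorRegistryDemo {
    interface Transformation {
        int id();
    }

    record OneInput(int id) implements Transformation {}

    record Union(int id, List<Integer> inputIds) implements Transformation {}

    /** Turns one transformation into the stream node ids it produced. */
    interface Translator<T extends Transformation> {
        Collection<Integer> translate(T transformation);
    }

    static final Map<Class<? extends Transformation>, Translator<?>> TRANSLATORS;

    static {
        Map<Class<? extends Transformation>, Translator<?>> tmp = new HashMap<>();
        // A one-input transformation becomes one real node with its own id.
        tmp.put(OneInput.class, (Translator<OneInput>) t -> Collections.singleton(t.id()));
        // A union adds no node; it just passes on the node ids of its inputs.
        tmp.put(Union.class, (Translator<Union>) t -> t.inputIds());
        TRANSLATORS = Collections.unmodifiableMap(tmp);
    }

    @SuppressWarnings("unchecked")
    static <T extends Transformation> Collection<Integer> translate(T transformation) {
        // Look up the translator by the runtime class, as the translatorMap lookup does.
        Translator<T> translator = (Translator<T>) TRANSLATORS.get(transformation.getClass());
        if (translator == null) {
            throw new IllegalStateException(
                    "No translator registered for " + transformation.getClass().getSimpleName());
        }
        return translator.translate(transformation);
    }

    public static void main(String[] args) {
        System.out.println(translate(new OneInput(7)));
        System.out.println(translate(new Union(9, List.of(1, 2))));
    }
}
```

The design choice is the same as in the excerpt: adding a new Transformation type only means registering one more entry, without touching the dispatch code.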
  • translate() is called from transform(); it dispatches to the translator, which adds the nodes and edges:
        return shouldExecuteInBatchMode
                ? translator.translateForBatch(transform, context)
                : translator.translateForStreaming(transform, context); // the usual (streaming) path
   @Override
    public final Collection<Integer> translateForStreaming(
            final T transformation, final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);

        final Collection<Integer> transformedIds =
                translateForStreamingInternal(transformation, context); // eventually calls the concrete translator's translateInternal
        configure(transformation, context);

        return transformedIds;
    }
// AbstractOneInputTransformationTranslator.java
    protected Collection<Integer> translateInternal(
            final Transformation<OUT> transformation,
            final StreamOperatorFactory<OUT> operatorFactory,
            final TypeInformation<IN> inputType,
            @Nullable final KeySelector<IN, ?> stateKeySelector,
            @Nullable final TypeInformation<?> stateKeyType,
            final Context context) {
        checkNotNull(transformation);
        checkNotNull(operatorFactory);
        checkNotNull(inputType);
        checkNotNull(context);

        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = transformation.getId();
        final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

        streamGraph.addOperator(
                transformationId,
                slotSharingGroup,
                transformation.getCoLocationGroupKey(),
                operatorFactory,
                inputType,
                transformation.getOutputType(),
                transformation.getName());

        if (stateKeySelector != null) {
            TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
        }

        int parallelism =
                transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                        ? transformation.getParallelism()
                        : executionConfig.getParallelism();
        streamGraph.setParallelism(transformationId, parallelism);
        streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

        final List<Transformation<?>> parentTransformations = transformation.getInputs();
        checkState(
                parentTransformations.size() == 1,
                "Expected exactly one input transformation but found "
                        + parentTransformations.size());

        for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
            streamGraph.addEdge(inputId, transformationId, 0);
        }

        return Collections.singleton(transformationId);
    }
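The one-input `translateInternal` above boils down to three steps: register the node, fall back to the environment's parallelism when the transformation did not set one, and add an edge from every stream node id of the single parent. A sketch of just those steps, assuming a toy graph class (`OneInputTranslateDemo` and its `Edge` record are hypothetical, not Flink's StreamGraph); the `-1` sentinel mirrors `ExecutionConfig.PARALLELISM_DEFAULT`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy StreamGraph showing what a one-input translator does (hypothetical API, not Flink's). */
public class OneInputTranslateDemo {
    /** Sentinel for "no parallelism set on the transformation" (Flink's PARALLELISM_DEFAULT is -1). */
    static final int PARALLELISM_DEFAULT = -1;

    record Edge(int source, int target) {}

    final int defaultParallelism;
    final Map<Integer, Integer> parallelism = new HashMap<>();
    final List<Edge> edges = new ArrayList<>();

    OneInputTranslateDemo(int defaultParallelism) {
        this.defaultParallelism = defaultParallelism;
    }

    /**
     * Adds a node for transformation {@code id}, resolves its parallelism, and connects it to every
     * stream node id of its single parent (one parent can map to several ids, e.g. after a union).
     */
    Collection<Integer> translate(int id, int requestedParallelism, Collection<Integer> parentNodeIds) {
        int p = requestedParallelism != PARALLELISM_DEFAULT ? requestedParallelism : defaultParallelism;
        parallelism.put(id, p);
        for (int inputId : parentNodeIds) {
            edges.add(new Edge(inputId, id));
        }
        return List.of(id);
    }

    public static void main(String[] args) {
        OneInputTranslateDemo graph = new OneInputTranslateDemo(4);
        // Node 5 sets no parallelism, and its parent resolved to two ids (e.g. a union of 1 and 2).
        graph.translate(5, PARALLELISM_DEFAULT, List.of(1, 2));
        // Node 6 sets parallelism 2 explicitly and has one parent.
        graph.translate(6, 2, List.of(5));
        System.out.println(graph.parallelism + " " + graph.edges);
    }
}
```

Node 5 ends up with the default parallelism 4 and two incoming edges, which is why the real code loops over `context.getStreamNodeIds(...)` instead of assuming one upstream node.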

  • Some translators create virtual nodes; SideOutputTransformationTranslator is one example:
    private Collection<Integer> translateInternal(
            final SideOutputTransformation<OUT> transformation, final Context context) {
        checkNotNull(transformation);
        checkNotNull(context);

        final StreamGraph streamGraph = context.getStreamGraph();
        final List<Transformation<?>> parentTransformations = transformation.getInputs();
        checkState(
                parentTransformations.size() == 1,
                "Expected exactly one input transformation but found "
                        + parentTransformations.size());

        final List<Integer> virtualResultIds = new ArrayList<>();
        final Transformation<?> parentTransformation = parentTransformations.get(0);
        for (int inputId : context.getStreamNodeIds(parentTransformation)) {
            final int virtualId = Transformation.getNewNodeId();
            streamGraph.addVirtualSideOutputNode(inputId, virtualId, transformation.getOutputTag());
            virtualResultIds.add(virtualId);
        }
        return virtualResultIds;
    }

Job Graph

Job Graph https://izualzhy.cn/flink-source-job-graph
On IntermediateDataSet: https://www.jianshu.com/p/efb1313ba52a (a JobEdge records its source through an IntermediateDataSet)
Basic structure of the JobGraph: https://blog.csdn.net/weixin_39657860/article/details/96756705

  • StreamExecutionEnvironment.execute() calls executeAsync(streamGraph):
    public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
        ... 

        CompletableFuture<JobClient> jobClientFuture =
                executorFactory
                        .getExecutor(configuration)
                        .execute(streamGraph, configuration, userClassloader);
        ...
  • Here you can see that the StreamGraph has to be converted into a JobGraph, and that it is the JobGraph that is finally submitted:
        public CompletableFuture<JobClient> execute(
                Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassLoader)
                throws Exception {
            final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
            if (jobGraph.getSavepointRestoreSettings() == SavepointRestoreSettings.none()
                    && pipeline instanceof StreamGraph) {
                jobGraph.setSavepointRestoreSettings(
                        ((StreamGraph) pipeline).getSavepointRestoreSettings());
            }
            return miniCluster
                    .submitJob(jobGraph)
  • Following PipelineExecutorUtils.getJobGraph down leads to StreamGraphTranslator, which delegates to StreamGraph#getJobGraph:
    public JobGraph translateToJobGraph(
            Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
        checkArgument(
                pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");

        StreamGraph streamGraph = (StreamGraph) pipeline;
        return streamGraph.getJobGraph(null);
    }
    public JobGraph getJobGraph(@Nullable JobID jobID) {
        return StreamingJobGraphGenerator.createJobGraph(this, jobID);
    }

  • The core of createJobGraph is operator chaining; the following article explains it in detail:

In-depth analysis of Flink's operator chain mechanism

// createChain(): keep walking downstream as long as the next operator can be chained
            for (StreamEdge chainable : chainableOutputs) {
                transitiveOutEdges.addAll(
                        createChain(
                                chainable.getTargetId(),
                                chainIndex + 1,
                                chainInfo,
                                chainEntryPoints));
            }

            for (StreamEdge nonChainable : nonChainableOutputs) {
                transitiveOutEdges.add(nonChainable);
                createChain(
                        nonChainable.getTargetId(),
                        1, // operators start at position 1 because 0 is for chained source inputs
                        chainEntryPoints.computeIfAbsent(
                                nonChainable.getTargetId(),
                                (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                        chainEntryPoints);
            }
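The recursion in createChain can be modelled on a toy graph where each edge is already marked chainable or not. This is a simplified sketch, not Flink's implementation: `CreateChainDemo`, its `Edge` record and the two bookkeeping maps are hypothetical, and it ignores everything else createChain builds (chain names, StreamConfig, JobVertex creation). As in the excerpt, a new chain starts at index 1, and an already visited node is skipped (Flink keeps a set of already built vertices for that).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy version of the createChain() recursion (hypothetical graph model, not Flink's classes). */
public class CreateChainDemo {
    record Edge(int source, int target, boolean chainable) {}

    final Map<Integer, List<Edge>> outEdges = new HashMap<>();
    /** node id -> id of the head node of the chain it was placed in */
    final Map<Integer, Integer> chainHeadOf = new LinkedHashMap<>();
    /** node id -> its position inside that chain */
    final Map<Integer, Integer> chainIndexOf = new HashMap<>();

    void addEdge(int source, int target, boolean chainable) {
        outEdges.computeIfAbsent(source, k -> new ArrayList<>()).add(new Edge(source, target, chainable));
    }

    /** Places nodeId in the chain headed by headId and returns the edges that leave that chain. */
    List<Edge> createChain(int nodeId, int chainIndex, int headId) {
        List<Edge> transitiveOutEdges = new ArrayList<>();
        if (chainHeadOf.containsKey(nodeId)) {
            return transitiveOutEdges; // already built, e.g. reached again through a second input
        }
        chainHeadOf.put(nodeId, headId);
        chainIndexOf.put(nodeId, chainIndex);
        for (Edge edge : outEdges.getOrDefault(nodeId, List.of())) {
            if (edge.chainable()) {
                // Same chain: recurse one position further, collecting what leaves the chain later.
                transitiveOutEdges.addAll(createChain(edge.target(), chainIndex + 1, headId));
            } else {
                // The edge leaves this chain, and its target becomes the head of a new chain.
                transitiveOutEdges.add(edge);
                createChain(edge.target(), 1, edge.target());
            }
        }
        return transitiveOutEdges;
    }

    public static void main(String[] args) {
        CreateChainDemo graph = new CreateChainDemo();
        graph.addEdge(1, 2, true);  // source -> map: forward, same parallelism
        graph.addEdge(2, 3, false); // map -> reduce: hash partitioning breaks the chain
        graph.addEdge(3, 4, true);  // reduce -> sink
        List<Edge> leaving = graph.createChain(1, 1, 1);
        System.out.println("chain heads: " + graph.chainHeadOf);
        System.out.println("edges leaving the first chain: " + leaving);
    }
}
```

For this pipeline, nodes 1 and 2 end up in the chain headed by 1, nodes 3 and 4 in the chain headed by 3, and the only edge returned for the first chain is the non-chainable 2 -> 3 edge, which becomes a JobEdge between the two JobVertices.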

When can two operators be chained together?

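  • The upstream and downstream operator instances are in the same SlotSharingGroup;
  • The downstream operator's chaining strategy (ChainingStrategy) is ALWAYS, meaning it can be chained both with its upstream and with its downstream. Common operators such as map() and filter() fall into this category;
  • The upstream operator's chaining strategy is HEAD or ALWAYS. HEAD means the operator can only be chained with its downstream, which normally applies only to Source operators;
  • The physical partitioning between the two operators is ForwardPartitioner (see the article "On the eight physical partitioning strategies of Flink DataStream");
  • The shuffle mode between the two operators is not batch mode;
  • The upstream and downstream operator instances have the same parallelism.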
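The conditions above can be written as a single predicate. A sketch with hypothetical enums and records (not Flink's StreamEdge/StreamNode types); Flink's actual check in StreamingJobGraphGenerator has further conditions, for example the downstream operator must have exactly one input, which this sketch leaves out.

```java
/** Toy predicate mirroring the chaining conditions listed above (hypothetical types, not Flink's). */
public class ChainabilityDemo {
    enum ChainingStrategy { ALWAYS, HEAD, NEVER }

    enum Partitioner { FORWARD, HASH, REBALANCE }

    enum ExchangeMode { PIPELINED, BATCH }

    record Operator(String slotSharingGroup, ChainingStrategy strategy, int parallelism) {}

    record Edge(Operator upstream, Operator downstream, Partitioner partitioner, ExchangeMode mode) {}

    static boolean isChainable(Edge e) {
        Operator up = e.upstream();
        Operator down = e.downstream();
        return up.slotSharingGroup().equals(down.slotSharingGroup()) // same SlotSharingGroup
                && down.strategy() == ChainingStrategy.ALWAYS          // downstream must be ALWAYS
                && (up.strategy() == ChainingStrategy.HEAD
                        || up.strategy() == ChainingStrategy.ALWAYS)   // upstream HEAD or ALWAYS
                && e.partitioner() == Partitioner.FORWARD              // forward partitioning only
                && e.mode() != ExchangeMode.BATCH                      // not a batch shuffle
                && up.parallelism() == down.parallelism();             // same parallelism
    }

    public static void main(String[] args) {
        Operator source = new Operator("default", ChainingStrategy.HEAD, 2);
        Operator map = new Operator("default", ChainingStrategy.ALWAYS, 2);
        Operator wideMap = new Operator("default", ChainingStrategy.ALWAYS, 4);
        System.out.println(isChainable(new Edge(source, map, Partitioner.FORWARD, ExchangeMode.PIPELINED)));
        System.out.println(isChainable(new Edge(source, map, Partitioner.HASH, ExchangeMode.PIPELINED)));
        System.out.println(isChainable(new Edge(source, wideMap, Partitioner.REBALANCE, ExchangeMode.PIPELINED)));
    }
}
```

Only the first edge (forward, same group, same parallelism) passes; a keyBy (hash) or a parallelism change both force a new chain.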

ExecutionGraph

Flink source code: ExecutionGraph

That article seems to contain some errors, but it does show that the Dispatcher starts and submits the application.
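  • Finally, the JobGraph is converted into an ExecutionGraph; this happens after .submitJob(jobGraph).
  • The call path can be compared with the one in the referenced article "Flink source code: ExecutionGraph", but it has changed by the time it reaches persistAndRunJob. Following DefaultJobMasterServiceFactory#internalCreateJobMasterService -> JobMaster#createScheduler shows that this happens while the JobMaster is being created: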
    private void persistAndRunJob(JobGraph jobGraph) throws Exception {
        jobGraphWriter.putJobGraph(jobGraph);
        runJob(createJobMasterRunner(jobGraph), ExecutionType.SUBMISSION);
    }
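  • The JobMaster contains the createScheduler call below; from there the call chain is long: schedulerNGFactory.createInstance -> DefaultSchedulerFactory#createInstance -> DefaultScheduler (which extends SchedulerBase; SchedulerBase holds the executionGraph).
  • Following it further down, buildGraph can be found in DefaultExecutionGraphBuilder.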
        this.schedulerNG =
                createScheduler(
                        slotPoolServiceSchedulerFactory,
                        executionDeploymentTracker,
                        jobManagerJobMetricGroup,
                        jobStatusListener);
// SchedulerBase
this.executionGraph =
                createAndRestoreExecutionGraph(
                        completedCheckpointStore,
                        checkpointsCleaner,
                        checkpointIdCounter,
                        initializationTimestamp,
                        mainThreadExecutor,
                        jobStatusListener,
                        vertexParallelismStore);
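  • The ExecutionGraph is the physical execution plan of a Flink job and is used to coordinate the distributed execution of the data flow. Unlike the StreamGraph and the JobGraph, the ExecutionGraph is generated in the JobManager.
  • The ExecutionGraph also has vertices: its vertex type is ExecutionJobVertex, which corresponds to a JobVertex in the JobGraph. Going from the JobGraph to the ExecutionGraph introduces parallelism: an ExecutionJobVertex contains all the parallel tasks of its JobVertex, and each parallel task is represented by an ExecutionVertex. In other words, an ExecutionJobVertex with parallelism n contains n ExecutionVertex instances.
  • An ExecutionVertex can be executed one or more times (because of task recovery, recomputation, or configuration updates), and every execution creates a new Execution object. The Execution tracks the state changes and resource usage of that run of the ExecutionVertex.
  • An IntermediateResult corresponds to the IntermediateDataSet of a JobVertex in the JobGraph and represents the temporary holding point for data transferred between two adjacent ExecutionJobVertex instances. IntermediateResults are built when the ExecutionJobVertex is created, one per produced IntermediateDataSet, and each has as many partitions (IntermediateResultPartition) as the vertex's parallelism.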
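  • attachJobGraph maps the JobGraph onto the ExecutionGraph.
  • ExecutionVertex creates a new Execution.
  • startScheduling() in SchedulerBase eventually leads to Execution.deploy being called, which submits the task to the TaskManager asynchronously: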
// Execution#deploy
CompletableFuture.supplyAsync(
                            () -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
    // RpcTaskManagerGateway
    public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
        return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
    }
    // TaskExecutor
    public CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
      ...
      Task task =
                    new Task(
                            jobInformation,
                            taskInformation,
                            tdd.getExecutionAttemptId(),
                            tdd.getAllocationId(),
                            tdd.getSubtaskIndex(),
                            tdd.getAttemptNumber(),
                            tdd.getProducedPartitions(),
                            tdd.getInputGates(),
                            memoryManager,
                            taskExecutorServices.getIOManager(),
                            taskExecutorServices.getShuffleEnvironment(),
                            taskExecutorServices.getKvStateService(),
                            taskExecutorServices.getBroadcastVariableManager(),
                            taskExecutorServices.getTaskEventDispatcher(),
                            externalResourceInfoProvider,
                            taskStateManager,
                            taskManagerActions,
                            inputSplitProvider,
                            checkpointResponder,
                            taskOperatorEventGateway,
                            aggregateManager,
                            classLoaderHandle,
                            fileCache,
                            taskManagerConfiguration,
                            taskMetricGroup,
                            resultPartitionConsumableNotifier,
                            partitionStateChecker,
                            getRpcService().getScheduledExecutor());

  ...

            if (taskAdded) {
                task.startTaskThread();

  ... 
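To make the vertex hierarchy described in this section's bullets concrete (JobVertex -> ExecutionJobVertex -> ExecutionVertex -> Execution, plus IntermediateResult), here is a toy model. All classes are hypothetical and only named after Flink's concepts; Flink's real classes take many more constructor arguments and carry scheduling state this sketch omits.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of how parallelism expands a JobVertex in the ExecutionGraph
 * (hypothetical classes named after Flink's concepts, not Flink's real API).
 */
public class ExecutionGraphDemo {
    /** A JobGraph vertex: a name, a parallelism, and how many IntermediateDataSets it produces. */
    record JobVertex(String name, int parallelism, int producedDataSets) {}

    /** One attempt to run one subtask. */
    record Execution(int subtaskIndex, int attemptNumber) {}

    /** Counterpart of an IntermediateDataSet: one partition per producing subtask. */
    record IntermediateResult(int numberOfPartitions) {}

    /** One parallel subtask; every (re)run of it gets a fresh Execution. */
    static final class ExecutionVertex {
        final int subtaskIndex;
        final List<Execution> attempts = new ArrayList<>();

        ExecutionVertex(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
            newAttempt(); // the first Execution is created together with the vertex
        }

        /** Hypothetical helper: a restart creates a new Execution instead of reusing the old one. */
        Execution newAttempt() {
            Execution execution = new Execution(subtaskIndex, attempts.size());
            attempts.add(execution);
            return execution;
        }
    }

    /** Counterpart of a JobVertex, expanded to its parallelism. */
    static final class ExecutionJobVertex {
        final List<ExecutionVertex> taskVertices = new ArrayList<>();
        final List<IntermediateResult> producedResults = new ArrayList<>();

        ExecutionJobVertex(JobVertex jobVertex) {
            // One IntermediateResult per produced data set, each with `parallelism` partitions.
            for (int i = 0; i < jobVertex.producedDataSets(); i++) {
                producedResults.add(new IntermediateResult(jobVertex.parallelism()));
            }
            // One ExecutionVertex per parallel subtask.
            for (int i = 0; i < jobVertex.parallelism(); i++) {
                taskVertices.add(new ExecutionVertex(i));
            }
        }
    }

    public static void main(String[] args) {
        ExecutionJobVertex ejv = new ExecutionJobVertex(new JobVertex("Source -> Map", 3, 1));
        // Simulate a failover of subtask 1: it gets a second Execution, the others keep one.
        ejv.taskVertices.get(1).newAttempt();
        for (ExecutionVertex ev : ejv.taskVertices) {
            System.out.println("subtask " + ev.subtaskIndex + ": " + ev.attempts);
        }
        System.out.println("results: " + ejv.producedResults);
    }
}
```

With parallelism 3, the ExecutionJobVertex holds three ExecutionVertex objects and one IntermediateResult with three partitions; only the restarted subtask accumulates a second Execution, which is the object that later goes through Execution.deploy.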

Permalink: https://www.haomeiwen.com/subject/biknjrtx.html