Very rough notes, for my own reference only
Stream Graph
stream graph https://izualzhy.cn/flink-source-stream-graph
- split / partition / union / select do not generate real nodes in the StreamGraph; they become virtual nodes. Although no StreamNode is created, their information is carried as properties on the edges: "they set how the two nodes are connected, i.e. the edge's properties (how the upstream node's data is sent to the downstream node)"
* <p>Partitioning, split/select and union don't create actual nodes in the {@code StreamGraph}. For
* these, we create a virtual node in the {@code StreamGraph} that holds the specific property, i.e.
* partitioning, selector and so on. When an edge is created from a virtual node to a downstream
 * node the {@code StreamGraph} resolves the ID of the original node and creates an edge in the
* graph with the desired property. For example, if you have this graph:
*
* <pre>
* Map-1 -> HashPartition-2 -> Map-3
* </pre>
*
* <p>where the numbers represent transformation IDs. We first recurse all the way down. {@code
* Map-1} is transformed, i.e. we create a {@code StreamNode} with ID 1. Then we transform the
 * {@code HashPartition}; for this, we create a virtual node with ID 4 that holds the property {@code
* HashPartition}. This transformation returns the ID 4. Then we transform the {@code Map-3}. We add
 * the edge {@code 4 -> 3}. The {@code StreamGraph} resolves the actual node with ID 1 and creates
 * an edge {@code 1 -> 3} with the property HashPartition.
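A minimal program that produces the Map -> HashPartition -> Map shape above (a sketch; the class and job names are mine, and the partitioner keyBy installs is really the key-group hash partitioner):
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class VirtualNodeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
                .map(i -> i + 1)   // becomes a real StreamNode
                .keyBy(i -> i % 2) // PartitionTransformation: only a virtual node, no StreamNode
                .map(i -> i * 2)   // real StreamNode; the incoming edge carries the partitioner
                .print();
        env.execute("virtual-node-demo");
    }
}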
- How each Transformation is processed: the corresponding TransformationTranslator, registered in StreamGraphGenerator's static map
static {
@SuppressWarnings("rawtypes")
Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>>
tmp = new HashMap<>();
tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator<>());
tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator<>());
tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator<>());
tmp.put(SourceTransformation.class, new SourceTransformationTranslator<>());
tmp.put(SinkTransformation.class, new SinkTransformationTranslator<>());
tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator<>());
tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator<>());
tmp.put(UnionTransformation.class, new UnionTransformationTranslator<>());
tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator<>());
tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator<>());
tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator<>());
tmp.put(
TimestampsAndWatermarksTransformation.class,
new TimestampsAndWatermarksTransformationTranslator<>());
tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator<>());
tmp.put(
KeyedBroadcastStateTransformation.class,
new KeyedBroadcastStateTransformationTranslator<>());
translatorMap = Collections.unmodifiableMap(tmp);
}
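For reference, this map is consulted in StreamGraphGenerator#transform to pick the translator (simplified excerpt; unknown Transformation types fall back to a legacy path):
final TransformationTranslator<?, Transformation<?>> translator =
        (TransformationTranslator<?, Transformation<?>>)
                translatorMap.get(transform.getClass());

return translator != null
        ? translate(translator, transform)
        : legacyTransform(transform);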
- translate adds the nodes and edges; it is called inside transform()
return shouldExecuteInBatchMode
? translator.translateForBatch(transform, context)
: translator.translateForStreaming(transform, context); // usually this branch
@Override
public final Collection<Integer> translateForStreaming(
final T transformation, final Context context) {
checkNotNull(transformation);
checkNotNull(context);
final Collection<Integer> transformedIds =
translateForStreamingInternal(transformation, context); // this ends up calling translateInternal in the concrete translator
configure(transformation, context);
return transformedIds;
}
// AbstractOneInputTransformationTranslator.java
protected Collection<Integer> translateInternal(
final Transformation<OUT> transformation,
final StreamOperatorFactory<OUT> operatorFactory,
final TypeInformation<IN> inputType,
@Nullable final KeySelector<IN, ?> stateKeySelector,
@Nullable final TypeInformation<?> stateKeyType,
final Context context) {
checkNotNull(transformation);
checkNotNull(operatorFactory);
checkNotNull(inputType);
checkNotNull(context);
final StreamGraph streamGraph = context.getStreamGraph();
final String slotSharingGroup = context.getSlotSharingGroup();
final int transformationId = transformation.getId();
final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
streamGraph.addOperator(
transformationId,
slotSharingGroup,
transformation.getCoLocationGroupKey(),
operatorFactory,
inputType,
transformation.getOutputType(),
transformation.getName());
if (stateKeySelector != null) {
TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
}
int parallelism =
transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
? transformation.getParallelism()
: executionConfig.getParallelism();
streamGraph.setParallelism(transformationId, parallelism);
streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
final List<Transformation<?>> parentTransformations = transformation.getInputs();
checkState(
parentTransformations.size() == 1,
"Expected exactly one input transformation but found "
+ parentTransformations.size());
for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
streamGraph.addEdge(inputId, transformationId, 0);
}
return Collections.singleton(transformationId);
}
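When addEdge later runs and the upstream ID turns out to be a virtual node, the StreamGraph unwraps it recursively, exactly as the class comment above describes. A simplified sketch modeled on StreamGraph#addEdgeInternal (the real signature carries more parameters, and details vary by version):
private void addEdgeInternal(Integer upId, Integer downId, int typeNumber,
        StreamPartitioner<?> partitioner, OutputTag<?> outputTag) {
    if (virtualSideOutputNodes.containsKey(upId)) {
        int virtualId = upId;
        upId = virtualSideOutputNodes.get(virtualId).f0;          // original upstream node
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1; // pick up the side-output tag
        }
        addEdgeInternal(upId, downId, typeNumber, partitioner, outputTag);
    } else if (virtualPartitionNodes.containsKey(upId)) {
        int virtualId = upId;
        upId = virtualPartitionNodes.get(virtualId).f0;            // original upstream node
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1; // pick up the partitioner
        }
        addEdgeInternal(upId, downId, typeNumber, partitioner, outputTag);
    } else {
        // both ends are real StreamNodes: create the StreamEdge, attaching the
        // collected partitioner / output tag as edge properties
        createActualEdge(upId, downId, typeNumber, partitioner, outputTag);
    }
}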
- The virtual-node handling lives in some of the translators; SideOutputTransformationTranslator is a good reference
private Collection<Integer> translateInternal(
final SideOutputTransformation<OUT> transformation, final Context context) {
checkNotNull(transformation);
checkNotNull(context);
final StreamGraph streamGraph = context.getStreamGraph();
final List<Transformation<?>> parentTransformations = transformation.getInputs();
checkState(
parentTransformations.size() == 1,
"Expected exactly one input transformation but found "
+ parentTransformations.size());
final List<Integer> virtualResultIds = new ArrayList<>();
final Transformation<?> parentTransformation = parentTransformations.get(0);
for (int inputId : context.getStreamNodeIds(parentTransformation)) {
final int virtualId = Transformation.getNewNodeId();
streamGraph.addVirtualSideOutputNode(inputId, virtualId, transformation.getOutputTag());
virtualResultIds.add(virtualId);
}
return virtualResultIds;
}
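For context, this translator is exercised by user code like the following (illustrative sketch; input is assumed to be an existing DataStream<Integer>):
// getSideOutput() is what creates the SideOutputTransformation.
final OutputTag<String> rejectedTag = new OutputTag<String>("rejected") {};

SingleOutputStreamOperator<Integer> main =
        input.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                if (value < 0) {
                    ctx.output(rejectedTag, "rejected: " + value); // routed to the side output
                } else {
                    out.collect(value);
                }
            }
        });

DataStream<String> rejected = main.getSideOutput(rejectedTag); // SideOutputTransformation here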
Job Graph
Job Graph https://izualzhy.cn/flink-source-job-graph
On IntermediateDataSet: https://www.jianshu.com/p/efb1313ba52a (a JobEdge records its source via an IntermediateDataSet)
Basic JobGraph structure: https://blog.csdn.net/weixin_39657860/article/details/96756705
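To summarize the structure those links describe (a conceptual sketch, not the real class bodies):
import java.util.List;

// JobVertex --produces--> IntermediateDataSet --source of--> JobEdge --into--> JobVertex
class JobVertex {
    List<IntermediateDataSet> results; // data sets this vertex produces
    List<JobEdge> inputs;              // edges from upstream data sets into this vertex
}
class IntermediateDataSet {
    JobVertex producer; // the vertex that produces this data set
    JobEdge consumer;   // the edge that consumes it
}
class JobEdge {
    IntermediateDataSet source; // this is how a JobEdge records its source
    JobVertex target;           // the consuming vertex
}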
- In StreamExecutionEnvironment#execute, executeAsync(streamGraph) is invoked
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
...
CompletableFuture<JobClient> jobClientFuture =
executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration, userClassloader);
...
- Here we can see that the StreamGraph has to be converted into a JobGraph, and it is the JobGraph that is ultimately submitted
public CompletableFuture<JobClient> execute(
Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassLoader)
throws Exception {
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
if (jobGraph.getSavepointRestoreSettings() == SavepointRestoreSettings.none()
&& pipeline instanceof StreamGraph) {
jobGraph.setSavepointRestoreSettings(
((StreamGraph) pipeline).getSavepointRestoreSettings());
}
return miniCluster
.submitJob(jobGraph)
- Tracing PipelineExecutorUtils.getJobGraph down, we land in StreamGraphTranslator
public JobGraph translateToJobGraph(
Pipeline pipeline, Configuration optimizerConfiguration, int defaultParallelism) {
checkArgument(
pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");
StreamGraph streamGraph = (StreamGraph) pipeline;
return streamGraph.getJobGraph(null);
}
public JobGraph getJobGraph(@Nullable JobID jobID) {
return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}
- The core of createJobGraph is the chaining logic; the snippet below shows it well.
// createChain(): keep recursing downstream as long as the output is chainable
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(
chainable.getTargetId(),
chainIndex + 1,
chainInfo,
chainEntryPoints));
}
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(
nonChainable.getTargetId(),
1, // operators start at position 1 because 0 is for chained source inputs
chainEntryPoints.computeIfAbsent(
nonChainable.getTargetId(),
(k) -> chainInfo.newChain(nonChainable.getTargetId())),
chainEntryPoints);
}
When can two operators be chained together?
- The upstream and downstream operator instances are in the same SlotSharingGroup;
- The downstream operator's chaining strategy (ChainingStrategy) is ALWAYS, i.e. it can chain both to its upstream and to its downstream. Common operators such as map() and filter() fall into this category;
- The upstream operator's chaining strategy is HEAD or ALWAYS. HEAD means it can only chain to its downstream, which is normally reserved for source operators;
- The physical partitioner between the two operators is ForwardPartitioner (see《聊聊Flink DataStream的八种物理分区逻辑》);
- The shuffle mode between the two operators is not the batch mode;
- The upstream and downstream operator instances have the same parallelism.
These conditions map onto StreamingJobGraphGenerator#isChainable, sketched below.
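A simplified sketch of StreamingJobGraphGenerator#isChainable (the exact checks, e.g. the shuffle/exchange-mode accessor, differ slightly across Flink versions):
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    // a node with more than one input can never be chained to its upstream
    return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph);
}

private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    return upStreamVertex.isSameSlotSharingGroup(downStreamVertex)                  // same SlotSharingGroup
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) // ChainingStrategy checks
            && (edge.getPartitioner() instanceof ForwardPartitioner)                // forward partitioning
            && edge.getExchangeMode() != StreamExchangeMode.BATCH                   // not the batch shuffle mode
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() // equal parallelism
            && streamGraph.isChainingEnabled();                                     // chaining not disabled
}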
ExecutionGraph

- Finally, the JobGraph is converted into an ExecutionGraph, right after
.submitJob(jobGraph)
- The path can be cross-checked against the referenced《Flink 源码之ExecutionGraph》article, but by persistAndRunJob things have changed a bit. Following
DefaultJobMasterServiceFactory#internalCreateJobMasterService -> JobMaster#createScheduler
we can see that a JobMaster is being created
private void persistAndRunJob(JobGraph jobGraph) throws Exception {
jobGraphWriter.putJobGraph(jobGraph);
runJob(createJobMasterRunner(jobGraph), ExecutionType.SUBMISSION);
}
- JobMaster holds a schedulerNG; this call chain is fairly long:
schedulerNGFactory.createInstance -> DefaultSchedulerFactory#createInstance -> DefaultScheduler (which extends SchedulerBase; SchedulerBase holds the executionGraph)
- Following this all the way down, buildGraph can be found in DefaultExecutionGraphBuilder
this.schedulerNG =
createScheduler(
slotPoolServiceSchedulerFactory,
executionDeploymentTracker,
jobManagerJobMetricGroup,
jobStatusListener);
// SchedulerBase
this.executionGraph =
createAndRestoreExecutionGraph(
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
initializationTimestamp,
mainThreadExecutor,
jobStatusListener,
vertexParallelismStore);
- The ExecutionGraph is the physical execution plan of a Flink job, used to coordinate the distributed execution of the dataflow. Unlike the StreamGraph and JobGraph, the ExecutionGraph is generated inside the JobManager.
- The ExecutionGraph also has the notion of a vertex: its vertices are ExecutionJobVertex, corresponding to the JobVertex in the JobGraph. Going from JobGraph to ExecutionGraph introduces parallelism: an ExecutionJobVertex contains all the parallel tasks of its corresponding JobVertex, each represented by an ExecutionVertex. That is, an ExecutionJobVertex with parallelism n contains n ExecutionVertex instances.
- An ExecutionVertex may be executed one or more times (due to failure recovery, recomputation, or reconfiguration). Each execution of an ExecutionVertex produces an Execution object, which tracks the state transitions and resource usage of that attempt.
- IntermediateResult corresponds to the IntermediateDataSet of a JobVertex in the JobGraph; it represents the staging point for data in transit between two adjacent ExecutionJobVertex. The IntermediateResults are constructed when the ExecutionJobVertex is created, one per IntermediateDataSet of its JobVertex, and each holds one IntermediateResultPartition per parallel subtask, as the sketch below shows.
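The relationships in the bullets above, summarized (a conceptual sketch, not the real class bodies):
class ExecutionJobVertex {
    JobVertex jobVertex;                   // the JobGraph vertex this expands
    ExecutionVertex[] taskVertices;        // one entry per parallel subtask
    IntermediateResult[] producedDataSets; // one per IntermediateDataSet of the JobVertex
}
class ExecutionVertex {
    Execution currentExecution; // a fresh Execution object per execution attempt
}
class IntermediateResult {
    IntermediateResultPartition[] partitions; // one per parallel subtask
}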
- attachJobGraph maps the JobGraph onto the ExecutionGraph
- ExecutionVertex creates new Execution objects internally
- startScheduling() in SchedulerBase eventually calls down until
- Execution.deploy is finally invoked:
CompletableFuture.supplyAsync(
() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
}
public CompletableFuture<Acknowledge> submitTask(
TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
...
Task task =
new Task(
jobInformation,
taskInformation,
tdd.getExecutionAttemptId(),
tdd.getAllocationId(),
tdd.getSubtaskIndex(),
tdd.getAttemptNumber(),
tdd.getProducedPartitions(),
tdd.getInputGates(),
memoryManager,
taskExecutorServices.getIOManager(),
taskExecutorServices.getShuffleEnvironment(),
taskExecutorServices.getKvStateService(),
taskExecutorServices.getBroadcastVariableManager(),
taskExecutorServices.getTaskEventDispatcher(),
externalResourceInfoProvider,
taskStateManager,
taskManagerActions,
inputSplitProvider,
checkpointResponder,
taskOperatorEventGateway,
aggregateManager,
classLoaderHandle,
fileCache,
taskManagerConfiguration,
taskMetricGroup,
resultPartitionConsumableNotifier,
partitionStateChecker,
getRpcService().getScheduledExecutor());
...
if (taskAdded) {
task.startTaskThread();
...