Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
JobGraph
相比StreamGraph,JobGraph在生成的时候做出了一项优化:将尽可能多的operator组合到同一个task中,形成operator chain。这样以来,同一个chain中的operator运行在同一个线程中,可以显著降低线程切换的性能开销,并且能增大吞吐量和降低延迟。
Operator Chain入口方法
StreamGraph的getJobGraph
方法
@Override
public JobGraph getJobGraph(@Nullable JobID jobID) {
return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}
生成JobGraph的逻辑在StreamingJobGraphGenerator
类中
StreamingJobGraphGenerator的createJobGraph方法如下所示:
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}
继续跟踪,发现创建JobGraph的主要逻辑如下所示:
private JobGraph createJobGraph() {
// 进行一些校验工作
preValidate();
// make sure that all vertices start immediately
// 设置JobGraph的调度模式
jobGraph.setScheduleMode(streamGraph.getScheduleMode());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
// 这里是重点,JobGraph的顶点和边在这个方法中创建,并且尝试将尽可能多的StreamNode聚合在一个JobGraph节点中。聚合条件稍后分析
setChaining(hashes, legacyHashes, chainedOperatorHashes);
// 设置物理边界
setPhysicalEdges();
// 设置slot共享和coLocation。同一个coLocationGroup的task需要在同一个slot中运行
setSlotSharingAndCoLocation();
// 配置检查点
configureCheckpointing();
JobGraphGenerator.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);
// set the ExecutionConfig last when it has been finalized
try {
// 设置运行时配置
jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
}
catch (IOException e) {
throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
"This indicates that non-serializable types (like custom serializers) were registered");
}
return jobGraph;
}
其中最为重要的是setChaining方法。该方法为StreamGraph中的每个source节点生成Job Vertex(chain)。
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
}
}
Chain的概念:JobGraph最为重要的优化方式为创建OperatorChain,可以尽可能的多整合一些操作在同一个节点中完成,避免不必要的线程切换和网络通信。
createChain方法的主要逻辑:
- 如果stream具有多个sources,遍历每一个sources,调用createChain方法。
- createChain方法的两个参数startNodeId和currentNodeId,如果这两个参数形同,意味着一个新chain的创建。如果这两个参数不相同,则将startNode和currentNode构造在同一个chain中。
- 使用一个变量builtVertices保证各个StreamNode没有被重复处理。
- 处理流程将各个节点的出边(out edge)分类。分类的依据为isChainable函数。
- 出边分为3类,可以被chain和不可以被chain的,还有一种(transitiveOutEdges)是在递归调用createChain的时候加入,目的是存放整个chain所有的出边(在构造chain的时候,遇到一个无法被chain的节点,则意味着该chain已经结束,这个无法被chain的StreamEdge就是这个chain的出边)。
- createChain方法会递归调用。如果某个StreamNode的出边可以chain,则调用createChain方法连接这个节点(chain的起始节点)和这个节点可以被chain的出边指向的节点,一直递归到出边不可chain为止。
- 遇到不可chain的节点,会创建一个job vertex。
- 同一个chain中的start node和chain内的节点之间operator的关系在chainedOperatorHashes变量中保存,结构为
Map<startNodeID, List<Tuple2<StartNodeHash, currentNodeHash>>>
- 每一个Stream Node(无论有没有对应的job vertex)的配置信息在config变量中。
setVertexConfig
方法负责设置config变量。 - 通过
ChainedConfig
变量来保存chain的起始节点和chain内各个节点配置的对应关系。ChainedConfig
结构为Map<startNodeID, Map<currentNodeID, Config>>
。 - 调用connect方法将每个job vertex(chain)和下一个连接起来。比如节点A和B相连,会现在A后追加一个Intermediate DateSet,然后是Job Edge,最后连接到B节点。
createChain代码如下所示:
private List<StreamEdge> createChain(
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
int chainIndex,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
// builtVertices存放了已经被构建了的StreamNode ID,避免重复操作
if (!builtVertices.contains(startNodeId)) {
// 存储整个chain所有的出边
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
// 存储可以被chain的StreamEdge
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
// 存储可以不可以被chain的StreamEdge
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
// 获取当前处理node
StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);
// 分类可以被chain的edge和不可被chain的edge,使用isChainable的方法判断
for (StreamEdge outEdge : currentNode.getOutEdges()) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
}
}
for (StreamEdge chainable : chainableOutputs) {
// 如果是可被chain的StreamEdge,递归调用createChain
// 注意currentNode是chainable.getTargetId()
// 递归直到currentNode的out edge为不可chain的edge,会执行下一段for循环,不可chain的边被加入transitiveOutEdges,最终返回到递归最外层
// 这样以来,transitiveOutEdges收集齐了整个chain所有的出边
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
for (StreamEdge nonChainable : nonChainableOutputs) {
// 如果是不可被chain的StreamEdge,添加到transitiveOutEdges集合中
transitiveOutEdges.add(nonChainable);
// 调用createChain,构建新的chain
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
byte[] primaryHashBytes = hashes.get(currentNodeId);
OperatorID currentOperatorId = new OperatorID(primaryHashBytes);
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}
// 设置chain的名字
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
// 设置chain的最小资源
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
// 设置chain的最小资源
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}
// 如果currentNodeId和startNodeId相等,说明需要创建一个新的chain,会生成一个JobVertex
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
: new StreamConfig(new Configuration());
// 设置的顶点属性到config中
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
if (currentNodeId.equals(startNodeId)) {
// 意味着一个新chain的开始
config.setChainStart();
config.setChainIndex(0);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());
// 对于每一个chain,把它和指向下一个chain的出边连接起来
for (StreamEdge edge : transitiveOutEdges) {
connect(startNodeId, edge);
}
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
} else {
chainedConfigs.computeIfAbsent(startNodeId, k -> new HashMap<Integer, StreamConfig>());
config.setChainIndex(chainIndex);
// 获取到被chain的节点
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
// 关联chain内节点的配置信息到chain的起始节点上
chainedConfigs.get(startNodeId).put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
if (chainableOutputs.isEmpty()) {
config.setChainEnd();
}
return transitiveOutEdges;
} else {
return new ArrayList<>();
}
}
isChainable方法,这个方法很重要。用于判断某个边两头连接的StreamNode的node是否可以组成OperatorChain。方法如下所示:
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
StreamOperator<?> headOperator = upStreamVertex.getOperator();
StreamOperator<?> outOperator = downStreamVertex.getOperator();
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
}
总结起来,可以chain的条件如下(都必须满足):
- 下游节点的前置节点有且只能有1个。
- 该Edge的上游和下游节点必须存在。
- 上游节点和下游节点位于同一个SlotSharingGroup中。
- 下游的chain策略为ChainingStrategy.ALWAYS。
- 上游的chain策略为ChainingStrategy.ALWAYS或ChainingStrategy.HEAD。
- 使用ForwardPartitoner及其子类。
- 上游和下游节点的并行度一致。
- chaining被启用。
接下来是setPhysicalEdges方法。该方法负责设置job vertex的物理边界。执行步骤总结如下:
- 遍历
physicalEdgesInOrder
对象,该对象包含了所有的不可被chain的出边(在调用connect方法的时候edge被加入该集合)。 -
physicalInEdgesInOrder
结构为Map<不可chain的edge指向的下游节点,List<不可chain的edge>>
。 - 找到这些不可chain的edge指向的下游节点,设置物理边界(该节点的入边)
private void setPhysicalEdges() {
Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();
for (StreamEdge edge : physicalEdgesInOrder) {
int target = edge.getTargetId();
List<StreamEdge> inEdges = physicalInEdgesInOrder.computeIfAbsent(target, k -> new ArrayList<>());
inEdges.add(edge);
}
for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
int vertex = inEdges.getKey();
List<StreamEdge> edgeList = inEdges.getValue();
vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
}
}
其余的方法对生成JobGraph过程的理解不是很重要,暂时不分析,留在以后补充。
示例图
JobGraph示意图注意StreamGraph的window和sink两个节点被chain到了一起。
网友评论