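A Task is the smallest unit of scheduling in Flink. User-defined processing logic is wrapped in layers, StreamTask -> StreamOperator -> user-defined function (UDF), and this wrapping is how it ends up being scheduled and executed.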
- Task instantiates an AbstractInvokable by reflection. For streaming jobs this is a StreamTask (OneInputStreamTask, SourceStreamTask). StreamInputProcessor#processInput -> emitNext pulls records from the channel in a while (true) loop -> emitRecord -> ((OneInputStreamOperator) operator).processElement(record), which finally calls the user's UDF.
- OperatorChain#StreamOperator<?>[] allOperators: AbstractStreamOperator, AbstractUdfStreamOperator, OneInputStreamOperator, StreamMap. The layers wrap one another from top to bottom, with the user's UDF at the bottom.
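The call path in the list above can be sketched in plain Java. This is only a simplified model: `OneInputOperator`, `MapOperator`, and `runTask` are hypothetical stand-ins that mirror the roles of OneInputStreamOperator, StreamMap, and the StreamTask input loop. They are not Flink's real classes, and the real loop handles watermarks, checkpoints, and blocking reads that are left out here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// Simplified model of the StreamTask -> StreamOperator -> UDF layering.
// All types here are illustrative stand-ins, not Flink classes.
class TaskLayeringSketch {

    /** Plays the role of OneInputStreamOperator: receives one record at a time. */
    interface OneInputOperator<IN> {
        void processElement(IN record);
    }

    /** Plays the role of StreamMap: an operator that wraps a user-defined map function. */
    static final class MapOperator<IN, OUT> implements OneInputOperator<IN> {
        private final Function<IN, OUT> userFunction; // the user's UDF
        private final List<OUT> output;               // stand-in for the downstream output

        MapOperator(Function<IN, OUT> userFunction, List<OUT> output) {
            this.userFunction = userFunction;
            this.output = output;
        }

        @Override
        public void processElement(IN record) {
            // The operator layer delegates to the UDF and forwards the result.
            output.add(userFunction.apply(record));
        }
    }

    /** Plays the role of the task's input loop: drain the channel into the operator. */
    static <IN> void runTask(Queue<IN> channel, OneInputOperator<IN> headOperator) {
        IN record;
        while ((record = channel.poll()) != null) {
            headOperator.processElement(record);
        }
    }

    /** Runs three records through a map UDF and returns what reached the output. */
    static List<String> demo() {
        Queue<Integer> channel = new ArrayDeque<>(List.of(1, 2, 3));
        List<String> sink = new ArrayList<>();
        runTask(channel, new MapOperator<Integer, String>(i -> "v" + (i * 10), sink));
        return sink;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [v10, v20, v30]
    }
}
```

The point of the model is that the task never sees the UDF directly: it only calls `processElement` on the operator, and the operator decides how to invoke the user function.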
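Task and OperatorChain
We have already seen that Flink chains together as many operators as it can, forming an OperatorChain that corresponds to a single JobVertex. The isChainable check below decides whether the operators on the two sides of a StreamEdge can be chained: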
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();
    StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();

    return downStreamVertex.getInEdges().size() == 1
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            // must be stream mode (the edge must not use a BATCH shuffle)
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled();
}
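For a custom operator, don't forget to call operator.setChainingStrategy(ChainingStrategy.ALWAYS). Otherwise the downstream condition in isChainable (getChainingStrategy() == ChainingStrategy.ALWAYS) may not hold, and the Flink UI will show that the operators were not chained together.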
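To see why the downstream strategy matters, here is a minimal sketch of the same predicate over plain Java values. It assumes a hypothetical `Node` type that holds only the fields the check reads, and a local `ChainingStrategy` enum that mirrors Flink's. The null checks and the shuffle-mode condition are omitted for brevity, so treat it as an illustration rather than Flink's implementation.

```java
// Simplified model of the chaining decision; not Flink's real classes.
class ChainingSketch {

    /** Local stand-in for Flink's ChainingStrategy enum. */
    enum ChainingStrategy { ALWAYS, NEVER, HEAD }

    /** Hypothetical stand-in for a StreamNode: only what the check needs. */
    static final class Node {
        final ChainingStrategy strategy;
        final int parallelism;
        final String slotSharingGroup;
        final int inEdgeCount;

        Node(ChainingStrategy strategy, int parallelism, String slotSharingGroup, int inEdgeCount) {
            this.strategy = strategy;
            this.parallelism = parallelism;
            this.slotSharingGroup = slotSharingGroup;
            this.inEdgeCount = inEdgeCount;
        }
    }

    /** Mirrors the conditions of isChainable, minus null checks and shuffle mode. */
    static boolean isChainable(Node up, Node down, boolean forwardPartitioner, boolean chainingEnabled) {
        return down.inEdgeCount == 1
                && up.slotSharingGroup.equals(down.slotSharingGroup)
                && down.strategy == ChainingStrategy.ALWAYS
                && (up.strategy == ChainingStrategy.HEAD || up.strategy == ChainingStrategy.ALWAYS)
                && forwardPartitioner
                && up.parallelism == down.parallelism
                && chainingEnabled;
    }

    public static void main(String[] args) {
        Node source = new Node(ChainingStrategy.HEAD, 4, "default", 0);

        // A custom operator whose strategy was never set to ALWAYS.
        Node customHead = new Node(ChainingStrategy.HEAD, 4, "default", 1);
        // The same operator after setChainingStrategy(ChainingStrategy.ALWAYS).
        Node customAlways = new Node(ChainingStrategy.ALWAYS, 4, "default", 1);
        // ALWAYS, but with a different parallelism than the upstream node.
        Node rescaled = new Node(ChainingStrategy.ALWAYS, 2, "default", 1);

        System.out.println(isChainable(source, customHead, true, true));   // false
        System.out.println(isChainable(source, customAlways, true, true)); // true
        System.out.println(isChainable(source, rescaled, true, true));     // false
    }
}
```

In this model a HEAD operator can start a chain but cannot be appended to one, which matches the asymmetry between the upstream and downstream conditions in the predicate above.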