StreamExecutionEnvironment
StreamExecutionEnvironment is the execution context for Flink jobs in streaming mode and the entry point for writing Flink programs. Depending on the actual execution environment, it has different concrete implementations, such as LocalStreamEnvironment and RemoteStreamEnvironment. StreamExecutionEnvironment also provides methods to configure the default parallelism, checkpointing, and other mechanisms; these settings are mainly kept in ExecutionConfig and CheckpointConfig.
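For orientation, here is a minimal sketch of obtaining an environment and applying such settings. getExecutionEnvironment(), setParallelism(), and enableCheckpointing() are standard Flink APIs; the concrete values are only examples:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// setParallelism() is recorded in ExecutionConfig
env.setParallelism(4);
// enableCheckpointing() and the setting below are recorded in CheckpointConfig
env.enableCheckpointing(60_000);   // checkpoint every 60 seconds
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000);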
Internally, StreamExecutionEnvironment uses a List<StreamTransformation<?>> transformations to keep track of all the transformations that produce DataStreams. A StreamTransformation represents an operation that creates a DataStream; by applying operators to a DataStream again and again, we end up with a graph made of StreamTransformations. When the program is executed, this underlying graph is translated into a StreamGraph.
A StreamTransformation does not necessarily correspond to a physical transformation at runtime; some operations are purely logical, such as split/select/partitioning.
Every StreamTransformation has an associated id, which is assigned from a globally incrementing counter. Beyond that, it also carries information such as uid, slotSharingGroup, and parallelism.
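These properties can be set from user code on the result of an operator; each setter below ends up as a field on the underlying StreamTransformation (a small illustrative pipeline, assuming env is the StreamExecutionEnvironment from above):

DataStream<String> mapped = env.fromElements("a", "b")
    .map(new MapFunction<String, String>() {
        @Override
        public String map(String value) {
            return value.toUpperCase();
        }
    })
    .uid("to-upper")                // stable uid, e.g. for savepoint state mapping
    .slotSharingGroup("group-1")    // slot sharing group of this transformation
    .setParallelism(2);             // per-operator parallelism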
StreamTransformation has many concrete subclasses, such as SourceTransformation, OneInputTransformation, TwoInputTransformation, SideOutputTransformation, and SinkTransformation, which correspond to the different transformation operations on a DataStream.
public OneInputTransformation(
        Transformation<IN> input,
        String name,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<OUT> outputType,
        int parallelism) {
    super(name, outputType, parallelism);
    this.input = input;
    this.operatorFactory = operatorFactory;
}

public TwoInputTransformation(
        Transformation<IN1> input1,
        Transformation<IN2> input2,
        String name,
        StreamOperatorFactory<OUT> operatorFactory,
        TypeInformation<OUT> outputType,
        int parallelism) {
    super(name, outputType, parallelism);
    this.input1 = input1;
    this.input2 = input2;
    this.operatorFactory = operatorFactory;
}
DataStream
A DataStream represents a stream of elements of the same type. Applying an operation such as map or filter to a DataStream turns it into another DataStream; under the hood, each operation creates the corresponding StreamTransformation and adds it to the transformations list of the StreamExecutionEnvironment.
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    // read the output type of the input Transformation to trigger MissingTypeInfo errors early
    transformation.getOutputType();

    // construct the StreamTransformation
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
        this.transformation,
        operatorName,
        operator,
        outTypeInfo,
        environment.getParallelism());

    @SuppressWarnings({ "unchecked", "rawtypes" })
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    // add it to the transformations list of the StreamExecutionEnvironment
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}
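DataStream#map is essentially a thin wrapper around this transform() method: it extracts the output type and wraps the user function in a StreamMap operator. A simplified sketch (closure cleaning and the exact TypeExtractor call in the real source are omitted here):

// simplified sketch of DataStream#map
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(mapper, getType());
    return transform("Map", outType, new StreamMap<>(mapper));
}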
The subclasses of DataStream include SingleOutputStreamOperator, DataStreamSource, KeyedStream, IterativeStream, and SplitStream (deprecated). A small gripe about the naming of SingleOutputStreamOperator: it is far too easy to confuse with StreamOperator, which is introduced in the next section.
Besides DataStream and its subclasses, other classes that represent data streams include ConnectedStreams (two streams connected together), WindowedStream, and AllWindowedStream. The transformations between these stream types are covered in the official Flink documentation.
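For example, the following shows how these stream types arise from DataStream (the window assigner and size are arbitrary; connect(), keyBy(), window(), and windowAll() are standard DataStream APIs):

DataStream<String> a = env.fromElements("a", "b");
DataStream<Long> b = env.fromElements(1L, 2L);

// two streams connected together
ConnectedStreams<String, Long> connected = a.connect(b);

// keyed window: one window per key
WindowedStream<String, String, TimeWindow> windowed =
    a.keyBy(s -> s).window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

// non-keyed window over the whole stream
AllWindowedStream<String, TimeWindow> allWindowed =
    a.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));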
StreamOperator
When operating on a DataStream, for example via DataStream#map, we are asked to supply a user-defined function. So how is this information stored in the StreamTransformation? This is where a new interface comes in: StreamOperator.
StreamOperator defines the lifecycle management of a concrete operator, including:
// lifecycle
void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output);
void open() throws Exception;
void close() throws Exception;
void dispose() throws Exception;

// state management
OperatorSnapshotFutures snapshotState(
    long checkpointId,
    long timestamp,
    CheckpointOptions checkpointOptions,
    CheckpointStreamFactory storageLocation) throws Exception;
void initializeState() throws Exception;

// other methods omitted for now
The two sub-interfaces of StreamOperator, OneInputStreamOperator and TwoInputStreamOperator, provide the methods for processing the individual elements of a data stream, while the abstract subclass AbstractUdfStreamOperator provides the basic implementation for operators that wrap a user-defined function:
// OneInputStreamOperator
void processElement(StreamRecord<IN> element) throws Exception;
void processWatermark(Watermark mark) throws Exception;
void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;

// TwoInputStreamOperator
void processElement1(StreamRecord<IN1> element) throws Exception;
void processElement2(StreamRecord<IN2> element) throws Exception;
void processWatermark1(Watermark mark) throws Exception;
void processWatermark2(Watermark mark) throws Exception;

// AbstractStreamOperator provides default implementations of basic methods
// such as initializeState and snapshotState
The StreamOperators behind concrete operations such as map and filter are mostly built on top of AbstractUdfStreamOperator. Take StreamMap as an example:
@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}
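Following the same pattern, a custom operator can be plugged into a pipeline with DataStream#transform. A hedged sketch: CountingOperator is a made-up class for illustration, while AbstractStreamOperator, OneInputStreamOperator, and transform() are the real Flink building blocks:

// hypothetical operator: counts elements and forwards them unchanged
public class CountingOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T> {

    private static final long serialVersionUID = 1L;

    private transient long count;

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        count++;
        output.collect(element);
    }
}

// usage: the same thing map() does internally with StreamMap
DataStream<String> in = env.fromElements("a", "b", "c");
DataStream<String> out = in.transform("Counting", in.getType(), new CountingOperator<>());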
Through the chain of dependencies DataStream –> StreamTransformation –> StreamOperator, the transformation of a DataStream is carried out while preserving the relationship between the data streams and the operators applied to them.
StreamGraph
StreamGraphGenerator builds the StreamGraph from the transformations list of the StreamExecutionEnvironment. While iterating over the List<StreamTransformation>, it recursively calls StreamGraphGenerator#transform: for each StreamTransformation, it first makes sure that all of its upstream transformations have already been converted. Each StreamTransformation becomes a StreamNode in the StreamGraph, and StreamEdges are added between upstream and downstream nodes.
Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
    transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
    transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
    transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
    transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
    transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
    transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
    transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
    transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
    transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
    transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
    transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
    throw new IllegalStateException("Unknown transformation: " + transform);
}
For each type of StreamTransformation, the corresponding conversion method is invoked. Take the most typical one, transformOneInputTransform, as an example:
private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    // first make sure the upstream nodes have finished their transformation
    Collection<Integer> inputIds = transform(transform.getInput());

    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);

    streamGraph.addOperator(transform.getId(),
        slotSharingGroup,
        transform.getCoLocationGroupKey(),
        transform.getOperatorFactory(),
        transform.getInputType(),
        transform.getOutputType(),
        transform.getName());

    if (transform.getStateKeySelector() != null) {
        TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
    }

    int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
        transform.getParallelism() : executionConfig.getParallelism();
    streamGraph.setParallelism(transform.getId(), parallelism);
    streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

    for (Integer inputId : inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }

    return Collections.singleton(transform.getId());
}
The corresponding methods in StreamGraph for adding nodes and edges:
protected StreamNode addNode(Integer vertexID,
        @Nullable String slotSharingGroup,
        @Nullable String coLocationGroup,
        Class<? extends AbstractInvokable> vertexClass,
        StreamOperatorFactory<?> operatorFactory,
        String operatorName) {

    if (streamNodes.containsKey(vertexID)) {
        throw new RuntimeException("Duplicate vertexID " + vertexID);
    }

    // create the StreamNode; it keeps the StreamOperator(Factory) and the vertexClass
    StreamNode vertex = new StreamNode(
        vertexID,
        slotSharingGroup,
        coLocationGroup,
        operatorFactory,
        operatorName,
        new ArrayList<OutputSelector<?>>(),
        vertexClass);

    streamNodes.put(vertexID, vertex);
    return vertex;
}
Each StreamNode stores the corresponding StreamOperator (obtained from the StreamTransformation), and it also introduces a field jobVertexClass that identifies the actual task type this node runs as in the TaskManager:

Class<? extends AbstractInvokable> jobVertexClass
AbstractInvokable is the abstract base class for all tasks that can run in a TaskManager, covering both streaming and batch tasks. StreamTask is the base class of all streaming tasks; its concrete subclasses include SourceStreamTask, OneInputStreamTask, and TwoInputStreamTask.
StreamTransformations that do not involve a physical transformation, such as partitioning, split/select, and union, do not generate a StreamNode; instead they generate a virtual node carrying specific properties. When an edge from a virtual node to a downstream node is added, the generator walks upstream from the virtual node to find the physical node, adds an edge between the two physical nodes, and attaches the properties of the virtual transformation to that edge.
Take PartitionTransformation as an example; it is the transformation underlying KeyedStream:
// StreamGraphGenerator#transformPartition
private <T> Collection<Integer> transformPartition(PartitionTransformation<T> partition) {
    Transformation<T> input = partition.getInput();
    List<Integer> resultIds = new ArrayList<>();

    // recursively transform the upstream input nodes first
    Collection<Integer> transformedIds = transform(input);
    for (Integer transformedId : transformedIds) {
        int virtualId = Transformation.getNewNodeId();
        // add a virtual partition node
        streamGraph.addVirtualPartitionNode(
            transformedId, virtualId, partition.getPartitioner(), partition.getShuffleMode());
        resultIds.add(virtualId);
    }
    return resultIds;
}
// StreamGraph#addVirtualPartitionNode
public void addVirtualPartitionNode(
        Integer originalId,
        Integer virtualId,
        StreamPartitioner<?> partitioner,
        ShuffleMode shuffleMode) {

    if (virtualPartitionNodes.containsKey(virtualId)) {
        throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
    }

    // register a virtual node; when edges are added later it will be resolved
    // to the actual upstream physical node
    virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, partitioner, shuffleMode));
}
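This is how keyBy ends up as a virtual node: the KeyedStream constructor creates no new operator, it only wraps the input in a PartitionTransformation. A simplified sketch of that constructor (details may differ slightly across Flink versions):

// simplified sketch of KeyedStream's constructor
KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, TypeInformation<KEY> keyType) {
    this(
        dataStream,
        new PartitionTransformation<>(
            dataStream.getTransformation(),
            new KeyGroupStreamPartitioner<>(
                keySelector,
                StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
        keySelector,
        keyType);
}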
As mentioned earlier, when each physical node is transformed, StreamGraph#addEdge is called to create the edge between the input nodes and the current node:
private void addEdgeInternal(Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag,
        ShuffleMode shuffleMode) {

    // First check whether the upstream vertex is a virtual node. If so, resolve it
    // to the physical node upstream of the virtual node, add the edge between the
    // two physical nodes, and attach the StreamPartitioner, OutputTag, and similar
    // properties of the virtual node to the StreamEdge.
    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
    } else if (virtualSelectNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSelectNodes.get(virtualId).f0;
        if (outputNames.isEmpty()) {
            // selections that happen downstream override earlier selections
            outputNames = virtualSelectNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        shuffleMode = virtualPartitionNodes.get(virtualId).f2;
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else {
        // both endpoints are physical nodes
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                    "change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() +
                    ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() +
                    " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        if (shuffleMode == null) {
            shuffleMode = ShuffleMode.UNDEFINED;
        }

        // create the StreamEdge, which keeps the StreamPartitioner and other properties
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);

        // register the StreamEdge with both the upstream and the downstream node
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}
With StreamNodes and StreamEdges in place, we have all the nodes and edges of the DAG together with the connections between them, and the topology of the job is established.
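To inspect the resulting topology for a concrete program, StreamExecutionEnvironment#getExecutionPlan() dumps the StreamGraph built so far as a JSON string, which can be pasted into the Flink plan visualizer (the pipeline below is purely illustrative):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements("a", "b", "a")
    .keyBy(s -> s)                 // becomes a virtual partition node in the StreamGraph
    .filter(s -> !s.isEmpty())
    .print();
// print the StreamGraph of the program as JSON
System.out.println(env.getExecutionPlan());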
References
https://ververica.cn/developers/advanced-tutorial-2-flink-job-execution-depth-analysis/
https://blog.jrwang.me/tags/flink/
https://ververica.cn/developers/advanced-tutorial-1-analysis-of-the-core-mechanism-of-runtime/