StreamExecutionEnvironment
- The environment in which the program runs.
- Runtime settings (parallelism, time characteristic, checkpointing, and so on) are configured through it.
- Its factory methods also create DataStreams (DataStreamSource, a subclass of a subclass of DataStream).
- It records every transformation applied; from these transformations the StreamGraph (the job topology) can be generated.
- The execute method hands the StreamGraph over to the job executor (see the usage sketch after this list).
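As a quick orientation before diving into the source, here is a minimal usage sketch of the points above (the job name, parallelism, and element values are arbitrary, not taken from the source being analyzed):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvQuickstart {
    public static void main(String[] args) throws Exception {
        // Obtain and configure the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Every API call below is recorded as a Transformation inside env.
        DataStream<Integer> doubled = env
                .fromElements(1, 2, 3)
                .map(x -> x * 2)
                .returns(Types.INT);
        doubled.print();

        // execute() builds the StreamGraph from the recorded transformations and submits it.
        env.execute("env-quickstart");
    }
}

The fields and methods behind these calls are excerpted below.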
public abstract class StreamExecutionEnvironment {
...
// Processing time is the default time characteristic
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
// Default network buffer flush timeout in milliseconds (maximum latency before buffered records are sent)
private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
// Execution settings: parallelism, restart attempts, auto-watermark interval, and many other properties
private final ExecutionConfig config = new ExecutionConfig();
// Checkpoint settings
private final CheckpointConfig checkpointCfg = new CheckpointConfig();
// All recorded transformations
protected final List<Transformation<?>> transformations = new ArrayList<>();
// State backend setting
private StateBackend defaultStateBackend;
// Creates a data stream from a SourceFunction
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
if (function instanceof ResultTypeQueryable) {
typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
}
if (typeInfo == null) {
try {
typeInfo = TypeExtractor.createTypeInfo(
SourceFunction.class,
function.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
}
}
boolean isParallel = function instanceof ParallelSourceFunction;
clean(function);
final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}
// Creates a stream directly from the given data elements
public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type, OUT... data) {
if (data.length == 0) {
throw new IllegalArgumentException("fromElements needs at least one element as argument");
}
TypeInformation<OUT> typeInfo;
try {
typeInfo = TypeExtractor.getForClass(type);
}
catch (Exception e) {
throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
+ "; please specify the TypeInformation manually via "
+ "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
}
return fromCollection(Arrays.asList(data), typeInfo);
}
...
// Uses the transformations field it holds to generate the StreamGraph ("Class representing the streaming topology.
// It contains all the information necessary to build the jobgraph for the execution.")
public StreamGraph getStreamGraph(boolean clearTransformations) {
final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
if (clearTransformations) {
transformations.clear();
}
return streamGraph;
}
// The method that env.execute() runs in most cases when a job is submitted
public JobExecutionResult execute() throws Exception {
return execute(getStreamGraph());
}
}
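To make the checks in addSource concrete, here is a sketch of a hypothetical source (the class CountingSource is invented for illustration, assuming the SourceFunction-based API shown above). Because it implements ResultTypeQueryable, addSource takes typeInfo from getProducedType(); because it implements ParallelSourceFunction, isParallel ends up true:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class CountingSource implements ParallelSourceFunction<Long>, ResultTypeQueryable<Long> {
    private static final long serialVersionUID = 1L;
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        long n = 0;
        // Emit an ever-increasing counter until the source is cancelled.
        while (running) {
            ctx.collect(n++);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public TypeInformation<Long> getProducedType() {
        // addSource uses this instead of running the TypeExtractor.
        return Types.LONG;
    }
}

Registering it with env.addSource(new CountingSource(), "counting-source") then yields a DataStreamSource<Long> wrapping a StreamSource operator.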
Transformation
A Transformation represents the operation that creates a DataStream. Every DataStream has an underlying Transformation that is the origin of said DataStream.
- A record of every transformation applied to the stream.
- The resources an operation needs can be configured on it.
- Every operation you write is ultimately wrapped in a Transformation; addOperator adds it to the List<Transformation> held by StreamExecutionEnvironment. Because of this record, Flink knows what transformations you are performing on the stream and can eventually build the execution graph (see the sketch after the class outline below).
public abstract class Transformation<T> {
protected final int id;
protected String name;
protected String description;
protected TypeInformation<T> outputType;
...
}
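The fluent setters on the DataStream API write through to these Transformation fields. A small sketch (operator name, uid, and parallelism are arbitrary values chosen for illustration):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationSettings {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Integer> doubled = env
                .fromElements(1, 2, 3)
                .map(x -> x * 2)
                .returns(Types.INT)
                .name("double")          // stored as Transformation.name
                .setParallelism(2)       // stored as the transformation's parallelism
                .uid("double-op");       // user-specified uid on the transformation

        // The underlying Transformation now carries these settings.
        System.out.println(doubled.getTransformation().getName());
        System.out.println(doubled.getTransformation().getParallelism());
    }
}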
SourceTransformation
There are many kinds of Transformation; as a first look, let's explore the one produced by fromElements() in StreamExecutionEnvironment.
The Transformation created during fromElements
From the Javadoc of LegacySourceTransformation:
This represents a Source. This does not actually transform anything since it has no inputs but it is the root Transformation of any topology.
Once created, these Transformations are what the job actually needs when it runs:
final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
public StreamGraph generate() {
...
for (Transformation<?> transformation : transformations) {
transform(transformation);
}
...
for (StreamNode node : streamGraph.getStreamNodes()) {
if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
for (StreamEdge edge : node.getInEdges()) {
edge.setSupportsUnalignedCheckpoints(false);
}
}
}
...
return builtStreamGraph;
}
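A convenient way to look at the StreamGraph that generate() builds, without actually running the job, is getExecutionPlan(), which renders it as JSON (the pipeline below is an arbitrary example):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InspectStreamGraph {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "bb", "ccc")
                .map(String::length)
                .returns(Types.INT)
                .print();

        // Builds the StreamGraph from the recorded transformations and prints it as JSON,
        // without submitting the job.
        System.out.println(env.getExecutionPlan());
    }
}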
OneInputTransformation
The most common kind of DataStream transformation. The rough flow of the map operation:
(figure: map operation flow)
For more on StreamGraph, see https://izualzhy.cn/flink-source-stream-graph
DataStream
- DataStream holds two member variables: a StreamExecutionEnvironment and a Transformation<T>.
- Both types were introduced above. The DataStream API consists of the various operations on a DataStream, such as map, flatMap, filter, and keyBy. Note that neither member holds any data itself; you can think of Flink as simply chaining transformations and computations (see the sketch after the class outline below).
public class DataStream<T> {
protected final StreamExecutionEnvironment environment;
protected final Transformation<T> transformation;
...
}
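A small sketch of this point: each stream wraps its own Transformation, while the environment is shared between them and records every operation (the data and operations are arbitrary):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamMembers {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 2, 3);
        SingleOutputStreamOperator<Integer> filtered = source.filter(x -> x > 1);

        // Each stream has its own underlying Transformation...
        System.out.println(source.getTransformation().getName());
        System.out.println(filtered.getTransformation().getName());
        // ...but they share the one StreamExecutionEnvironment that records them all.
        System.out.println(source.getExecutionEnvironment() == filtered.getExecutionEnvironment());
    }
}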
Taking the map operation as an example:
- You provide a MapFunction implementation that takes input of type T and returns output of type O.
- The function is handed to an operator (StreamMap<>).
- The operator defines how the function is used.
- transform → doTransform, as shown in the figure in the previous section, wraps the operator into a transformation, so that at execution time Flink knows which operation to perform (sketches follow the relevant code below).
public <R> SingleOutputStreamOperator<R> map(
MapFunction<T, R> mapper, TypeInformation<R> outputType) {
return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
public interface MapFunction<T, O> extends Function, Serializable {
O map(T value) throws Exception;
}
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
public StreamMap(MapFunction<IN, OUT> mapper) {
super(mapper); // userFunction is set here: it is the MapFunction you provided
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
output.collect(element.replace(userFunction.map(element.getValue())));
}
}
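To illustrate "the operator defines how the function is used", here is a hypothetical operator modelled on StreamMap (MapDroppingNulls is invented, not part of Flink): it still delegates to the user's MapFunction, but decides on its own to drop elements that map to null:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class MapDroppingNulls<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public MapDroppingNulls(MapFunction<IN, OUT> mapper) {
        super(mapper);                              // becomes userFunction
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        OUT mapped = userFunction.map(element.getValue());
        if (mapped != null) {
            // The operator, not the function, decides how results are emitted.
            output.collect(element.replace(mapped));
        }
    }
}

Such an operator is then handed to transform/doTransform (shown next), which wraps it into a OneInputTransformation.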
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
OneInputTransformation<T, R> resultTransform =
new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream =
new SingleOutputStreamOperator(environment, resultTransform);
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
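Since transform(...) is the public entry point that ends up in doTransform, a hand-built operator can also be wired in directly. A sketch (operator name and data are arbitrary):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamMap;

public class TransformDirectly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> words = env.fromElements("a", "bb", "ccc");

        // The StreamMap operator is wrapped into a OneInputTransformation and
        // registered with the environment via addOperator.
        SingleOutputStreamOperator<Integer> lengths =
                words.transform("LengthMap", Types.INT, new StreamMap<String, Integer>(String::length));

        lengths.print();
        env.execute("transform-directly");
    }
}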
Every DataStream method follows this same logic (map, flatMap, process, filter, print). Operations such as keyBy and join may look more complex, but the final apply call still wraps everything into transformations:
- provide a function;
- an operator uses the function to carry out the actual processing;
- transform → doTransform finally turns it into a transformation, which is how Flink knows what processing to perform (a keyBy sketch follows this list).
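For example, a keyBy followed by reduce is recorded the same way: keyBy yields a KeyedStream (backed by a partitioning transformation), and reduce wraps the ReduceFunction into yet another transformation (the data below is arbitrary):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

        // keyBy partitions by key; reduce then sums per key.
        KeyedStream<Integer, Integer> byParity = numbers.keyBy(n -> n % 2);
        DataStream<Integer> sums = byParity.reduce((a, b) -> a + b);

        sums.print();
        env.execute("keyby-example");
    }
}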