本文仅为笔者平日学习记录之用,侵删
原文:https://mp.weixin.qq.com/s/QVAAK7SocsGl8d3cNB0gxg
原作者声明:笔者的源码分析都是基于 flink-1.9.0 release 分支,其实阅读源码不用非常在意版本的问题,各版本的主要流程基本都是类似的。如果熟悉了某个版本的源码,之后新版本有变化,我们重点看一下变化之处即可。
笔者阅读源码中会加很多中文注释,对源码很兴趣且有需要的同学可以关注一下笔者的 github 仓库:https://github.com/1996fanrui/flink/tree/feature/source-code-read-1-9-0
注释都在 feature/source-code-read-1-9-0 分支,之后也会持续更新
本文对以下类做了介绍,希望阅读本文读者能理解这些类的功能及类之间的关系:
- StreamOperator.class
- AbstractUdfStreamOperator.class
- StreamFlatMap.class
- StreamMap.class
- FlatMapFunction.class
- MapFunction.class
- OneInputStreamOperator.class
- TwoInputStreamOperator.class
- CoStreamMap.class
- CoMapFunction.class
看完这篇文章,会对以下问题有更清醒的认识:
在开发一个 Flink 任务时,自定义了一些 MapFunction、FilterFunction、FlatMapFunction,它们 map 方法、filter 方法或 flatMap 方法都是怎么被调用和执行的。
一、 自定义 Function 的封装过程简介
在 Flink 中自定义的 MapFunction、FilterFunction、FlatMapFunction 等都被 Flink 认为是 UDF,都会被封装到 AbstractUdfStreamOperator 中。
例如:DataStream.map() 方法或 flatMap() 方法中会传入我们自定义的 MapFunction 或 FlatMapFunction,map 方法和 flatMap 方法会对 UDF 封装。
DataStream 类的 map 和 flatMap 方法源码如下:
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
Utils.getCallLocationName(), true);
return transform("Map", outType,
// 封装 MapFunction 到 StreamMap
new StreamMap<>(clean(mapper)));
}
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
return transform("Flat Map", outType,
// 封装 FlatMapFunction 到 StreamFlatMap
new StreamFlatMap<>(clean(flatMapper)));
}
重点可以关注:
-
new StreamMap<>(clean(mapper))
封装 MapFunction 到 StreamMap 中 -
new StreamFlatMap<>(clean(flatMapper))
封装 FlatMapFunction 到 StreamFlatMap 中
下面介绍一下 StreamFlatMap 类 UML 图,StreamMap 也是类似的。
StreamOperator 是所有 Operator 的基接口,StreamOperator派生出了两个抽象的运算符AbstractStreamOperator 和 AbstractUdfStreamOperator,所有最终的运算符向上追述都直接或间接派生自这两个运算符,而继承链中不同层次的运算符有各自的分工。
StreamFlatMap 还实现了 OneInputStreamOperator,下面会介绍 OneInputStreamOperator。
二、 AbstractUdfStreamOperator 简介
AbstractUdfStreamOperator 继承了 AbstractStreamOperator。
AbstractUdfStreamOperator 源码简版:
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
extends AbstractStreamOperator<OUT>
implements OutputTypeConfigurable<OUT> {
/** The user function. */
protected final F userFunction;
public AbstractUdfStreamOperator(F userFunction) {
this.userFunction = requireNonNull(userFunction);
}
public F getUserFunction() {
return userFunction;
}
}
AbstractUdfStreamOperator 类中有个泛型 F 类型属性 userFunction,用于保存用户定义的 MapFunction、FlatMapFunction。AbstractUdfStreamOperator 的构造器可以将 userFunction 保存起来。
三、 StreamFlatMap 介绍
StreamFlatMap 源码:
public class StreamFlatMap<IN, OUT>
// 继承 AbstractUdfStreamOperator,F 的泛型为 FlatMapFunction
extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private transient TimestampedCollector<OUT> collector;
public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
// super 表示将 FlatMapFunction 传递给 AbstractUdfStreamOperator
// 将 FlatMapFunction 维护在 AbstractUdfStreamOperator 的 userFunction 中
super(flatMapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void open() throws Exception {
super.open();
collector = new TimestampedCollector<>(output);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
// 处理数据时,调用的是 userFunction 的 flatMap 方法,
// 即:用户在 flatMap 中定义的业务逻辑
userFunction.flatMap(element.getValue(), collector);
}
}
StreamFlatMap 继承 AbstractUdfStreamOperator,且指定了 F 的泛型为 FlatMapFunction,即:AbstractUdfStreamOperator 中维护的 userFunction 类型为 FlatMapFunction。
StreamFlatMap 构造器中调用父类构造器,将 FlatMapFunction 传递给 AbstractUdfStreamOperator,将 FlatMapFunction 维护在 AbstractUdfStreamOperator 的 userFunction 中。
StreamFlatMap 处理数据的逻辑,即:StreamFlatMap 的 processElement 方法,调用的是 userFunction 的 flatMap 方法,即:用户在 flatMap 中定义的业务逻辑。
四、 StreamMap 介绍
StreamMap 源码:
public class StreamMap<IN, OUT>
// 继承 AbstractUdfStreamOperator,F 的泛型为 MapFunction
extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
public StreamMap(MapFunction<IN, OUT> mapper) {
// super 表示将 MapFunction 传递给 AbstractUdfStreamOperator
// 将 MapFunction 维护在 AbstractUdfStreamOperator 的 userFunction 中
super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
output.collect(element.replace(
// 处理数据时,调用的是 userFunction 的 map 方法,
// 即:用户在 map 中定义的业务逻辑
userFunction.map(element.getValue())));
}
}
StreamMap 的设计与 StreamFlatMap 及其相似:
StreamMap 继承 AbstractUdfStreamOperator,且指定了 F 的泛型为 MapFunction,即:AbstractUdfStreamOperator 中维护的 userFunction 类型为 MapFunction。
StreamMap 构造器中调用父类构造器,将 MapFunction 传递给 AbstractUdfStreamOperator,将 MapFunction 维护在 AbstractUdfStreamOperator 的 userFunction 中。
StreamMap 处理数据的逻辑,即:StreamMap 的 processElement 方法,调用的是 userFunction 的 map 方法,即:用户在 map 中定义的业务逻辑。
五、 OneInputStreamOperator 介绍
在 StreamFlatMap 的 UML 图中看到 StreamFlatMap 还实现了 OneInputStreamOperator 接口,OneInputStreamOperator 源码如下:
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
/**
* 重点在于 processElement 处理每一条数据,然后自定义的 udf 将会重写 processElement 方法,
* 例如 StreamFlatMap 中,就会调用 udf 的 flatMap 方法来处理数据
*/
void processElement(StreamRecord<IN> element) throws Exception;
void processWatermark(Watermark mark) throws Exception;
void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
}
OneInputStreamOperator 定义了三个方法 processElement、processWatermark、processLatencyMarker,分别处理数据、处理 WaterMark、处理 LatencyMarker。
processElement 用于处理每一条数据,UDF 的包装类 StreamMap、StreamFlatMap 等都实现了 OneInputStreamOperator 接口,并重写了 processElement 方法,例如 StreamFlatMap 中就会调用 udf 的 flatMap 方法来处理数据;StreamMap 中会调用 udf 的 map 方法处理数据。
StreamFlatMap 和 StreamMap 类可以当做一个适配器,Flink 框架使用各种算子处理数据时,只需要调用 processElement 方法即可,框架并不关注当前是 MapFunction 还是 FlatMapFunction,然后 StreamFlatMap 类就会在具体的 processElement 方法中执行 FlatMapFunction 具体处理数据的逻辑。
六、 TwoInputStreamOperator 介绍
源码中与 OneInputStreamOperator 相对应的还有个 TwoInputStreamOperator,笔者刚接触这两个接口时还有有点蒙的,到底是干嘛的,为什么没有 ThreeInputStreamOperator、FourXXX、FiveXXX 等等等。刚开始笔者完全理解错了,错误理解:以为 One 表示 Job 中定义了一个 Source,Two 表示 Job 中定义了两个 Source。
上述理解完全错误的,正确理解是:OneInputStreamOperator 表示当前 Operator 有一个输入流,TwoInputStreamOperator 表示当前 Operator 有两个输入流。
什么样的 Operator 有一个输入流呢?常用的 map、flatMap、filter 等都是有一个输入流的算子。什么样的 Operator 有两个输入流呢?像 Join 相关的算子()就会有两个输入流,或者在 Flink 中 Co 开头的算子都是有两个输入流。
例如 a、b 两个流 connect 之后得到 ConnectedStreams,DataStream 类的 connect 方法源码如下:
public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
return new ConnectedStreams(this.environment, this, dataStream);
}
ConnectedStreams 类的各种算子相当于都有两个输入流,ConnectedStreams 类的部分源码如下,可以看到 ConnectedStreams 类的大部分数据处理的方法,都需要传入一个 Co 开头的算子。
public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) {
return transform("Co-Map", outTypeInfo,
new CoStreamMap<>(inputStream1.clean(coMapper)));
}
public <R> SingleOutputStreamOperator<R> flatMap(
CoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
return transform("Co-Flat Map", outTypeInfo,
new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
}
public <R> SingleOutputStreamOperator<R> process(
CoProcessFunction<IN1, IN2, R> coProcessFunction) {
return process(coProcessFunction, outTypeInfo);
}
通过几个案例,相信大家理解了这里 One 和 Two 代表的含义。下图是笔者整理的几个重要类的 UML 图,可以看到 CoProcessFunction、IntervalJoinOperator 和 CoFlatMapFunction 实现了 TwoInputStreamOperator 接口,StreamFlatMap、 StreamMap 和 StreamFilter 类实现了 OneInputStreamOperator 接口。
在 AbstractStreamOperator 源码 doc 中,还有一句:
For concrete implementations,
one of the following two interfaces must also be implemented,
to mark the operator as unary or binary:
{@link OneInputStreamOperator} or {@link TwoInputStreamOperator}
中文含义:对于具体的实现类,还必须实现 OneInputStreamOperator 或 TwoInputStreamOperator,用于标记算子是 一元还是二元。
TwoInputStreamOperator 源码如下:
public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
void processElement1(StreamRecord<IN1> element) throws Exception;
void processElement2(StreamRecord<IN2> element) throws Exception;
void processWatermark1(Watermark mark) throws Exception;
void processWatermark2(Watermark mark) throws Exception;
void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception;
void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception;
}
TwoInputStreamOperator 要处理两个输入流的数据,因此泛型定义两个输入,分别是 IN1 和 IN2。各种 process 方法也是两个。
CoStreamMap 及 CoMapFunction 两个类的源码也很简单,源码如下:
// CoMapFunction 接口定义
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
OUT map1(IN1 value) throws Exception;
OUT map2(IN2 value) throws Exception;
}
// CoStreamMap 类定义
public class CoStreamMap<IN1, IN2, OUT>
// 继承 AbstractUdfStreamOperator,F 的泛型为 CoMapFunction
extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> {
public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
super(mapper);
}
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
output.collect(element.replace(
// processElement1 调用 userFunction 的 map1 方法
userFunction.map1(element.getValue())));
}
@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
output.collect(element.replace(
// processElement2 调用 userFunction 的 map2 方法
userFunction.map2(element.getValue())));
}
}
CoMapFunction 类是用户自定义的 Function 有 map1 和 map2 方法,用于定义两个输入流的业务处理逻辑。
CoStreamMap 继承 AbstractUdfStreamOperator,F 的泛型为 CoMapFunction。CoStreamMap 做为适配器实现了 TwoInputStreamOperator,并重写了 processElement1 和 processElement2 方法,处理数据时调用 CoMapFunction 的 map1 和 map2 方法。
具体处理数据时,什么时候调用 OneInputStreamOperator 和 TwoInputStreamOperator 类的 processElement 方法,后续在处理数据部分会详细介绍。
七、 总结
本文介绍了一个自定义的 Function 是如何被封装,同时用 MapFunction、FlatMapFunction 介绍了整个封装的过程,及处理数据时如何调用他们的 map 或 flatmap 方法。下半部分重点介绍了 OneInputStreamOperator 和 TwoInputStreamOperator 类的区别和联系。
网友评论