美文网首页
Flink 源码:StreamOperator 及其相关类介绍

Flink 源码:StreamOperator 及其相关类介绍

作者: longLiveData | 来源:发表于2020-05-27 08:29 被阅读0次

本文仅为笔者平日学习记录之用,侵删
原文:https://mp.weixin.qq.com/s/QVAAK7SocsGl8d3cNB0gxg

原作者声明:笔者的源码分析都是基于 flink-1.9.0 release 分支,其实阅读源码不用非常在意版本的问题,各版本的主要流程基本都是类似的。如果熟悉了某个版本的源码,之后新版本有变化,我们重点看一下变化之处即可。
笔者阅读源码中会加很多中文注释,对源码很兴趣且有需要的同学可以关注一下笔者的 github 仓库:https://github.com/1996fanrui/flink/tree/feature/source-code-read-1-9-0
注释都在 feature/source-code-read-1-9-0 分支,之后也会持续更新

本文对以下类做了介绍,希望阅读本文读者能理解这些类的功能及类之间的关系:

  • StreamOperator.class
  • AbstractUdfStreamOperator.class
  • StreamFlatMap.class
  • StreamMap.class
  • FlatMapFunction.class
  • MapFunction.class
  • OneInputStreamOperator.class
  • TwoInputStreamOperator.class
  • CoStreamMap.class
  • CoMapFunction.class

看完这篇文章,会对以下问题有更清醒的认识:
在开发一个 Flink 任务时,自定义了一些 MapFunction、FilterFunction、FlatMapFunction,它们 map 方法、filter 方法或 flatMap 方法都是怎么被调用和执行的。

一、 自定义 Function 的封装过程简介

在 Flink 中自定义的 MapFunction、FilterFunction、FlatMapFunction 等都被 Flink 认为是 UDF,都会被封装到 AbstractUdfStreamOperator 中。
例如:DataStream.map() 方法或 flatMap() 方法中会传入我们自定义的 MapFunction 或 FlatMapFunction,map 方法和 flatMap 方法会对 UDF 封装。

DataStream 类的 map 和 flatMap 方法源码如下:

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

 TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
   Utils.getCallLocationName(), true);

 return transform("Map", outType, 
                   // 封装 MapFunction 到 StreamMap
                   new StreamMap<>(clean(mapper)));
}


public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

 TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
   getType(), Utils.getCallLocationName(), true);

 return transform("Flat Map", outType, 
                   // 封装 FlatMapFunction 到 StreamFlatMap
                   new StreamFlatMap<>(clean(flatMapper)));
}

重点可以关注:

  • new StreamMap<>(clean(mapper)) 封装 MapFunction 到 StreamMap 中
  • new StreamFlatMap<>(clean(flatMapper)) 封装 FlatMapFunction 到 StreamFlatMap 中

下面介绍一下 StreamFlatMap 类 UML 图,StreamMap 也是类似的。

StreamFlatMap 类 UML 图

StreamOperator 是所有 Operator 的基接口,StreamOperator派生出了两个抽象的运算符AbstractStreamOperator 和 AbstractUdfStreamOperator,所有最终的运算符向上追述都直接或间接派生自这两个运算符,而继承链中不同层次的运算符有各自的分工。

StreamFlatMap 还实现了 OneInputStreamOperator,下面会介绍 OneInputStreamOperator。

二、 AbstractUdfStreamOperator 简介

AbstractUdfStreamOperator 继承了 AbstractStreamOperator。

AbstractUdfStreamOperator 源码简版:

public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
  extends AbstractStreamOperator<OUT>
  implements OutputTypeConfigurable<OUT> {

 /** The user function. */
 protected final F userFunction;

 public AbstractUdfStreamOperator(F userFunction) {
  this.userFunction = requireNonNull(userFunction);
 }

 public F getUserFunction() {
  return userFunction;
 }
}

AbstractUdfStreamOperator 类中有个泛型 F 类型属性 userFunction,用于保存用户定义的 MapFunction、FlatMapFunction。AbstractUdfStreamOperator 的构造器可以将 userFunction 保存起来。

三、 StreamFlatMap 介绍

StreamFlatMap 源码:

public class StreamFlatMap<IN, OUT>
  // 继承 AbstractUdfStreamOperator,F 的泛型为 FlatMapFunction
  extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
  implements OneInputStreamOperator<IN, OUT> {

 private transient TimestampedCollector<OUT> collector;

 public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
  // super 表示将 FlatMapFunction 传递给 AbstractUdfStreamOperator
  // 将 FlatMapFunction 维护在 AbstractUdfStreamOperator 的 userFunction 中
  super(flatMapper);
  chainingStrategy = ChainingStrategy.ALWAYS;
 }

 @Override
 public void open() throws Exception {
  super.open();
  collector = new TimestampedCollector<>(output);
 }

 @Override
 public void processElement(StreamRecord<IN> element) throws Exception {
  collector.setTimestamp(element);
  // 处理数据时,调用的是 userFunction 的 flatMap 方法,
  // 即:用户在 flatMap 中定义的业务逻辑
  userFunction.flatMap(element.getValue(), collector);
 }
}

StreamFlatMap 继承 AbstractUdfStreamOperator,且指定了 F 的泛型为 FlatMapFunction,即:AbstractUdfStreamOperator 中维护的 userFunction 类型为 FlatMapFunction。

StreamFlatMap 构造器中调用父类构造器,将 FlatMapFunction 传递给 AbstractUdfStreamOperator,将 FlatMapFunction 维护在 AbstractUdfStreamOperator 的 userFunction 中。

StreamFlatMap 处理数据的逻辑,即:StreamFlatMap 的 processElement 方法,调用的是 userFunction 的 flatMap 方法,即:用户在 flatMap 中定义的业务逻辑。

四、 StreamMap 介绍

StreamMap 源码:

public class StreamMap<IN, OUT>
  // 继承 AbstractUdfStreamOperator,F 的泛型为 MapFunction
  extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
  implements OneInputStreamOperator<IN, OUT> {

 private static final long serialVersionUID = 1L;

 public StreamMap(MapFunction<IN, OUT> mapper) {
  // super 表示将 MapFunction 传递给 AbstractUdfStreamOperator
  // 将 MapFunction 维护在 AbstractUdfStreamOperator 的 userFunction 中
  super(mapper);
  chainingStrategy = ChainingStrategy.ALWAYS;
 }

 @Override
 public void processElement(StreamRecord<IN> element) throws Exception {
  output.collect(element.replace(
   // 处理数据时,调用的是 userFunction 的 map 方法,
   // 即:用户在 map 中定义的业务逻辑
   userFunction.map(element.getValue())));
 }
}

StreamMap 的设计与 StreamFlatMap 及其相似:

StreamMap 继承 AbstractUdfStreamOperator,且指定了 F 的泛型为 MapFunction,即:AbstractUdfStreamOperator 中维护的 userFunction 类型为 MapFunction。

StreamMap 构造器中调用父类构造器,将 MapFunction 传递给 AbstractUdfStreamOperator,将 MapFunction 维护在 AbstractUdfStreamOperator 的 userFunction 中。

StreamMap 处理数据的逻辑,即:StreamMap 的 processElement 方法,调用的是 userFunction 的 map 方法,即:用户在 map 中定义的业务逻辑。

五、 OneInputStreamOperator 介绍

在 StreamFlatMap 的 UML 图中看到 StreamFlatMap 还实现了 OneInputStreamOperator 接口,OneInputStreamOperator 源码如下:

public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {

 /**
  * 重点在于 processElement 处理每一条数据,然后自定义的 udf 将会重写 processElement 方法,
  * 例如 StreamFlatMap 中,就会调用 udf 的 flatMap 方法来处理数据
  */
 void processElement(StreamRecord<IN> element) throws Exception;

 void processWatermark(Watermark mark) throws Exception;

 void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
}

OneInputStreamOperator 定义了三个方法 processElement、processWatermark、processLatencyMarker,分别处理数据、处理 WaterMark、处理 LatencyMarker。

processElement 用于处理每一条数据,UDF 的包装类 StreamMap、StreamFlatMap 等都实现了 OneInputStreamOperator 接口,并重写了 processElement 方法,例如 StreamFlatMap 中就会调用 udf 的 flatMap 方法来处理数据;StreamMap 中会调用 udf 的 map 方法处理数据。

StreamFlatMap 和 StreamMap 类可以当做一个适配器,Flink 框架使用各种算子处理数据时,只需要调用 processElement 方法即可,框架并不关注当前是 MapFunction 还是 FlatMapFunction,然后 StreamFlatMap 类就会在具体的 processElement 方法中执行 FlatMapFunction 具体处理数据的逻辑。

六、 TwoInputStreamOperator 介绍

源码中与 OneInputStreamOperator 相对应的还有个 TwoInputStreamOperator,笔者刚接触这两个接口时还有有点蒙的,到底是干嘛的,为什么没有 ThreeInputStreamOperator、FourXXX、FiveXXX 等等等。刚开始笔者完全理解错了,错误理解:以为 One 表示 Job 中定义了一个 Source,Two 表示 Job 中定义了两个 Source。

上述理解完全错误的,正确理解是:OneInputStreamOperator 表示当前 Operator 有一个输入流,TwoInputStreamOperator 表示当前 Operator 有两个输入流。

什么样的 Operator 有一个输入流呢?常用的 map、flatMap、filter 等都是有一个输入流的算子。什么样的 Operator 有两个输入流呢?像 Join 相关的算子()就会有两个输入流,或者在 Flink 中 Co 开头的算子都是有两个输入流。

例如 a、b 两个流 connect 之后得到 ConnectedStreams,DataStream 类的 connect 方法源码如下:

public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
    return new ConnectedStreams(this.environment, this, dataStream);
}

ConnectedStreams 类的各种算子相当于都有两个输入流,ConnectedStreams 类的部分源码如下,可以看到 ConnectedStreams 类的大部分数据处理的方法,都需要传入一个 Co 开头的算子。

public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper) {

 return transform("Co-Map", outTypeInfo, 
 new CoStreamMap<>(inputStream1.clean(coMapper)));
}


 public <R> SingleOutputStreamOperator<R> flatMap(
   CoFlatMapFunction<IN1, IN2, R> coFlatMapper) {

  return transform("Co-Flat Map", outTypeInfo, 
  new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
 }
  
  
public <R> SingleOutputStreamOperator<R> process(
  CoProcessFunction<IN1, IN2, R> coProcessFunction) {

 return process(coProcessFunction, outTypeInfo);
}

通过几个案例,相信大家理解了这里 One 和 Two 代表的含义。下图是笔者整理的几个重要类的 UML 图,可以看到 CoProcessFunction、IntervalJoinOperator 和 CoFlatMapFunction 实现了 TwoInputStreamOperator 接口,StreamFlatMap、 StreamMap 和 StreamFilter 类实现了 OneInputStreamOperator 接口。

重要类的 UML 图

在 AbstractStreamOperator 源码 doc 中,还有一句:

For concrete implementations, 
one of the following two interfaces must also be implemented, 
to mark the operator as unary or binary: 
{@link OneInputStreamOperator} or {@link TwoInputStreamOperator}

中文含义:对于具体的实现类,还必须实现 OneInputStreamOperator 或 TwoInputStreamOperator,用于标记算子是 一元还是二元。

TwoInputStreamOperator 源码如下:

public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
 void processElement1(StreamRecord<IN1> element) throws Exception;

 void processElement2(StreamRecord<IN2> element) throws Exception;

 void processWatermark1(Watermark mark) throws Exception;

 void processWatermark2(Watermark mark) throws Exception;

 void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception;

 void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception;
}

TwoInputStreamOperator 要处理两个输入流的数据,因此泛型定义两个输入,分别是 IN1 和 IN2。各种 process 方法也是两个。

CoStreamMap 及 CoMapFunction 两个类的源码也很简单,源码如下:

// CoMapFunction 接口定义
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {

 OUT map1(IN1 value) throws Exception;

 OUT map2(IN2 value) throws Exception;
}

// CoStreamMap 类定义
public class CoStreamMap<IN1, IN2, OUT>
  // 继承 AbstractUdfStreamOperator,F 的泛型为 CoMapFunction
  extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>>
  implements TwoInputStreamOperator<IN1, IN2, OUT> {

 public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
  super(mapper);
 }

 @Override
 public void processElement1(StreamRecord<IN1> element) throws Exception {
  output.collect(element.replace(
   // processElement1 调用 userFunction 的 map1 方法
   userFunction.map1(element.getValue())));
 }

 @Override
 public void processElement2(StreamRecord<IN2> element) throws Exception {
  output.collect(element.replace(
   // processElement2 调用 userFunction 的 map2 方法
   userFunction.map2(element.getValue())));
 }
}

CoMapFunction 类是用户自定义的 Function 有 map1 和 map2 方法,用于定义两个输入流的业务处理逻辑。

CoStreamMap 继承 AbstractUdfStreamOperator,F 的泛型为 CoMapFunction。CoStreamMap 做为适配器实现了 TwoInputStreamOperator,并重写了 processElement1 和 processElement2 方法,处理数据时调用 CoMapFunction 的 map1 和 map2 方法。

具体处理数据时,什么时候调用 OneInputStreamOperator 和 TwoInputStreamOperator 类的 processElement 方法,后续在处理数据部分会详细介绍。

七、 总结

本文介绍了一个自定义的 Function 是如何被封装,同时用 MapFunction、FlatMapFunction 介绍了整个封装的过程,及处理数据时如何调用他们的 map 或 flatmap 方法。下半部分重点介绍了 OneInputStreamOperator 和 TwoInputStreamOperator 类的区别和联系。

相关文章

网友评论

      本文标题:Flink 源码:StreamOperator 及其相关类介绍

      本文链接:https://www.haomeiwen.com/subject/ygyiahtx.html