[TOC]
第一部分讲到,我们的主函数最后一项任务就是生成StreamGraph,然后生成JobGraph,然 后以此开始调度任务运行,所以接下来我们从这里入手,继续探索flink。
2.1 flink的三层图结构
事实上,flink总共提供了三种图的抽象,我们前面已经提到了StreamGraph和JobGraph,还 有一种是ExecutionGraph,是用于调度的基本数据结构。
![](https://img.haomeiwen.com/i11345047/3f6aa57b63fba061.png)
![](https://img.haomeiwen.com/i11345047/17f2f7e52078617a.png)
上面这张图清晰的给出了flink各个图的工作原理和转换过程。其中最后一个物理执行图并非 flink的数据结构,而是程序开始执行后,各个task分布在不同的节点上,所形成的物理上的关 系表示。
- 从JobGraph的图里可以看到,数据从上一个operator流到下一个operator的过程中,上 游作为生产者提供了IntermediateDataSet,而下游作为消费者需要JobEdge。事实 上,JobEdge是一个通信管道,连接了上游生产的dataset和下游的JobVertex节点。
- 在JobGraph转换到ExecutionGraph的过程中,主要发生了以下转变:
- 加入了并行度的概念,成为真正可调度的图结构
- 生成了与JobVertex对应的ExecutionJobVertex,ExecutionVertex,与 IntermediateDataSet对应的IntermediateResult和IntermediateResultPartition等, 并行将通过这些类实现
- ExecutionGraph已经可以用于调度任务。我们可以看到,flink根据该图生成了一一对应的 Task,每个task对应一个ExecutionGraph的一个Execution。Task用InputGate、 InputChannel和ResultPartition对应了上面图中的IntermediateResult和 ExecutionEdge。
那么,flink抽象出这三层图结构,四层执行逻辑的意义是什么呢? StreamGraph是对用户逻辑的映射。JobGraph在此基础上进行了一些优化,比如把一部分操 作串成chain以提高效率。ExecutionGraph是为了调度存在的,加入了并行处理的概念。而在 此基础上真正执行的是Task及其相关结构。
2.2 StreamGraph的生成
在第一节的算子注册部分,我们可以看到,flink把每一个算子transform成一个对流的转换 (比如上文中返回的SingleOutputStreamOperator是一个DataStream的子类),并且注册 到执行环境中,用于生成StreamGraph。实际生成StreamGraph的入口是 StreamGraphGenerator.generate(env, transformations) 其中的transformations是一 个list,里面记录的就是我们在transform方法中放进来的算子。
2.2.1 StreamTransformation类代表了流的转换
StreamTransformation代表了从一个或多个DataStream生成新DataStream的操作。顺便,DataStream类在内部组合了一个StreamTransformation类,实际的转换操作均通过该 类完成。
我们可以看到,从source到各种map,union再到sink操作全部被映射成了 StreamTransformation。
![](https://img.haomeiwen.com/i11345047/121842ca1edf7169.png)
以MapFunction为例:
- 首先,用户代码里定义的UDF会被当作其基类对待,然后交给StreamMap这个operator 做进一步包装。事实上,每一个Transformation都对应了一个StreamOperator。
- 由于map这个操作只接受一个输入,所以再被进一步包装为OneInputTransformation。
- 最后,将该transformation注册到执行环境中,当执行上文提到的generate方法时,生成 StreamGraph图结构。
另外,并不是每一个 StreamTransformation 都会转换成runtime层中的物理操作。 有一些只是逻辑概念,比如union、split/select、partition等。如下图所示的转换 树,在运行时会优化成下方的操作图。
![](https://img.haomeiwen.com/i11345047/cc9602f45aceee24.png)
2.2.2 StreamGraph生成函数分析
2.2.2 StreamGraph生成函数分析
![](https://img.haomeiwen.com/i11345047/ab80a87298df96ce.png)
[图片上传失败...(image-8e8652-1609649040159)]
因为map,filter等常用操作都是OneInputStreamOperator,我们就来看看transformOneInputTransform((OneInputTransformation<?, ?>) transform) 方法。
![](https://img.haomeiwen.com/i11345047/3c82509e068b19ff.png)
2.2.3 WordCount函数的StreamGraph
flink提供了一个StreamGraph可视化显示工具,在这里 我们可以把我们的程序的执行计划打印出 来System.out.println(env.getExecutionPlan()); 复制到这个网站上,点击生成,如图 所示:
![](https://img.haomeiwen.com/i11345047/7fe0ef0bfb934229.png)
可以看到,我们源程序被转化成了4个operator。 另外,在operator之间的连线上也显示出了flink添加的一些逻辑流程。由于我设定了每个操作 符的并行度都是1,所以在每个操作符之间都是直接FORWARD,不存在shuffle的过程。
2.3 JobGraph的生成
flink会根据上一步生成的StreamGraph生成JobGraph,然后将JobGraph发送到server端进 行ExecutionGraph的解析
2.3.1 JobGraph生成源码
与StreamGraph类似,JobGraph的入口方法
是 StreamingJobGraphGenerator.createJobGraph() 。我们直接来看源码
![](https://img.haomeiwen.com/i11345047/3f378fcfa8dfa896.png)
2.3.2 operator chain的逻辑
为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起 形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它 能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少 了延迟的同时提高整体的吞吐量。
![](https://img.haomeiwen.com/i11345047/951e123317e296ad.png)
上图中将KeyAggregation和Sink两个operator进行了合并,因为这两个合并后并不会改变整 体的拓扑结构。但是,并不是任意两个 operator 就能 chain 一起的,其条件还是很苛刻的:
- 上下游的并行度一致
- 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入) 上下游节点都在同一个 slot group 中(下面会解释 slot group)
- 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等 默认是ALWAYS)
- 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链 接,Source默认是HEAD)
- 两个节点间数据分区方式是 forward(参考理解数据流的分区)
- 用户没有禁用 chain
flink的chain逻辑是一种很常见的设计,比如spring的interceptor也是类似的实现方式。通过 把操作符串成一个大操作符,flink避免了把数据序列化后通过网络发送给其他节点的开销,能 够大大增强效率。
2.3.3 JobGraph的提交
前面已经提到,JobGraph的提交依赖于JobClient和JobManager之间的异步通信,如图所 示:
![](https://img.haomeiwen.com/i11345047/bf34a50d1c25f09b.png)
在submitJobAndWait方法中,其首先会创建一个JobClientActor的ActorRef,然后向其发起 一个SubmitJobAndWait消息,该消息将JobGraph的实例提交给JobClientActor。发起模式是ask,它表示需要一个应答消息。
Future<Object> future = Patterns.ask(jobClientActor, new JobClientMessa ges.SubmitJobAndWait(jobGraph), new Timeout(AkkaUtils.INF_TIMEOUT()));
answer = Await.result(future, AkkaUtils.INF_TIMEOUT());
该SubmitJobAndWait消息被JobClientActor接收后,最终通过调用tryToSubmitJob方法触 发真正的提交动作。当JobManager的actor接收到来自client端的请求后,会执行一个 submitJob方法,主要做以下事情:
向BlobLibraryCacheManager注册该Job;
构建ExecutionGraph对象;
对JobGraph中的每个顶点进行初始化; 将DAG拓扑中从source开始排序,排序后的顶点集合附加到Exec> - utionGraph对象;
获取检查点相关的配置,并将其设置到ExecutionGraph对象; 向ExecutionGraph注册相关的listener; 执行恢复操作或者将JobGraph信息写入SubmittedJobGraphStore以在后续用于恢 复目的;
响应给客户端JobSubmitSuccess消息;
对ExecutionGraph对象进行调度执行;
最后,JobManger会返回消息给JobClient,通知该任务是否提交成功。
2.4 ExecutionGraph的生成
与StreamGraph和JobGraph不同,ExecutionGraph并不是在我们的客户端程序生成,而是 在服务端(JobManager处)生成的,顺便flink只维护一个JobManager。其入口代码
是 ExecutionGraphBuilder.buildGraph(...)
该方法长200多行,其中一大半是checkpoiont的相关逻辑,我们暂且略过,直接看核心方
法 executionGraph.attachJobGraph(sortedTopology)
因为ExecutionGraph事实上只是改动了JobGraph的每个节点,而没有对整个拓扑结构进行变 动,所以代码里只是挨个遍历jobVertex并进行处理:
![](https://img.haomeiwen.com/i11345047/4e140f9037d19953.png)
至此,ExecutorGraph就创建完成了。
3. 任务的调度与执行
关于flink的任务执行架构,官网的这两张图就是最好的说明:
![](https://img.haomeiwen.com/i11345047/20adb232d6c4239e.png)
Flink 集群启动后,首先会启动一个 JobManger 和多个的 TaskManager。用户的代码会由 JobClient 提交给 JobManager,JobManager 再把来自不同用户的任务发给 不同的 TaskManager 去执行,每个TaskManager管理着多个task,task是执行计算的最小结构, TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行 数据的传输。上述除了task外的三者均为独立的 JVM 进程。 要注意的是,TaskManager和job并非一一对应的关系。flink调度的最小单元是task而非 TaskManager,也就是说,来自不同job的不同task可能运行于同一个TaskManager的不同 线程上。
![](https://img.haomeiwen.com/i11345047/00faf7fb1f2fcc4b.png)
一个flink任务所有可能的状态如上图所示。图上画的很明白,就不再赘述了。
3.1 计算资源的调度
Task slot是一个TaskManager内资源分配的最小载体,代表了一个固定大小的资源子集,每 个TaskManager会将其所占有的资源平分给它的slot。
通过调整 task slot 的数量,用户可以定义task之间是如何相互隔离的。每个 TaskManager 有一个slot,也就意味着每个task运行在独立的 JVM 中。每个 TaskManager 有多个slot的 话,也就是说多个task运行在同一个JVM中。
而在同一个JVM进程中的task,可以共享TCP连接(基于多路复用)和心跳消息,可以减少数 据的网络传输,也能共享一些数据结构,一定程度上减少了每个task的消耗。
每个slot可以接受单个task,也可以接受多个连续task组成的pipeline,如下图所示,FlatMap函数占用一个taskslot,而key Agg函数和sink函数共用一个taskslot:
![](https://img.haomeiwen.com/i11345047/11efec469bfafe22.png)
为了达到共用slot的目的,除了可以以chain的方式pipeline算子,我们还可以允许 SlotSharingGroup,如下图所示:
![](https://img.haomeiwen.com/i11345047/2c1f3f5f56a5d7a5.png)
我们可以把不能被chain成一条的两个操作如flatmap和key&sink放在一个TaskSlot里执行, 这样做可以获得以下好处:
- 共用slot使得我们不再需要计算每个任务需要的总task数目,直接取最高算子的并行度即可
- 对计算资源的利用率更高。例如,通常的轻量级操作map和重量级操作Aggregate不再分 别需要一个线程,而是可以在同一个线程内执行,而且对于slot有限的场景,我们可以增 大每个task的并行度了。
接下来我们还是用官网的图来说明flink是如何重用slot的:
![](https://img.haomeiwen.com/i11345047/18ae589080a940fe.png)
- TaskManager1分配一个SharedSlot0
- 把source task放入一个SimpleSlot0,再把该slot放入SharedSlot0
- 把flatmap task放入一个SimpleSlot1,再把该slot放入SharedSlot0
- 因为我们的flatmap task并行度是2,因此不能再放入SharedSlot0,所以向
TaskMange21申请了一个新的SharedSlot0 - 把第二个flatmap task放进一个新的SimpleSlot,并放进TaskManager2的
SharedSlot0 - 开始处理key&sink task,因为其并行度也是2,所以先把第一个task放进
TaskManager1的SharedSlot - 把第二个key&sink放进TaskManager2的SharedSlot
3.2 JobManager执行job
JobManager负责接收 flink 的作业,调度 task,收集 job 的状态、管理 TaskManagers。被 实现为一个 akka actor。
3.2.1 JobManager的组件
- BlobServer 是一个用来管理二进制大文件的服务,比如保存用户上传的jar文件,该服务会 将其写到磁盘上。还有一些相关的类,如BlobCache,用于TaskManager向JobManager 下载用户的jar文件
- InstanceManager 用来管理当前存活的TaskManager的组件,记录了TaskManager的心 跳信息等
- CompletedCheckpointStore 用于保存已完成的checkpoint相关信息,持久化到内存中 或者zookeeper上
- MemoryArchivist 保存了已经提交到flink的作业的相关信息,如JobGraph等
3.2.2 JobManager的启动过程
![](https://img.haomeiwen.com/i11345047/4928b054ce5db848.png)
- 配置Akka并生成ActorSystem,启动JobManager
- 启动HA和metric相关服务
- 在 startJobManagerActors() 方法中启动JobManagerActors,以及 webserver,TaskManagerActor,ResourceManager等等
- 阻塞等待终止
- 集群通过LeaderService等选出JobManager的leader
3.2.3 JobManager启动Task
JobManager 是一个Actor,通过各种消息来完成核心逻辑:
override def handleMessage: Receive = {
case GrantLeadership(newLeaderSessionID) =>
log.info(s"JobManager $getAddress was granted leadership with leader session ID " +
s"$newLeaderSessionID.") leaderSessionID = newLeaderSessionID
.......
有几个比较重要的消息:
- GrantLeadership 获得leader授权,将自身被分发到的 session id 写到 zookeeper,并 恢复所有的 jobs
- RevokeLeadership 剥夺leader授权,打断清空所有的 job 信息,但是保留作业缓存,注 销所有的 TaskManagers
- RegisterTaskManagers 注册 TaskManager,如果之前已经注册过,则只给对应的 Instance 发送消息,否则启动注册逻辑:在 InstanceManager 中注册该 Instance 的信 息,并停止 Instance BlobLibraryCacheManager 的端口【供下载 lib 包用】,同时使用 watch 监听 task manager 的存活
- SubmitJob 提交 jobGraph 最后一项SubmintJob就是我们要关注的,从客户端收到JobGraph,转换为 ExecutionGraph并执行的过程。
首先做一些准备工作,然后获取一个ExecutionGraph,判断是否是恢复的job,然后将job保 存下来,并且通知客户端本地已经提交成功了,最后如果确认本JobManager是leader,则执 行 executionGraph.scheduleForExecution() 方法,这个方法经过一系列调用,把每个 ExecutionVertex传递给了Excution类的deploy方法
我们首先生成了一个TaskDeploymentDescriptor,然后交给
了 taskManagerGateway.submitTask() 方法执行。接下来的部分,就属于TaskManager的范 畴了。
3.3 TaskManager执行task
3.3.1 TaskManager的基本组件
TaskManager是flink中资源管理的基本组件,是所有执行任务的基本容器,提供了内存管 理、IO管理、通信管理等一系列功能,本节对各个模块进行简要介绍。
-
MemoryManager flink并没有把所有内存的管理都委托给JVM,因为JVM普遍存在着存储 对象密度低、大内存时GC对系统影响大等问题。所以flink自己抽象了一套内存管理机制,将 所有对象序列化后放在自己的MemorySegment上进行管理。MemoryManger涉及内容较 多,将在后续章节进行继续剖析。
-
IOManager flink通过IOManager管理磁盘IO的过程,提供了同步和异步两种写模式,又 进一步区分了block、buffer和bulk三种读写方式。 IOManager提供了两种方式枚举磁盘文件,一种是直接遍历文件夹下所有文件,另一种是计 数器方式,对每个文件名以递增顺序访问。 在底层,flink将文件IO抽象为FileIOChannle,封装了底层实现。
-
NetworkEnvironment 是TaskManager的网络 IO 组件,包含了追踪中间结果和数据交换 的数据结构。它的构造器会统一将配置的内存先分配出来,抽象成 NetworkBufferPool 统一 管理内存的申请和释放。意思是说,在输入和输出数据时,不管是保留在本地内存,等待 chain在一起的下个操作符进行处理,还是通过网络把本操作符的计算结果发送出去,都被抽 象成了NetworkBufferPool。后续我们还将对这个组件进行详细分析。
3.3.2 TaskManager执行Task
对于TM来说,执行task就是把收到的 TaskDeploymentDescriptor 对象转换成一个task并执
行的过程。TaskDeploymentDescriptor这个类保存了task执行所必须的所有内容,例如序列 化的算子,输入的InputGate和输出的ResultPartition的定义,该task要作为几个subtask执 行等等。
按照正常逻辑思维,很容易想到TM的submitTask方法的行为:首先是确认资源,如寻找 JobManager和Blob,而后建立连接,解序列化算子,收集task相关信息,接下来就是创建一个新的 Task 对象,这个task对象就是真正执行任务的关键所在。
![](https://img.haomeiwen.com/i11345047/d4c8774111f4a194.png)
如果读者是从头开始看这篇blog,里面有很多对象应该已经比较明确其作用了(除了那个 brVarManager,这个是管理广播变量的,广播变量是一类会被分发到每个任务中的共享变 量)。接下来的主要任务,就是把这个task启动起来,然后报告说已经启动task了:
![](https://img.haomeiwen.com/i11345047/99213359f40e5bef.png)
3.3.2.1 生成Task对象
在执行new Task()方法时,第一步是把构造函数里的这些变量赋值给当前task的fields。
接下来是初始化ResultPartition和InputGate。这两个类描述了task的输出数据和输入数据。
![](https://img.haomeiwen.com/i11345047/285cea294a9a5305.png)
最后,创建一个Thread对象,并把自己放进该对象,这样在执行时,自己就有了自身的线程 的引用。
3.3.2.2 运行Task对象
Task对象本身就是一个Runable,因此在其run方法里定义了运行逻辑。 第一步是切换Task的状态:
![](https://img.haomeiwen.com/i11345047/d55c0c8c3ae00629.png)
其实这里有个值得关注的点,就是flink里大量使用了这种while(true)的写法来修改和检测状 态,emmm...
接下来,就是导入用户类加载器并加载用户代码。 然后,是向网络管理器注册当前任务(flink的各个算子在运行时进行数据交换需要依赖网络管 理器),分配一些缓存以保存数据
然后,读入指定的缓存文件。 然后,再把task创建时传入的那一大堆变量用于创建一个执行环境Envrionment。 再然后,对于那些并不是第一次执行的task(比如失败后重启的)要恢复其状态。 接下来最重要的是
1. invokable.invoke();
方法。为什么这么说呢,因为这个方法就是用户代码所真正被执行的入口。比如我们写的什么 new MapFunction()的逻辑,最终就是在这里被执行的。这里说一下这个invokable,这是一 个抽象类,提供了可以被TaskManager执行的对象的基本抽象。 这个invokable是在解析JobGraph的时候生成相关信息的,并在此处形成真正可执行的对象
// now load the task's invokable code
2. //通过反射生成对象
3. invokable = loadAndInstantiateInvokable(userCodeClassLoader,
nameOfInvokableClass);
上图显示了flink提供的可被执行的Task类型。从名字上就可以看出各个task的作用,在此不再 赘述。
接下来就是invoke方法了,因为我们的wordcount例子用了流式api,在此我们以StreamTask 的invoke方法为例进行说明。
3.3.2.3 StreamTask的执行逻辑
![](https://img.haomeiwen.com/i11345047/ac59857eff7a42c9.png)
![](https://img.haomeiwen.com/i11345047/9ec0bf0381a8273f.png)
StreamTask.invoke()方法里,第一个值得一说的是 TimerService 。Flink在2015年决定向 StreamTask类加入timer service的时候解释到:
第二个要注意的是chain操作。前面提到了,flink会出于优化的角度,把一些算子chain成一个 整体的算子作为一个task来执行。比如wordcount例子中,Source和FlatMap算子就被chain 在了一起。在进行chain操作的时候,会设定头节点,并且指定输出的RecordWriter。
接下来不出所料仍然是初始化,只不过初始化的对象变成了各个operator。如果是有 checkpoint的,那就从state信息里恢复,不然就作为全新的算子处理。从源码中可以看 到,flink针对keyed算子和普通算子做了不同的处理。keyed算子在初始化时需要计算出一个 group区间,这个区间的值在整个生命周期里都不会再变化,后面key就会根据hash的不同结 果,分配到特定的group中去计算。顺便提一句,flink的keyed算子保存的是对每个数据的 key的计算方法,而非真实的key,用户需要自己保证对每一行数据提供的keySelector的幂等 性。至于为什么要用KeyGroup的设计,这就牵扯到扩容的范畴了,将在后面的章节进行讲 述。
对于 openAllOperators() 方法,就是对各种RichOperator执行其open方法,通常可用于在 执行计算之前加载资源。 最后,run方法千呼万唤始出来,该方法经过一系列跳转,最终调用chain上的第一个算子的 run方法。在wordcount的例子中,它最终调用了SocketTextStreamFunction的run,建立 socket连接并读入文本。
3.4 StreamTask与StreamOperator
前面提到,Task对象在执行过程中,把执行的任务交给了StreamTask这个类去执行。在我们 的wordcount例子中,实际初始化的是OneInputStreamTask的对象(参考上面的类图)。那 么这个对象是如何执行用户的代码的呢?
![](https://img.haomeiwen.com/i11345047/5b8a26dd878afc29.png)
它做的,就是把任务直接交给了InputProcessor去执行processInput方法。这是一个 StreamInputProcessor 的实例,该processor的任务就是处理输入的数据,包括用户数 据、watermark和checkpoint数据等。我们先来看看这个processor是如何产生的:
![](https://img.haomeiwen.com/i11345047/9596556190fcac08.png)
这是OneInputStreamTask的init方法,从configs里面获取StreamOperator信息,生成自己 的inputProcessor。那么inputProcessor是如何处理数据的呢?我们接着跟进源码:
![](https://img.haomeiwen.com/i11345047/7cd1821cc9c309ec.png)
![](https://img.haomeiwen.com/i11345047/68f5bf4d3a6c3d94.png)
![](https://img.haomeiwen.com/i11345047/d5047e26f65bc6c3.png)
到此为止,以上部分就是一个flink程序启动后,到执行用户代码之前,flink框架所做的准备工 作。回顾一下:
- 启动一个环境
- 生成StreamGraph
- 注册和选举JobManager
- 在各节点生成TaskManager,并根据JobGraph生成对应的Task
- 启动各个task,准备执行代码
接下来,我们挑几个Operator看看flink是如何抽象这些算子的。
网友评论