本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程:
Flink大数据项目实战:http://t.cn/EJtKhaz
1. Flink运行时架构
1.1Flink架构
Flink 运行时架构主要包含几个部分:Client、JobManager(master节点)和TaskManger(slave节点)。
Client:Flink 作业在哪台机器上面提交,那么当前机器称之为Client。用户开发的Program 代码,它会构建出DataFlow graph,然后通过Client提交给JobManager。
JobManager:是主(master)节点,相当于YARN里面的REsourceManager,生成环境中一般可以做HA高可用。JobManager会将任务进行拆分,调度到TaskManager上面执行。
TaskManager:是从节点(slave),TaskManager才是真正实现task的部分。
Client提交作业到JobManager,就需要跟JobManager进行通信,它使用Akka框架或者库进行通信,另外Client与JobManager进行数据交互,使用的是Netty框架。Akka通信基于Actor
System,Client可以向JobManager发送指令,比如Submit job或者Cancel /update job。JobManager也可以反馈信息给Client,比如status updates,Statistics和results。
Client提交给JobManager的是一个Job,然后JobManager将Job拆分成task,提交给TaskManager(worker)。JobManager与TaskManager也是基于Akka进行通信,JobManager发送指令,比如Deploy/Stop/Cancel
Tasks或者触发Checkpoint,反过来TaskManager也会跟JobManager通信返回Task Status,Heartbeat(心跳),Statistics等。另外TaskManager之间的数据通过网络进行传输,比如Data Stream做一些算子的操作,数据往往需要在TaskManager之间做数据传输。
1.2. TaskManger Slot
TaskManager是进程,他下面运行的task(整个Flink应用是Job,Job可以拆分成很多个task)是线程,每个task/subtask(线程)下可运行一个或者多个operator,即OperatorChain。Task是class,抽象的,subtask是Object(类比学习),具体的。
一个TaskManager通过Slot(任务槽)来控制它上面可以接受多少个task,比如一个TaskManager划分了3个Task Slot(仅限内存托管,目前CPU未做隔离),它只能接受3个task。Slot均分TaskManager所托管的内存,比如一个TaskManager有6G内存,那么每个Slot分配2G。
同一个TaskManager中的task共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而减少每个任务的开销。一个TaskManager有N个槽位只能接受N个Task吗?不是,后面会讲共享槽位。
1.3. OperatorChain && Task
为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。以wordcount为例,解析不同视图下的数据流,如下图所示。
数据流(逻辑视图)
创建Source(并行度设置为1)读取数据源,数据经过FlatMap(并行度设置为2)做转换操作,然后数据经过Key Agg(并行度设置为2)做聚合操作,最后数据经过Sink(并行度设置为2)将数据输出。
数据流(并行化视图)
并行度为1的Source读取数据源,然后FlatMap并行度为2读取数据源进行转化操作,然后数据经过Shuffle交给并行度为2的Key Agg进行聚合操作,然后并行度为2的Sink将数据输出,未优化前的task总和为7。
数据流(优化后视图)
并行度为1的Source读取数据源,然后FlatMap并行度为2读取数据源进行转化操作,然后数据经过Shuffle交给Key Agg进行聚合操作,此时Key Agg和Sink操作合并为一个task(注意:将KeyAgg和Sink两个operator进行了合并,因为这两个合并后并不会改变整体的拓扑结构),它们一起的并行度为2,数据经过Key Agg和Sink之后将数据输出,优化后的task总和为5.
1.4. OperatorChain的优点和组成条件
OperatorChain的优点
1.减少线程切换
2.减少序列化与反序列化
3.减少数据在缓冲区的交换
4.减少延迟并且提高吞吐能力
OperatorChain 组成条件
1.没有禁用Chain
2.上下游算子并行度一致。
3.下游算子的入度为1(也就是说下游节点没有来自其他节点的输入)。
4.上下游算子在同一个slot group(后面紧跟着就会讲如何通过slot
group先分配到同一个solt,然后才能chain) 。
5.下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)。
6.上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)。
7.上下游算子之间没有数据shuffle (数据分区方式是forward)。
1.5. 编程改变OperatorChain行为
Operator chain的行为可以通过编程API中进行指定,可以通过在DataStream的operator后面(如someStream.map(..))调用startNewChain()来指示从该operator开始一个新的chain(与前面截断,不会被chain到前面)。可以调用disableChaining()来指示该operator不参与chaining(不会与前后的operator
chain一起)。可以通过调用StreamExecutionEnvironment.disableOperatorChaining()来全局禁用chaining。可以设置Slot
group,例如someStream.filter(...).slotSharingGroup(“name”)。可以通过调整并行度,来调整Operator chain。
2. Slot分配与共享
2.1共享Slot
默认情况下,Flink 允许subtasks共享slot,条件是它们都来自同一个Job的不同task的subtask。结果可能一个slot持有该job的整个pipeline。
允许slot共享有以下两点好处:
1.Flink集群需要的任务槽与作业中使用的最高并行度正好相同(前提,保持默认SlotSharingGroup)。也就是说我们不需要再去计算一个程序总共会起多少个task了。
2.更容易获得更充分的资源利用。如果没有slot共享,那么非密集型操作source/flatmap就会占用同密集型操作keyAggregation/sink 一样多的资源。如果有slot共享,将task的2个并行度增加到6个,能充分利用slot资源,同时保证每个TaskManager能平均分配到重的subtasks。
2.2共享Slot实例
将 WordCount 的并行度从之前的2个增加到6个(Source并行度仍为1),并开启slot共享(所有operator都在default共享组),将得到如上图所示的slot分布图。
首先,我们不用去计算这个job会其多少个task,总之该任务最终会占用6个slots(最高并行度为6)。其次,我们可以看到密集型操作 keyAggregation/sink 被平均地分配到各个 TaskManager。
2.3 SlotSharingGroup(soft)
SlotSharingGroup是Flink中用来实现slot共享的类,它尽可能地让subtasks共享一个slot。
保证同一个group的并行度相同的sub-tasks 共享同一个slots。算子的默认group为default(即默认一个job下的subtask都可以共享一个slot)
为了防止不合理的共享,用户也能通过API来强制指定operator的共享组,比如:someStream.filter(...).slotSharingGroup("group1");就强制指定了filter的slot共享组为group1。怎么确定一个未做SlotSharingGroup设置算子的SlotSharingGroup什么呢(根据上游算子的group 和自身是否设置group共同确定)。适当设置可以减少每个slot运行的线程数,从而整体上减少机器的负载。
2.4 CoLocationGroup(强制)
CoLocationGroup可以保证所有的并行度相同的sub-tasks运行在同一个slot,主要用于迭代流(训练机器学习模型)。
3. Slot & parallelism的关系
3.1 Slots && parallelism
如上图所示,有两个TaskManager,每个TaskManager有3个槽位。假设source操作并行度为3,map操作的并行度为4,sink的并行度为4,所需的task slots数与job中task的最高并行度一致,最高并行度为4,那么使用的Slot也为4。
3.2如何计算Slot
如何计算一个应用需要多少slot?
如果不设置SlotSharingGroup,那么需要的Slot数为应用的最大并行度数。如果设置了SlotSharingGroup,那么需要的Slot数为所有SlotSharingGroup中的最大并行度之和。比如已经强制指定了map的slot共享组为test,那么map和map下游的组为test,map的上游source的组为默认的default,此时default组中最大并行度为10,test组中最大并行度为20,那么需要的Slot=10+20=30。
4.Flink部署模式
4.1 Local 本地部署
Flink 可以运行在 Linux、Mac OS X 和 Windows 上。本地模式的安装唯一需要的只是 Java 1.7.x或更高版本,本地运行会启动Single JVM,主要用于测试调试代码。
4.2 Standalone Cluster集群部署
软件需求
1.安装Java1.8或者更高版本
2.集群各个节点需要ssh免密登录
Flink Standalone 运行流程前面已经讲过,这里就不在赘叙。
4.3Flink ON
Flink ON YARN工作流程如下所示:
首先提交job给YARN,就需要有一个Flink YARN Client。
第一步:Client将Flink 应用jar包和配置文件上传到HDFS。
第二步:Client向REsourceManager注册resources和请求APPMaster Container
第三步:REsourceManager就会给某一个Worker节点分配一个Container来启动APPMaster,JobManager会在APPMaster中启动。
第四步:APPMaster为Flink的TaskManagers分配容器并启动TaskManager,TaskManager内部会划分很多个Slot,它会自动从HDFS下载jar文件和修改后的配置,然后运行相应的task。TaskManager也会与APPMaster中的JobManager进行交互,维持心跳等。
5.Flink Standalone集群部署
安装Flink之前需要提前安装好JDK,这里我们安装的是JDK1.8版本。
5.1下载
可以到官网:https://archive.apache.org/dist/flink/将Flink1.6.2版本下载到本地。
5.2解压
将下载的flink-1.6.2-bin-hadoop26-scala_2.11.tgz上传至主节点
使用tar -zxvf flink-1.6.2-bin-hadoop26-scala_2.11.tgz命令解压flink安装包
方便后期flink多版本的使用,可以创建flink软连接
ln -s flink-1.6.2 flink
5.3配置环境变量
vi ~/.bashrc
export FLINK_HOME=/home/hadoop/app/flink
export PATH=$FLINK_HOME/bin:$PATH
使配置文件生效
source ~/.bashrc
查看flink版本
flink -v
5.4修改配置文件
1.修改flink-conf.yaml配置文件
vi flink-conf.yaml
#JobManager地址
jobmanager.rpc.address: cdh01
#槽位配置为3
taskmanager.numberOfTaskSlots: 3
#设置并行度为3
parallelism.default: 3
2.修改masters配置
vi masters
cdh01:8081
3.修改slaves配置
vi slaves
cdh01
cdh02
cdh03
5.5主节点安装目录同步到从节点
通过deploy.sh脚本将flink安装目录同步到其他节点。
deploy.sh flink-1.6.2 /home/hadoop/app/slave
在从节点分别创建flink软连接
ln -s flink-1.6.2 flink
5.6启动服务
进入flink bin目录执行启动集群脚本start-cluster.sh
bin/start-cluster.sh
通过web查看flink集群,查看相关集群信息。
5.7测试运行
查看官网案例:https://ci.apache.org/projects/flink/flink-docs-release-1.6/
1.启动nc服务
nc -l 9000
2.提交flink作业
bin/flink runexamples/streaming/SocketWindowWordCount.jar --port 9000
3.输入测试数据
4.查看运行结果
在TaskManager界面查看Flink运行结果
网友评论