TaskManager与Slots
Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个Task。为了控制一个worker能接收多少个task,worker通过
Task Slot
来进行控制(一个worker至少有一个Task Slot)。
这里的Slot如何来理解呢?很多的文章中经常会和Spark框架进行类比,将Slot类比为Core,其实简单这么类比是可以的,可实际上,可以考虑下,当Spark申请资源后,这个Core执行任务时有可能是空闲的,但是这个时候Spark并不能将这个空闲下来的Core共享给其他Job使用,所以这里的Core是Job内部共享使用的。接下来我们再回想一下,之前在Yarn Session-Cluster模式时,其实是可以并行执行多个Job的,那如果申请两个Slot,而执行Job时,只用到了一个,剩下的一个怎么办?那我们自认而然就会想到可以将这个Slot给并行的其他Job
,对吗?所以Flink中的Slot和Spark中的Core还是有很大区别的。
每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个task将不需要跟来自其他job的task竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离
,slot目前仅仅用来隔离task的受管理的内存。
- 每个TaskManager提供的任务插槽数。
# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
taskmanager.numberOfTaskSlots: 1
- 总结
- 一个worker至少有个一个task slot
- slot之间可以共享资源
- slot之间是内存隔离不是cpu隔离
4.2.2Parallelism(并行度)
并行度一个特定算子的
子任务(subtask)的个数
被称之为这个算子的并行度(parallelism),一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类。
-
并行度
是一个动态的概念,是指在运行中的。比如 有五台游戏机,但是只有两台有人再玩,那么并行度是五还是二?答案是二,因为工作的只有两台。而五表示处理能力,表示最多能支持五个人同时一起玩,是一个静态概念。 -
flink默认并行度 1
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1
-
One-to-one:
stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着flatmap 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。类似于spark中的窄依赖
-
Redistributing:
stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy()基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。类似于spark中的宽依赖
并行度:
-
A,B,C三个算子,并行度为3,需要几个slot?
至少需要3三个slot -
slot 只有2个,会怎么样?
将无法运行,直接报错。
优先级:
算子指定 > env全局指定 > 提交参数 > 配文件
同一个算子的并行实例,不能在同一个slot里,
slot数量必须大于等于最大并行度,才可以正常运行,总结slot的数量是由最大并行度所决定的。
并行对与subtask的关系:
共享组
在默认情况下,JobManager 会将志同道合
算子划分到一起,他们都属于在一个组内(default
)
如上图:socket 算子和 FlatMap 分在一个组内。若想把它们拆开可以自定义共享组,每个算子都有slotSharingGroup
函数,作用就是将其独立划分成一个组。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 监听端口
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
// 切割每行数据,并收集到 Collector中
Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
}).returns(Types.TUPLE(Types.STRING, Types.INT)).slotSharingGroup("flatMapGroup")
.keyBy(0).sum(1);
sum.print("test");
env.execute();
}
注意这段代码,将flatMap 重新划分一个组,这样它就单独使用slot了。
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
// 切割每行数据,并收集到 Collector中
Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
}).returns(Types.TUPLE(Types.STRING, Types.INT)).slotSharingGroup("flatMapGroup")
再运行;flat map就单独拎出去了。
共享组的好处:同一个slot中,其他subtask之间就变争抢slot中资源,好比5个人在一个房子了,什么东西都是五个人再用,共享组相当于单独一个房间,房间里的资源都是属于他一个人的。实际工作中,通常会将任务比较重的,单独划分成一个组。
- 只有属于同一个slot共享组的subtask,才可以共享同一个slot
- 属于同一个算子的subtask,不能共享同一个slot。
比如Map算子并发度为3,相当于把任务分成了三份,若他们都在同一个slot中执行,将毫无意义,也是不允许的。所以会将它们分在不同的slot中,这样才能更好提高执行的并发度
网友评论