美文网首页
Flink(1.13) 核心概念

Flink(1.13) 核心概念

作者: 万事万物 | 来源:发表于2021-08-20 14:36 被阅读0次

    TaskManager与Slots

    Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个Task。为了控制一个worker能接收多少个task,worker通过Task Slot来进行控制(一个worker至少有一个Task Slot)。
    这里的Slot如何来理解呢?很多的文章中经常会和Spark框架进行类比,将Slot类比为Core,其实简单这么类比是可以的,可实际上,可以考虑下,当Spark申请资源后,这个Core执行任务时有可能是空闲的,但是这个时候Spark并不能将这个空闲下来的Core共享给其他Job使用,所以这里的Core是Job内部共享使用的。接下来我们再回想一下,之前在Yarn Session-Cluster模式时,其实是可以并行执行多个Job的,那如果申请两个Slot,而执行Job时,只用到了一个,剩下的一个怎么办?那我们自认而然就会想到可以将这个Slot给并行的其他Job,对吗?所以Flink中的Slot和Spark中的Core还是有很大区别的。
    每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个task将不需要跟来自其他job的task竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存。

    • 每个TaskManager提供的任务插槽数。
    # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
    
    taskmanager.numberOfTaskSlots: 1
    
    • 总结
    1. 一个worker至少有个一个task slot
    2. slot之间可以共享资源
    3. slot之间是内存隔离不是cpu隔离

    4.2.2Parallelism(并行度)

    并行度

    一个特定算子的子任务(subtask)的个数被称之为这个算子的并行度(parallelism),一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
    Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类。

    • 并行度
      是一个动态的概念,是指在运行中的。比如 有五台游戏机,但是只有两台有人再玩,那么并行度是五还是二?答案是二,因为工作的只有两台。而五表示处理能力,表示最多能支持五个人同时一起玩,是一个静态概念。

    • flink默认并行度 1

    # The parallelism used for programs that did not specify and other parallelism.
    
    parallelism.default: 1
    
    • One-to-one:
      stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着flatmap 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。类似于spark中的窄依赖

    • Redistributing:
      stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy()基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。类似于spark中的宽依赖

    并行度:

    • A,B,C三个算子,并行度为3,需要几个slot?
      至少需要3三个slot

    • slot 只有2个,会怎么样?
      将无法运行,直接报错。

    优先级:
    算子指定 > env全局指定 > 提交参数 > 配文件

    同一个算子的并行实例,不能在同一个slot里,

    slot数量必须大于等于最大并行度,才可以正常运行,总结slot的数量是由最大并行度所决定的。

    并行对与subtask的关系:

    共享组

    在默认情况下,JobManager 会将志同道合算子划分到一起,他们都属于在一个组内(default)

    示例图

    如上图:socket 算子和 FlatMap 分在一个组内。若想把它们拆开可以自定义共享组,每个算子都有slotSharingGroup函数,作用就是将其独立划分成一个组。

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 监听端口
            DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);
    
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
                // 切割每行数据,并收集到 Collector中
                Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
            }).returns(Types.TUPLE(Types.STRING, Types.INT)).slotSharingGroup("flatMapGroup")
                    
                    .keyBy(0).sum(1);
    
            sum.print("test");
    
            env.execute();
    
        }
    

    注意这段代码,将flatMap 重新划分一个组,这样它就单独使用slot了。

         SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
                // 切割每行数据,并收集到 Collector中
                Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
            }).returns(Types.TUPLE(Types.STRING, Types.INT)).slotSharingGroup("flatMapGroup")
    

    再运行;flat map就单独拎出去了。


    共享组的好处:同一个slot中,其他subtask之间就变争抢slot中资源,好比5个人在一个房子了,什么东西都是五个人再用,共享组相当于单独一个房间,房间里的资源都是属于他一个人的。实际工作中,通常会将任务比较重的,单独划分成一个组。

    1. 只有属于同一个slot共享组的subtask,才可以共享同一个slot
    2. 属于同一个算子的subtask,不能共享同一个slot。
      比如Map算子并发度为3,相当于把任务分成了三份,若他们都在同一个slot中执行,将毫无意义,也是不允许的。所以会将它们分在不同的slot中,这样才能更好提高执行的并发度

    相关文章

      网友评论

          本文标题:Flink(1.13) 核心概念

          本文链接:https://www.haomeiwen.com/subject/swnnvltx.html