美文网首页
Flink(1.13) 执行模式(Execution Mode)

Flink(1.13) 执行模式(Execution Mode)

作者: 万事万物 | 来源:发表于2021-08-22 09:53 被阅读0次

    前言

    Flink从1.12.0上对流式API新增一项特性:可以根据你的使用情况和Job的特点, 可以选择不同的运行时执行模式(runtime execution modes).
    流式API的传统执行模式我们称之为STREAMING 执行模式, 这种模式一般用于无界流, 需要持续的在线处理
    1.12.0新增了一个BATCH执行模式, 这种执行模式在执行方式上类似于MapReduce框架. 这种执行模式一般用于有界数据.
    默认是使用的STREAMING 执行模式

    选择执行模式

    BATCH执行模式仅仅用于有界数据, 而STREAMING 执行模式可以用在有界数据和无界数据.
    一个公用的规则就是: 当你处理的数据是有界的就应该使用BATCH执行模式, 因为它更加高效. 当你的数据是无界的, 则必须使用STREAMING 执行模式, 因为只有这种模式才能处理持续的数据流.

    配置BATH执行模式

    执行模式有3个选择可配:

    1. STREAMING(默认):有界数据和无界数据
    2. BATCH:有界数据
    3. AUTOMATIC:

    传统的方式:

    1. 批处理:
    // 获取环境
    ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    
    // 读取资源
    DataSource<String> dataSource = executionEnvironment.readTextFile("文件地址");
    
    1. 有界流
    StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
    
    // 读取资源
    DataStreamSource<String> streamSource = env.readTextFile("文件地址");
    
    env.execute();
    
    1. 无界流
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // 监听端口
    DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999)
    
    env.execute();
    

    批处理与流处理的区别:

    1. 批处理处理数据,是一批一批对数据处理,spark就是一个微批数据处理引擎,可以理解成先对数据积压,然后达到一定量再一块处理。
    2. 流处理,有数据就处理,不需要积压数据
    3. 批处理无需保留数据状态,处理完就输出。
    4. 流处理需要保留数据状态,因为也有可能还有该数据。
    5. 批处理完成,程序就停止。
    6. 流处理,需要一直等待,即使后面不会有数据产生,程序依然保存运行状态。

    有界与无界的理解:

    有界流与无界流的区别在于读取的数据是否有尽头,若读取的数据类似于文件(知道开始的位置,结束的位置),无界流就是知道开始但不知道什么时候结束,如网络,Kafka,需要不同的监听着,等待处理数据。

    案例(wordcount)

    流式处理

    程序比较简单,就没加注释

        @Test
        public void wordCount1() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> source = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out)
                    -> Arrays.stream(value.split(" "))
                    .forEach(s -> out.collect(Tuple2.of(s, 1)))).returns(Types.TUPLE(Types.STRING, Types.INT));
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = flatMap.keyBy(e -> e.f0).sum(1);
    
            result.print();
    
            env.execute();
        }
    

    结果

    5> (python,1)
    12> (word,1)
    3> (java,1)
    13> (xml,1)
    1> (pon,1)
    11> (log,1)
    7> (txt,1)
    1> (pon,2)
    11> (exe,1)
    3> (java,2)
    11> (log,2)
    5> (python,2)
    5> (hello,1)
    5> (python,3)
    5> (hello,2)
    3> (java,3)
    13> (xml,2)
    14> (count,1)
    11> (log,3)
    13> (xml,3)
    14> (batch,1)
    

    批处理

        @Test
        public void wordCount2() throws Exception {
    
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // 和流式处理,是两套完全不同的api
            DataSource<String> source = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");
    
            FlatMapOperator<String, Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out)
                    -> Arrays.stream(value.split(" "))
                    .forEach(s -> out.collect(Tuple2.of(s, 1)))).returns(Types.TUPLE(Types.STRING, Types.INT));
    
            AggregateOperator<Tuple2<String, Integer>> result = flatMap.groupBy(0).sum(1);
    
            result.print();
        }
    

    结果

    (pon,2)
    (hello,2)
    (log,3)
    (xml,3)
    (exe,1)
    (java,3)
    (python,3)
    (txt,1)
    (batch,1)
    (count,1)
    (word,1)
    

    设置执行模式

    传统上的批处理和流处理,需要两套不同的API来处理,不太符合Flink中流批一体的理念,此时执行模式的出现完美的解决了问题。只需要指定一个执行模式,就可以完成流与批之间的相互转换,其他代码都不用修改。

    执行模式所支持的模式:

    @PublicEvolving
    public enum RuntimeExecutionMode {
    
        /**
         * The Pipeline will be executed with Streaming Semantics. All tasks will be deployed before
         * execution starts, checkpoints will be enabled, and both processing and event time will be
         * fully supported.
         */
        STREAMING,
    
        /**
         * The Pipeline will be executed with Batch Semantics. Tasks will be scheduled gradually based
         * on the scheduling region they belong, shuffles between regions will be blocking, watermarks
         * are assumed to be "perfect" i.e. no late data, and processing time is assumed to not advance
         * during execution.
         */
        BATCH,
    
        /**
         * Flink will set the execution mode to {@link RuntimeExecutionMode#BATCH} if all sources are
         * bounded, or {@link RuntimeExecutionMode#STREAMING} if there is at least one source which is
         * unbounded.
         */
        AUTOMATIC
    }
    

    转换成批处理

        @Test
        public void wordCount1() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 转成批处理,其他都不用改
            env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    
            DataStreamSource<String> source = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out)
                    -> Arrays.stream(value.split(" "))
                    .forEach(s -> out.collect(Tuple2.of(s, 1)))).returns(Types.TUPLE(Types.STRING, Types.INT));
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> result = flatMap.keyBy(e -> e.f0).sum(1);
    
            result.print();
    
            env.execute();
        }
    

    结果

    1> (pon,2)
    5> (hello,2)
    5> (python,3)
    3> (java,3)
    7> (txt,1)
    14> (batch,1)
    14> (count,1)
    13> (xml,3)
    11> (exe,1)
    11> (log,3)
    12> (word,1)
    

    注意:

    1. 在13版本之前不要使用执行模式,若数据只有一个(如: (txt,1)),那么该数据不会被输出,13版本修复了该问题。
    2. 批处理不会存状态(处理完就直接输出了,所以没有必要保留状态)

    相关文章

      网友评论

          本文标题:Flink(1.13) 执行模式(Execution Mode)

          本文链接:https://www.haomeiwen.com/subject/btnkbltx.html