美文网首页
Flink 应用剖析

Flink 应用剖析

作者: 王小奕 | 来源:发表于2021-04-15 16:58 被阅读0次

    Flink 程序看起来跟普通程序没什么区别,都是处理数据流。每个程序都有几个相同的基础部分组成:
    1、申请执行环境
    2、加载/创造原始数据
    3、声明对于这些数据的转换过程
    4、声明存储这些转换后数据的目标地址
    5、触发程序结束
    接下来会对每个步骤做一个概述,想要了解细节的话请参考各自的部分。
    Java DataStream API 所有的核心类都可以在这里找到。

    1、申请执行环境

    StreamExecutionEnvironment是所有 Flink 程序的基础,通过调用它的以下这几个静态方法可以得到一个执行环境:

    StreamExecutionEnvironment.getExecutionEnvironment()
    
    StreamExecutionEnvironment.createLocalEnvironment()
    
    StreamExecutionEnvironment.createRemoteEnvironment(String host, int port, String... jarFiles)
    

    官方例子的main方法中第一行程序:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    

    一般来说,直接调用getExecutionEnvironment()即可,他会根据上下文自动处理。如果你像平时一样是在本地 IDE 执行程序,他会创建一个本地环境在你自己的机器上执行代码。如果你将代码打包成一个jar包,并且通过命令行(flink run XXX.jar)的方式提交给 Flink 集群,Flink 会为你的程序生成一个执行环境,让他在集群内完成执行。

    2、加载/创造原始数据

    谈到数据源的声明,以基于文件读取型的场景为例,Flink 的执行环境支持了多种方法让程序从文件读取:

    • 逐行读取
    • 读取为 CSV 文件
    • 使用其他提供的源

    下面以从txt文件逐行读取为例,讲解代码实现:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> text = env.readTextFile("file:///path/to/file");
    

    首先,依然是申请执行环境,之后调用该执行环境对象的env.readTextFile方法即可得到一个数据流。
    除了readTestFile()方法,Flink 还支持别的方式,同时也支持其他场景的数据源加载,见官方文档,这里仅做列表,不详细解释:

    2.1 文件读取型数据源

    • readTextFile(path):从 text 文件中逐行读取,读取进来后的事件是 string 型的
    • readFile(fileInputFormat, path):读取fileInputFormat类型的文档
    • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo):前面两个方法的内部调用实现

    2.2 Socket 型数据源

    • socketTextStream:从 socket 中读取,各元素以分隔符分开

    2.3 集合型

    • fromCollection(Collection):从Java.util.Collection中读取数据生成数据流。集合中所有元素必须是同类型。
    • fromCollection(Iterator, Class):从迭代器中读取数据生成数据流。参数Class用以声明元素类型
    • fromElements(T ...):从数组中读取对象数据生成数据流,所有对象必须是同类型。
    • fromParallelCollection(SplittableIterator, Class):Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
    • generateSequence(from, to):在from~to之间并行生成数据返回为一个数组。

    2.4 定制

    • addSource:添加一个新的数据源生成方法。如:从 Kafka 中读取时,可以这么写:addSource(new FlinkKafkaConsumer<>(...))。具体语法参考connector

    3、声明对于这些数据的转换过程

    得到源数据流后,接下来就是调用 Flink 的各个 Operators 来对数据流中的数据做“变身”动作了。以下面的代码为例:

    DataStream<String> input = ...;
    
    DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String value) {
            return Integer.parseInt(value);
        }
    });
    

    这段代码调用了map这个算子,将input流中的所有string型数据都转换成int型,生成一个新的数据流赋值给parsed。
    当然,你可以做更复杂的“变身”,更多官方支持的算子及其语法介绍点我了解

    4、声明存储这些转换后数据的目标地址

    当你得到一个最终状态的数据流后,接下来要考虑的就是如何把这个数据流写道外部系统了,也就是创建一个 sink。最常见的就是打印到控制台,可以这么实现:

    parsed.print();
    

    Data sinks 消费数据流,并且把他们写入文件、sockets、外部系统,或者直接打印他们。Flink 内置了许多输出格式。

    • writeAsText() / TextOutputFormat:以string形式按行输出元素。该string是通过调用元素的toString()方法得到的。
    • writeAsCsv(...) / CsvOutputFormat:每个元素转换成一个逗号隔开的二元数组。行于列的分隔符可以配置。列的值同样是调用元素的toString()方法得到的。
    • print() / printToErr():直接打印到标准控制台。还可以传一个前缀参数加载字符串之前,用以区分不同的print()调用。 当集群开启并发模式且大于1的时候,还会把产生该数据的task的标识符作为前缀加到输出之前。
    • writeUsingOutputFormat() / FileOutputFormat:自定义文件类型,需要支持自定义类型与bytes类型的转换。
    • writeToSocket:根据序列化规则输出到socket。
    • addSink:调用外部sink方法。可以查看connector了解更多。

    一般来说,write*()方法都是用来掉使用的,这些方法输出的元素不参与 Flink 的 checkpoint 机制。产生的直接效果就是,这些方法输出的文档可能会有丢失或者延迟,而我们无法通过 Flink 的checkpoint 机制来保证。
    因此,为了保证流数据的精确送达,建议使用 StreamingFileSink。同时,通过.addSink(...)接口调用的外部实现也可以参与 Flink 的checkpoint机制,从而保证唯一性送达。

    5、触发程序结束

    完成上述所有步骤后,接下来就是触发程序执行,方法就是调用当前执行环境的execute()方法。根据执行环境的不同,Flink会自动选择是在本地执行,还是把代码提交给集群执行。
    调用env.execute()方法后,Flink 会等待程序执行完成,并根据执行模式不同做不同的后续处理。
    如果你不想等待job执行完成,你可以异步触发程序执行,这可以通过调用当前执行环境的executeAysnc()方法实现。此时,他会返回给你一个 JobClient,之后,你可以通过与 JobClient 交互来得到你提交的 job 的执行情况,如:

    final JobClient jobClient = env.executeAsync();
    final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
    

    最后这一步在帮助理解 Flink 算子的执行时机以及方法极其重要。Flink 程序都是懒执行的,即:当程序的main()方法被执行时,数据的加载以及转换工作并不会马上开始,Flink 会首先创建一个 dataflow graph,并把他们添加进去。只有当execute()方法被触发时,算子们的处理才真正开始,这与执行环境类型无关,不管你是在本地执行,还是在集群中执行,都是如此。

    6、一个完整的例子

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    public class WindowWordCount {
    
        public static void main(String[] args) throws Exception {
            // 申请执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 声明数据源:从套接字localhost:9999读取
            // 声明数据源的处理过程:.flat.Map().keyBy().window().sum()
            // 生成最终的结果数据源
            DataStream<Tuple2<String, Integer>> dataStream = env
                    .socketTextStream("localhost", 9999)
                    .flatMap(new Splitter())
                    .keyBy(value -> value.f0)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .sum(1);
            // 配置sink,将结果集打印到控制台
            dataStream.print();
            // 触发执行
            env.execute("Window WordCount");
        }
    
        public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word: sentence.split(" ")) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }
    
    }
    

    编码完成后,本地启动socket输入端:

    nc -lk 9999
    

    然后敲入一些单词,之后启动 job,查看 job 的控制台输出,是你输入的这些么?

    相关文章

      网友评论

          本文标题:Flink 应用剖析

          本文链接:https://www.haomeiwen.com/subject/hsndlltx.html