Flink应用开发

作者: 零度沸腾_yjz | 来源:发表于2019-03-07 21:34 被阅读4次

    项目构建

    项目模板

    Flink应用项目可以使用Maven或SBT来构建项目,Flink针对这些构建工具提供了相应项目模板。
    Maven模板命令如下,我们只需要根据提示输入应用项目的groupId、artifactId、version和package路径即可。

    mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.2
    

    目录结构和我们使用IDEA创建的目录结构基本一样,只是它会帮我们引入Flink依赖和日志依赖。

    <flink.version>1.7.2</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
    ...
    <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
    

    flink-java和flink-streaming-java_2.11是我们使用Java开发Flink应用程序的必要依赖。

    默认也帮我们引入maven-shade-plugin插件,所以在打包的时候记得将mainClass改成自己的主类。

    SBT模板可以使用以下命令获取:

    sbt new tillrohrmann/flink-project.g8
    

    SBT版本需要大于等于0.13.13版本。

    应用程序依赖

    Flink应用程序开发依赖项可以分为两类:

    • Flink核心依赖(Flink core Dependencies):它是Flink运行系统所需的类和依赖项,也就是Flink项目的核心代码和所使用的依赖。比如实现的:调度、通信、checkpoint、API等。我们上面所引入的就是Flink的核心依赖,对于核心依赖我们只需要将依赖作用范围scope设置为provided即可,也就是不将依赖打入jar包。因为对于这些核心依赖,Flink运行集群能够为我们提供。

    • 应用程序依赖(User Application Dependencies):这部分就是我们开发应用程序所需要的一些其它依赖项,比如连接器、格式化库、Flink CEP、Flink SQL、Flink ML等。在打包应用程序的时候,我们需要将这些依赖项与我们的应用程序代码一同打入到一个jar包中。

    //Flink核心依赖
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.7.2</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.7.2</version>
      <scope>provided</scope>
    </dependency>
    //应用程序依赖
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>1.7.2</version>
    </dependency>
    

    Flink 开发

    Flink应用程序一般先是通过数据源创建分布式数据集合,比如读取文件、Kafka或本地缓存。然后对分布式集合进行各种转换操作,比如过滤、聚合、映射、修改状态、定义窗口等。最后通过接收器(sink)返回结果,结果可以写入文件(分布式)、DB或标准化输出。

    根据数据源的类型,也就是有界数据源(bounded)或无界数据源(unbounded),我们可以编写批处理程序(batch)和流处理程序(streaming)。Flink对于批处理程序和流处理程序提供了不同的API,其中DataSet API用于批处理,DataStream API用于流处理。尽管提供的API不同,但是Flink底层数据处理方式是一致的。

    在Flink程序中DataSet和DataSteam用来表示程序中的数据集合,这些数据集是不可变的(immutable)。DataSet代表有限的数据集,而DataSteam代表无限的数据集合。

    编写Flink应用程序

    编写Flink应用程序基本可以分为以下5个步骤:

    1. 获取应用程序的执行环境(execution environment)。
    2. 加载/创建初始数据集合。
    3. 对数据集执行转换操作(transformation)。
    4. 指定计算结果输出。
    5. 触发程序执行。

    DataSet API在org.apache.flink.api.java 包中;DataStream API在org.apache.flink.streaming.api包中。

    批处理和流处理的程序步骤是一致的,下面给出流处理作业的编写步骤。

    获取应用程序执行环境

    获取执行环境是Flink程序的基础。对于流处理程序的执行环境为StreamExecutionEnvironment,我们可以通过StreamExecutionEnvironment静态方法获取。对于批处理作业执行环境为ExecutionEnvironment,同样需要使用ExecutionEnvironment获取。

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host,port,jarFile);
    

    一般我们只需要通过getExecutionEnvironment()方法获取执行环境即可,因为他会根据当前环境自动创建合适的执行环境,比如我们在本地IDE执行程序,它将创建一个本地执行环境(local environment),该环境将在本地机器执行;如果我们将应用程序打成jar包交给Flink集群执行,getExecutionEnvironment()将返回一个集群执行环境。createLocalEnvironment()会创建一个本地执行环境,createRemoteEnvironment()会创建一个远程执行环境。
    同理,批处理执行环境ExecutionEnvironment以同样方式创建执行环境。

    加载/创建初始数据集合

    加载/创建初始数据集合,一般主要是读取分布式文件、读取Kafka队列等。

    //通过执行环境提供的方法读取外部数据集
    DataStreamSource<String> textFile =  env.readTextFile("/Users/yangjianzhang/development/flink-1.7.2/LICENSE");
    

    读取数据集之后会返回一个DataStream(DataStreamSource是DataStream子类,用于获取数据源数据集的),我们之后的转换、输出存储操作都可以通过DataStream提供的API进行操作了。

    执行转换操作

    我们能够通过DataSteam提供的API进行各种转换(transformation)操作。转换函数我们可以通过三种方式实现:实现接口、匿名类和Lambda表达式。

    //实现接口
    class MyMap implements MapFunction<String,String> {
                @Override
                public String map(String s) throws Exception {
                    return s.toUpperCase();
                }
    }
    data.map(new MyMap());
    //通过匿名类方式
    textFile.map(new MapFunction<String, String[]>() {
                @Override
                public String[] map(String s) throws Exception {
                    return s.split(" ");
                }
            });
    //使用Lambda表达式
    DataStream<String> filterLine  = textFile.filter(line -> line.contains("flink"));
    

    转换操作除了提供了基础接口(比如MapFunction),还提供了丰富函数(Rich Function)。Rich Function除了提供用户定义函数,还提供了其它四个方法:open、close、getRuntimeContext和setRuntimeContext。在一些场景,这些方法都是很有用的。

    指定计算结果输出

    我们可以将计算结果打印出来,也可以直接将结果写入到文件中。

    filterLine.print();     
    filterLine.writeAsText("/Users/yangjianzhang/development/data/flink/contains.txt");
    

    触发程序执行

    Flink应用程序是懒执行的(lazy execution)。也就是说当程序main方法被执行时,数据的加载和转换并不会立即触发,而是会将每一步操作添加到执行计划中,当执行环境通过execute()方法显示触发时,才会进行具体的执行操作。
    execute()方法会返回一个JobExecutionResult,它包含执行时间以及累加器(accumulator)结果。

    //触发程序执行
    JobExecutionResult result = env.execute();
    

    通过懒执行评估(lazy evaluation)机制,我们可以构建复杂的数据处理程序,Flink会将整个执行计划作为一个执行单元来一起执行。

    Demo

    下面是根据上面编写程序步骤给出的完整Demo:

    public class DemoStreamingJob {
        public static void main(String[] args) throws Exception {
            //Streaming process
    
            //step1 获取/创建执行环境
            //自动选择正确的执行环境
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //step2 加载/创建初始数据
            DataStreamSource<String> textFile =  env.readTextFile("/Users/yangjianzhang/development/flink-1.7.2/LICENSE");
            //step3  对数据源数据进行转换操作
            DataStream<String> filterLine  = textFile.filter(line -> line.contains("flink"));
    
            //step4 指定计算结果输出位置
     filterLine.writeAsText("/Users/yangjianzhang/development/data/flink/contains.txt");
    
            //step5 最后触发程序执行
            JobExecutionResult result = env.execute();
        }
    }
    

    指定key操作

    Flink数据模型并不是基于键值对的,所以我们不需要将数据放到键值对中再传递给Flink。但是有时许多转换操作需要基于key值进行操作,比如join、groupBy、Reduce、Aggregator等等。
    Flink针对上面这些问题,提出了“虚拟”key的概念。也就是说在传递过来的数据上,通过指定具体数据项为这个消息的key。
    比如我们传递进来的数据是一个tuple元组:

    ("yjz",27,10234),
    ("yjz",27,21456)
    ("ls",28,12345)
    

    我们可以指定元组中的第一个元素为key。

    DataStream<Tuple3<String,Integer,Long>> streamInput = ...;
    DataStream<...> windowed =  streamInput.keyBy(0).window(...);
    

    如果是DataSet可以通过groupBy来指定key:

    DataSet<Tuple3<String,Integer,Long>> inputDataSetTuple = ...;
    DataSet<...> reduced =  inputDataSetTuple.groupBy(0).reduce(...);
    

    上面只是针对tuple类型数据通过位置来简单的指定key,下面我们看一下Flink都支持了哪些指定key的方式。

    定义Tuple类型中的key

    上面我们已经讲了可以通过指定position来指定虚拟key,我们还可以使用的的更复杂一些。比如有些场景需要使用组合多个field的方式来指定key:

    KeyedStream<Tuple3<String,Integer,Long>> =  streamInput.keyBy(0,1);
    

    上面使用元组中的第一个字段和第二个字段组合成一个key来使用。

    使用字段表达式(Key Expression)指定key

    使用字段表达式能够更加灵活的指定key,它可以用来指定POJO对象、Tuple元组中的key。
    对于POJO对象可以通过a.b的形式来指定key。比如有以下POJO对象:

    class Wc{
        public User user;
        public int count; 
    }
    class User{
        public String name;
        public Tuple2<String,Integer> tuple;
        public int age;
    }
    

    可以使用User作为key:words.keyBy("user")。
    也可以使用User中的name作为key:words.keyBy("user.name")。

    对于元组类型,我们即可以直接使用下标(从0开始),也可以使用"fx"来代表,比如第一个元素则用"f0"表示(感觉和直接使用下标没有区别)。当然可以和POJO对象组合使用:words.keyBy("user.tuple.f0")。

    使用key选择器

    使用key选择器能够以单个元素输入,并可以返回任意类型的元素key。
    下面是返回字符串类型的key:

    KeyStream<String> keyed = textFile.keyBy(new KeySelector<User,String>() {
                @Override
                public String getKey(User user) throws Exception {
                    return user.name;
                }
            });
    

    Flink支持的数据类型

    为了确保系统能够以确切有效的方式针对不同类型执行不同的策略,Flink提供了以下六种不同类别的数据类型。

    1. Java Tuple和Scala class类型。
    2. POJO对象类型。
    3. 原始数据类型(Primitive Type)。
    4. 常规类型(General Class Types)
    5. Values类型(自定义序列化类型)
    6. Hadoop Writable

    Java Tuple和Scala class

    元组类型是包含固定数量,具有各种类型字段的复合类型。Java API提供了Tuple1~Tuple25的元组类型,后面的数字代表元组中的元素个数,所以我们可以看出最多支持25个元素的元组。但是我们可以通过嵌套来存储更多的元素数据,每个字段都可以是Flink的任意类型数据(包括元组)。元组通过tuple.f0或tuple.getField(int position)来获取字段数据,索引从0开始。

    DataStreamSource<Tuple2<String,Integer>> wordCounts = env.fromElements(new Tuple2<String,Integer>("hello",2),new Tuple2<String,Integer>("word",5));
    DataStream<Integer> counts = wordCounts.map(word -> word.f1);
            wordCounts.keyBy(0);
    

    Scala中的Case class可以代替Java中的Tuple类型。

    POJO对象类型

    如果Java或Scala类满足以下情况,则可以作为POJO对象类型被Flink处理。

    1. 类必须是public访问类型。
    2. 必须有一个无参的公共构造函数。
    3. 所有字段字段都有public类型,或者有对应的setXxx()和getXxx()方法。
    4. 类中的字段类型必须是Flink所支持的数据类型。

    自己一直理解Scala中的Case Class就是Java中的POJO,但是官方文档将Java Tuple和Scala Case Class放在一起,估计是以使用方式来进行划分的。
    Flink中的POJO是使用Avro序列化框架进行序列化的。

    public class UserCount{
                public String name;
                public int count;
    
                public UserCount() {}
                public UserCount(String name,int count) {
                    this.name = name;
                    this.count = count;
                }
     }
    DataStreamSource<UserCount> userCount = env.fromElements(new UserCount("zhangsan",1),new UserCount("lisi",4));
    userCount.keyBy("name");
    

    原始数据类型(Primitive Types)

    Flink支持Java或Scala中的所有原始数据类型,比如String、int、Double等。

    常规类型(General Class Types)

    除了POJO类型外,Flink能够支持Java和Scala大部分类。但是对于一些包含不能被序列化的字段类、I/O流类、或其它本地资源类是不支持的。
    Flink会对常规类型以黑盒方式进行操作(无法访问其内容),使用Kryo进行常规类的序列化与反序列化。

    上面POJO使用Avro序列化框架,这里的常规类型使用Kryo序列化框架,原因需要考证一下。

    Values类型(自定义序列化类型)

    值类型是使用手动序列化与反序列化来代替通用序列化框架。通过实现org.apache.flinktypes.Value接口来自己实现序列化与反序列。当使用通用序列化框架效率比较低的时候,使用Value类型是非常合理的。比如对于一个数组,我们知道它大部分都是0,那么我们可以对非零元素进行特殊编码即可,而不是使用通用框架对所有元素进行编码。
    Flink对于基本类型提供预定义的Value类型:ByteValue、ShortValue、IntValue、LongValue、FloatValue、DoubleValue、StringValue、CharValue、BooleanValue。

    Hadoop Writable

    我们还可以使用实现了org.apache.hadoop.Writable接口的数据类型。其中使用write()方法和readFields()方法进行序列化和反序列化。

    除了上面这六种类型数据,我们还可以使用一些特殊类型,比如Scala的Either、Option、Try等,Java中自定义的Either等。

    关注我

    欢迎关注我的公众号,会定期推送优质技术文章,让我们一起进步、一起成长!
    公众号搜索:data_tc
    或直接扫码:🔽


    欢迎关注我

    相关文章

      网友评论

        本文标题:Flink应用开发

        本文链接:https://www.haomeiwen.com/subject/sotkpqtx.html