美文网首页
Flink__Flink1.10.0Basic API Conc

Flink__Flink1.10.0Basic API Conc

作者: guyuetftb | 来源:发表于2020-11-05 20:13 被阅读0次

    Flink 程序是可以在分布式数据集上实现类似常规数据加工操作的框架 (例如:filtering, mapping, updating state, joining, grouping, defining windows, aggregating)。数据集合从 Source 创建(例如: 从文件中读取,从kafka topics中消费, 从本地内存中读取)。数据处理结果通过Sink,写出到分布式文件系统标准输出等。Flink程序可以在多种上下文环境中运行,如standalone,或 嵌入其他程序中。 Flink程序可以在本地 JVM 中运行,也可以在包含多台机器的集群中运行。

    通过不同类型的数据源,即:有限数据源,或无限数据源,你可以编写批处理程序或流处理程序,其中 DataSet API 用于批处理,DataStream API 用于流处理。本文将介绍两个 API 的共同的概念,但是有关使用每个 API 编写具体程序的信息,请参阅流处理指南批处理指南

    注意: 当我们展示如何使用APIs的实际例子时,流处理中使用StreamingExecutionEnvironmentDataStream API,批处理中使用ExecutionEnvironmentDataSet API。

    DataSet and DataStream

    Flink 程序通过特定的 DataSet 和 DataStream 类型表示数据。你可以认为他们是不可改变的、包含重复数据的集合。在DataSet中数据是有限的。在DataStream中数据是无限的。
    这些集合在某些关键点上不同于普通Java集合。首先,这些集合是不可变的,也就是说一旦他们被创建,你不能添加、删除里面的元素。你也不能简单的检查窥视其中的元素。
    Flink 程序中的集合最初是通过在Flink程序中添加的 Source 来创建的,然后通过使用例如 map,filter 等的API 算子 对其进行转换来从中获得新的集合。

    Anatomy of a Flink Program

    Flink 程序看起来很像普通的数据转换程序。每个 Flink 程序包含以下几个基本部分:

    1. 获取一个 execution environment对象。
    2. 加载/创建 初始化数据。
    3. 对这个数据进行transformation 加工操作。
    4. 指定计算结果要输出的地方。
    5. 触发 Flink 程序执行。

    现在我们给出每个步骤的概要描述,有关更多详细信息请参阅相应的部分。注意 与 DataSet API相关的所有核心类都位于org.apache.flink.api.java
    中,与DataStream API相关的所有核心类都位于org.apache.flink.streaming.api
    包中。
    StreamExecutionEnvironment是所有 Flink 应用程序的基础。你可以通过调用StreamExecutionEnvironment类的其中一个静态方法来获取该对象

    getExecutionEnvironment()
    createLocalEnvironment()
    createRemoteEnvironment(String host, int port, String... jarFiles)
    

    通常,你仅仅需要调用getExecutionEnvironment()方法来获取 StreamExecutionEnvironment,因为 Flink框架会根据上下文环境来获取正确的 StreamExecutionEnvironment。如果你在本地环境的IDE中执行你的应用或当作普通Java程序执行应用,Flink 框架会创建一个local environment本地的执行环境。如果把你的程序打成 Jar 包,通过命令行执行程序,Flink Cluster Manager将会执行你的main函数,getExecutionEnvironment() 方法将创建一个在集群环境中运行所需要的execution environment

    通过特定的Source,Flink执行环境execution environment实现了多种数据读取方式: 你可以从 CSV文件中行一行一行读取数据,或者从定义的 InputFormat 中读取。要将文本文件以行的形式读入,可以使用以下函数:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    DataStream<String> text = env.readTextFile("file:///path/to/file");
    

    上面的程序会返回一个 DataStream,之后你可以在这个 DataStream 上进行各种转转换transformation 生成新的 DataStream。例如:

    DataStream<String> input = ...;
    
    DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String value) {
            return Integer.parseInt(value);
        }
    });
    

    这将原始的 DataStream 的每个 String 转换为一个新的包含 Integer 数据DataStream
    一旦你创建的 DataStream中包含了需要的最终结果,你就可以通过创建一个 Sink 将结果输出到外部系统。如以下例子:

    // 输出到文件
    writeAsText(String path)
    
    // 输出到标准输出
    print()
    

    一旦你设定(开发)完Flink程序,就需要调用StreamExecutionEnvironment对象的execute()函数来触发程序执行。程序具体是在你本地执行,还是会提交到集群中依赖于程序中创建的ExecutionEnvironment的类型。
    execute() 方法会等待Job执行完才返回JobExecutionResult 对象,其中包含程序执行耗费的时间和累加器的结果。
    如果你不想等待Job执行完,你可以调用StreamExecutionEnvironment的异步执行方法executeAysncexecuteAysnc会返回一个 JobClient,你可以通过 JobClient 与你刚提交的 Job 通信交互。下面的例子就通过调用executeAsync()方法实现execute()一样的功能

    final JobClient jobClient = env.executeAsync();
    
    final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult(userClassloader).get();
    

    有关DataStream Source和Sink 的更多,更深入的信息,以及DataStream上如何进行更多的transformation操作,请参考 Streaming Guide

    Lazy Evaluation

    Flink 所有的程序都是延迟执行的。无论是在 本地环境 运行 Flink 程序,还是在 集群环境 运行 Flink 程序。当 Flink 程序的 main 方法被调用后,数据加载(Load)和转换(Transformation)操作不会立即执行。而是先创建具体操作(operation),并将操作(operation)添加到程序执行计划中。只有在 execution environment对象上明确调用execute()函数时,Flink 程序具体的操作 operation才会执行。
    lazy evaluation(延迟执行)使你可以构建复杂的Flink程序,Flink将其作为一个整体计划的单元执行。

    Specifying Keys

    一些转换操作transformations (join, coGroup, keyBy, groupBy)要求集合元素包含一个 Key 值。其他转换操作(Reduce, GroupReduce, Aggregate, Windows)允许在使用数据之前对数据分组。

    DataSet按key分组,然后进行窗口操作:

    DataSet<...> input = // [...]
    DataSet<...> reduced = input
      .groupBy(/*define key here*/)
      .reduceGroup(/*do something*/);
    

    同样的,也可以对DataStream按key 分组,然后进行窗口操作:

    DataStream<...> input = // [...]
    DataStream<...> windowed = input
      .keyBy(/*define key here*/)
      .window(/*window specification*/);
    

    Flink 的数据模型不是基于 Key/Value 对的。因此,你不需要将数据集类型转换成Key/Value 的形式。Key 是虚拟的,具体哪个值为Key,是通过在实际的数据集上指定相应的函数实现的,通过这种方式来指导分组运行符(grouping operator)

    注意: 在下面的讨论中,我们将使用 DataStream API 和 keyBy。对于 DataSet API 相应替换成 DataSet 和 groupBy 即可。

    Define keys for Tuples

    最简单的例子是根据Tuple 的一个或多个fields,对Tuple(元组)进行聚合操作。下面的例子是按 Tuple 的 第一 field 分组。

    DataStream<Tuple3<Integer,String,Long>> input = // [...]
    KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
    

    下面我们根据元素的第1个,第2个 fields对元素分组。

    DataStream<Tuple3<Integer,String,Long>> input = // [...]
    KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
    

    关于内嵌Tuple 有一个需要注意的地方,指定keyBy(0)系统会使用整个 Tuple2<Integer, Float> 作为分组的 Key(包含一个 Integer 和 Float 值)。如果要“深入”到嵌套的Tuple2中,则必须使用接下来介绍的字段表达式(field expression)指定键值。

    DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
    

    Define keys using Field Expressions

    可以通过基于字符串(string-based)的字段表达式(field expression)来引用内嵌的字段,通过这种方式为grouping, sorting, joing, or coGrouping等运算符指定 key 值。
    字段表达式(Field expressions)使得从Tuple 和 POJO 等复杂数据类型中选择 key 变得非常容易。
    在下面的例子中,我们定义了一个包含2个字段的 POJO: WC(word,count)。使用 word字段分组,仅需要把word字段的名子传入keyBy()函数即可。

    // some ordinary POJO (Plain old Java Object)
    public class WC {
      public String word;
      public int count;
    }
    DataStream<WC> words = // [...]
    DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
    
    Field Expression Syntax:
    • 通过 POJO 的字段名指定字段。例如: user指的是一个POJO的user字段。
    • 通过 Tuple 从0开始的索引,或Tuple 的字段名指定字段。例如:f05指的是Tuple 的第1个和第6个字段。
    • 你可以选择 POJO 或 Tuple 中内嵌的字段。例如: user.zip指的是一个 POJO 中 user字段的zip字段。Flink 支持POJO 和 Tuple 类型的任意混合,比如:f1.user.zip或者user.f3.1.zip
    • 你也可以通过*(星号)来模糊匹配所有的类型。*(星号)形式的模糊匹配也适用不是 POJO 和 Tuple 类型的。
    public static class WC {
      public ComplexNestedClass complex; //nested POJO
      private int count;
      // getter / setter for private field (count)
      public int getCount() {
        return count;
      }
      public void setCount(int c) {
        this.count = c;
      }
    }
    public static class ComplexNestedClass {
      public Integer someNumber;
      public float someFloat;
      public Tuple3<Long, Long, String> word;
      public IntWritable hadoopCitizen;
    }
    
    • 'count':是 WC 类的 count 字段。
    • 'complex': 递归选择复合数据类型ComplexNestedClass POJO 的所有字段。
    • 'complex.word.f2': 选择内嵌 Tuple3的最后一个字段。
    • 'complex.hadoopCitizen': 选择 Hadoop IntWritable 类型。

    Define keys using Key Selector Functions

    另一种指定 key 的方式是通过key selector(key选择器)。key 选择器输入一个元素,输出这个元素的 key。这个 key 可以是任意类型,也可以从计算中获得。

    下面的例子简单展示了如何通过一个KeySelector 函数返回一个 对象的字段作为键的例子。

    // some ordinary POJO
    public class WC {public String word; public int count;}
    DataStream<WC> words = // [...]
    KeyedStream<WC> keyed = words
      .keyBy(new KeySelector<WC, String>() {
         public String getKey(WC wc) { return wc.word; }
       });
    

    Specifying Transformation Functions

    大多数的转换操作(transformations)需要用户自定义函数。下面介绍几种不同的指定自定义函数的方式。

    Implementing an interface 实现一个接口
    class MyMapFunction implements MapFunction<String, Integer> {
      public Integer map(String value) { return Integer.parseInt(value); }
    };
    data.map(new MyMapFunction());
    
    Anonymous classes匿名类
    data.map(new MapFunction<String, Integer> () {
      public Integer map(String value) { return Integer.parseInt(value); }
    });
    
    Java 8 Lambdas
    data.filter(s -> s.startsWith("http://"));
    
    data.reduce((i1,i2) -> i1 + i2);
    
    Rich functions

    所有接收用户自定义函数的转换操作(transformations)都可以用一个rich function来代替:

    class MyMapFunction implements MapFunction<String, Integer> {
      public Integer map(String value) { return Integer.parseInt(value); }
    };
    
    // 也可以这样写
    class MyMapFunction extends RichMapFunction<String, Integer> {
      public Integer map(String value) { return Integer.parseInt(value); }
    };
    
    data.map(new MyMapFunction());
    
    // 同样的rich function也可以用匿名函数来实现
    data.map (new RichMapFunction<String, Integer>() {
      public Integer map(String value) { return Integer.parseInt(value); }
    });
    

    除了提供用户自定义的功能(map, reduce)之外,RichFunction 还提供了4个函数: open, close, getRuntimeContext, setRuntimeContext
    这对于参数化函数(passing parameters to function),创建和最终确定本地状态,访问广播变量(broadCast variables)以及访问Flink运行时的相关信息(Accumulators and Counters),迭代信息(iterations)都是很有用的。

    Supported Data Types

    Flink 对可以在DataSetDataStream中使用的元素类型设置了做了一些限制。原因是有助于Flink分析元素类型,从而高效的执行策略。
    Flink 支持以下7种不同的数据类型:

    1. Java Tuple 和 Scala Case classes.
    2. Java POJOs
    3. Primitive Type (原始数据类型)
    4. Regular Classes (普通类)
    5. Values
    6. Hadoop Writables
    7. Special Types.
    Tuples and Case Classes

    Tuple是一个包含固定字段、不同数据类型的复杂数据类型。Java API 提供了从 Tupl1到 Tupl25的实现。Tuple 的每个字段可以是任意的 Flink 类型,甚至是Tuple 类型,这样就形成了Tuple 嵌套的形式。 Tuple 中的字段可以直接使用字段名直接访问,如tuple.f4,或使用getter 方法访问,如tuple.getField(int position)。字段的索引值从0开始。注意: Java 中的 Tuple 的索引与 Scala 中的 Tuple 的索引不同,但是与 Java 中其他普通索引相一致。

    DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
        new Tuple2<String, Integer>("hello", 1),
        new Tuple2<String, Integer>("world", 2));
    
    wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
        @Override
        public Integer map(Tuple2<String, Integer> value) throws Exception {
            return value.f1;
        }
    });
    
    wordCounts.keyBy(0); // also valid .keyBy("f0")
    
    POJOs

    如果 Java 类或 Scala 类满足以下要求,Flink 会将他们视为特殊的 POJO数据类型:

    • 类必须声名为 public
    • 必须有一个无参的构造函数。
    • 所有字段要么是 public 的,要么提供了可供外界访问的 gettersetter方法。
    • Flink 已经注册的序列化程序必须支持字段的类型。

    POJOs 通常由一个PojoTypeInfo来表示,用PojoSerializer实现序列化。例外情况是当POJO是Avro类型(Avro特定记录)或通过“Avro反射类型”生成。这时用AvroTypeInfo来表示,用AvroSerializer来实现序例化。如果你有需要,也可以注册你自己实现的 serializer,请参考 Serialization 来获取更多的信息。
    Flink 分析 POJO 的类型结构,即了解 POJO 的字段。使用 POJO 类型作为结果比普通类型更容易使用。此外,Flink 处理 POJO 类型的效率也高于处于普通类型。

    public class WordWithCount {
    
        public String word;
        public int count;
    
        public WordWithCount() {}
    
        public WordWithCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }
    
    DataStream<WordWithCount> wordCounts = env.fromElements(
        new WordWithCount("hello", 1),
        new WordWithCount("world", 2));
    
    wordCounts.keyBy("word"); // key by field expression "word"
    
    Primitive Types

    Flink 支持所有 Java 和 Scala 的原始数据类型: Integer, String, 和 Double 等。

    General Class Types

    Flink支持大多数 Java 和 Scala类(API 和自定义)。但 Flink限制使用包含无法序列化的字段的类,例如文件指针,I/O 流,或者其他 native resources。遵循 Java Bean 规范的类都可以很好的工作。
    所有不符合 POJO 类型的类都被 Flink 当做general class(能用类)来处理。Flink 以黑盒的方式来处理这些数据类型,并且不能访问他们的内容 (例如: 为了高效排序)。General types (通用类型) 类型需要使用序列化框架 Kryo来实现对象的序列化器/反序例化。

    Values

    值类型手动描述其序例化和反序例化。它们不使用通用的序例化框架,而是实现org.apache.flinktypes.Value接口,提供read 和 write方法实现序列化。当通用的序例化框架效率很低时,使用Value类型是非常有用的。例如,使用数组存储一个稀疏向量,我们知道数组的大部分元素值都是0,通过使用特定的编码只保存非0元素,而通用序列化框架只会简单把数组所有元素都序列化。

    org.apache.flinktypes.CopyableValue 接口以同样的方式支持手动内部克隆逻辑。

    Flink内部预定义了与基本数据类型相对应的 Values 类型 (Flink comes with pre-defined Value types that correspond to basic data types.)。这些值类型代表的是基本数据类型的可变变体: 这些 Values 类型是可以被修改的,允许用户程序重复使用,减少垃圾回收的压力。

    Hadoop Writables

    用户也可以使用实现了Hadoop org.apache.hadoop.Writable 接口的类型。write()方法定义序列化逻辑,readFields()方法定义反序列化逻辑。

    Type Erasure(类型擦除) & Type Inference

    注意: 本段只与Java有关
    Java 编译器在编译后会丢弃Java泛型信息。这就是著名的Java类型擦除。也就是说在运行时,对象实例已经不知道他的类型信息了。类如: DataStream<String> 的 实例 和 DataStream<Long> 的 实例,对JVM 来说是一样的。

    当Flink准备执行执行FlinkJob时,Flink需要知道这些对象的具体类型。Flink会尝试重建以各种形式丢弃的类型信息,并把这些类型显示的存储在数据集和算子Operator中。你可以通过DataStream.getType()接口获取类型。该方法返回一个TypeInformation类型的实例,这是Flink内部表述类型的方式。

    类型接口也有其局限性,在某些情况下需要程序员的"配合"。例如,通过"集合"创建数据集的方法 ExecutionEnvironment.fromCollection(),你可以传递一个类型描述参数。但是像MapFunction<I, O>能用函数就需要额外的类型信息。

    输入数据实现ResultTypeQueryable接口,通过函数的形式明确告诉API返回的具体类型。调用函数的输入类型通常能通过前一个函数的输出结果推测出来。

    Accumulators & Counters

    累加器结构比较简单,由累加操作和最终结果构成,当Flink-Job执行完成可获得累加器的执行结果。

    最简单的累加器是计数器: 程序员可以通过Accumulator.add(V value)方法对累加器累加。在Flink Job即将执行完时,Flink会合并所有分片的累加结果,把最终的累加结果发送给Client。计算器在debugging 或者 你想快速了解某些数据信息的时候非常有用。
    目前Flink内置了一些计数器。每个计算器都实现了 Accumulator 接口。

    • IntCounter, LongCounter and DoubleCounter: 参看下面的示例:
    • A histogram implementation for a discrete number of bins. 它内部是一个'整型'到'整型'的映射。你可以用它来统计数值的分布情况,例如: 在WordCount程序中统计每行包含的单词数。
    How to use accumulators:

    第1步: 你必须在想使用 累加器的函数中定义一个累加器。

    private IntCounter numLines = new IntCounter();
    

    第2步: 你必须注册创建的累加器,通常会在RichFunction的open()函数中注册创建的累加器,也可以为累加器增加一个名称。

    getRuntimeContext().addAccumulator("num-lines", this.numLines);
    

    这时,你可以在算子函数中使用注册的累加器,包括 open()close()函数。

    this.numLines.add(1);
    

    累加的最终结果会保存在JobExecutionResult对象中,JobExecutionResult对象是从execution environmentexecute()方法中返回的。
    每个作业的所有累加器共享一个命名空间。这样就可以在你的作业的不同算子中使用相同的累加器。

    关于累加器迭代器的注意事项: 目前累加器的结果只有在整个作业完成后才能获得。我们计划实现下次迭代中使用之前的迭代结果。你可以使用Aggregators计算每次迭代统计,并基于统计信息决定什么时候终止迭代。

    Custom accumulators:

    要实现自己的累加器,只需要简单的实现 Accumulator 接口。可以选择实现Accumulator or SimpleAccumulator

    相关文章

      网友评论

          本文标题:Flink__Flink1.10.0Basic API Conc

          本文链接:https://www.haomeiwen.com/subject/hsjgvktx.html