美文网首页
译:Flink---基本API概念

译:Flink---基本API概念

作者: 雪味伦调 | 来源:发表于2019-02-14 17:24 被阅读0次

    Google翻译

    Flink程序是实现分布式集合转换的常规程序(例如,过滤,映射,状态更新,联合,分组,窗口定义和聚合)。集合从数据源中初始化创建(例如通过读取文件,kafka主题或者从本地,内存集合中)。结果通过接收器返回,接收器可以将数据写入(分布式)文件或标准输出(例如,命令行终端)Flink程序在各种环境中运行,独立,或内嵌于其他程序中。它可以在本地JVM或多机器组成的集群中执行。

    取决于数据源的类型,即有界源和无界源,你可以使用DataSet API编写批处理程序,DataStream API编写流处理程序。本指南将介绍两种API共有的基本概念,但请参阅我们的流处理指南和批处理指南,了解有关使用每个API编写程序的具体信息。

    数据集和数据流


    Flink具有特殊类DataSet和DataStream来表示程序中的数据。您可以将它们视为可以包含重复项的不可变数据集合。在DataSet的情况下,数据是有限的,而对于DataStream,元素的数量可以是无界的。

    这些集合在某些关键方面与常规Java集合不同。首先,它们是不可变的,这意味着一旦创建它们就无法添加或删除元素。你也不能简单地检查里面的元素。

    最初通过在Flink程序中添加源来创建集合,并通过使用诸如map,filter等API方法对它们进行转换来从这些集合中派生新集合。

    Flink程序剖析


    Flink程序看起来像是转换数据集合的常规程序。每个程序包含相同的基本部分:

    1. 包含一个执行环境
    2. 加载/创建初始数据
    3. 指定数据的转换
    4. 指定放置计算结果的位置
    5. 触发程序执行

    我们现在将概述每个步骤,请参阅相应部分以获取更多详细信息。请注意,Java DataSet API的所有核心类都可以在org.apache.flink.api.java包中找到,而Java DataStream API的类可以在org.apache.flink.streaming.api中找到。

    StreamExecutionEnvironment是所有Flink程序的基础。您可以使用这些静态方法获取一个

    StreamExecutionEnvironment:
    
    getExecutionEnvironment()
    
    createLocalEnvironment()
    
    createRemoteEnvironment(String host, int port, String... jarFiles)
    

    通常,您只需要使用getExecutionEnvironment(),因为这将根据上下文做正确的事情: 如果您在IDE中执行程序或作为常规Java程序,它将创建一个本地环境,该环境将在本地计算机上执行您的程序。如果您从程序中创建了一个JAR文件,并通过命令行调用它,则Flink集群管理器将执行您的main方法,getExecutionEnvironment()将返回一个执行环境,用于在集群上执行您的程序。

    对于指定数据源,执行环境有几种方法可以使用各种方法从文件中读取:您可以逐行读取它们,CSV文件或使用完全自定义数据输入格式。要将文本文件作为一系列行读取,您可以使用:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    DataStream<String> text = env.readTextFile("file:///path/to/file");
    

    这将为您提供一个DataStream,然后您可以在其上应用转换来创建新的派生DataStream。
    您可以通过使用转换函数调用DataStream上的方法来应用转换。例如,地图转换如下所示:

    DataStream<String> input = ...;
    
    DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String value) {
            return Integer.parseInt(value);
        }
    });
    

    这将通过将原始集合中的每个String转换为Integer来创建新的DataStream。
    一旦有了包含最终结果的DataStream,就可以通过创建接收器将其写入外部系统。这些只是创建接收器的一些示例方法:

    writeAsText(String path)
    
    print()
    

    一旦指定了完整的程序,就需要通过调用execute()来触发程序在StreamExecutionEnvironment上执行。根据ExecutionEnvironment的类型,将在本地计算机上触发执行或提交程序以在群集上执行。
    execute()方法返回一个JobExecutionResult,它包含执行时间和累加器结果。
    有关流数据源和接收器的信息,请参阅流指南,以及有关DataStream上支持的转换的更深入信息。
    有关批处理数据源和接收器的信息,请查看批处理指南,以及有关DataSet支持的转换的更深入信息。

    懒评估


    所有Flink程序都是懒惰地执行的:当执行程序的main方法时,数据加载和转换不会直接发生。而是创建每个操作并将其添加到程序的计划中。当执行环境上的execute()调用显式触发执行时,实际执行操作。程序是在本地执行还是在集群上执行取决于执行环境的类型。
    懒惰的评估使您可以构建Flink作为一个整体计划单元执行的复杂程序。

    指定键


    某些转换(join,coGroup,keyBy,groupBy)要求在元素集合上定义键。其他转换(Reduce,GroupReduce,Aggregate,Windows)允许数据在应用之前在键上分组。
    数据集分组:

    DataSet<...> input = // [...]
    DataSet<...> reduced = input
      .groupBy(/*define key here*/)
      .reduceGroup(/*do something*/);
    

    数据流通过指定键通过:

    DataStream<...> input = // [...]
    DataStream<...> windowed = input
      .keyBy(/*define key here*/)
      .window(/*window specification*/);
    

    Flink的数据模型不基于键值对。因此,无需将数据集类型物理打包到键和值中。键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组操作符。
    在下面的讨论中,我们将使用DataStream API和keyBy。对于DataSet API,您只需要用DataSet和groupBy替换。

    元组定义键

    最简单的情况是在元组的一个或多个字段上对元组进行分组:

    DataStream<Tuple3<Integer,String,Long>> input = // [...]
    KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0)
    

    元组以第一个字段分组

    DataStream<Tuple3<Integer,String,Long>> input = // [...]
    KeyedStream<Tuple3<Integer,String,Long>,Tuple> keyed = input.keyBy(0,1)
    

    在这里,我们将元组分组在由第一个和第二个字段组成的复合键上。

    关于嵌套元组的注释:如果你有一个带有嵌套元组的DataStream,例如:

    DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
    

    指定keyBy(0)将使系统使用完整的Tuple2作为键(以Integer和Float为键)。如果要“导航”到嵌套的Tuple2中,则必须使用下面解释的字段表达式。

    使用字段表达式定义键

    您可以使用基于字符串的字段表达式来引用嵌套字段,并定义用于分组,排序,连接或coGrouping的键。
    字段表达式可以非常轻松地选择(嵌套)复合类型中的字段,例如Tuple和POJO类型。

    // some ordinary POJO (Plain old Java Object)
    public class WC {
      public String word;
      public int count;
    }
    DataStream<WC> words = // [...]
    DataStream<WC> wordCounts = words.keyBy("word").window(/*window specification*/);
    
    字段表达式语法:
    • 按字段名称选择POJO字段。例如,“user”指的是POJO类型的“user”字段。
    • 按字段名称或0偏移字段索引选择元组字段。例如,“f0”和“5”分别表示Java元组类型的第一和第六字段。
    • 您可以在POJO和Tuples中选择嵌套字段。例如,“user.zip”指的是POJO的“zip”字段,其存储在POJO类型的“user”字段中。持任意嵌套和混合POJO和元组,例如“f1.user.zip”或“user.f3.1.zip”。
    • 您可以使用“*”通配符表达式选择完整类型。这也适用于非Tuple或POJO类型的类型。

    字段表达式例子:

    public static class WC {
      public ComplexNestedClass complex; //nested POJO
      private int count;
      // getter / setter for private field (count)
      public int getCount() {
        return count;
      }
      public void setCount(int c) {
        this.count = c;
      }
    }
    public static class ComplexNestedClass {
      public Integer someNumber;
      public float someFloat;
      public Tuple3<Long, Long, String> word;
      public IntWritable hadoopCitizen;
    }
    

    这些是上面示例代码的有效字段表达式:

    • "count": WC类中的count字段。
    • "complex": 递归选择POJO类型ComplexNestedClass的字段复合体的所有字段。
    • "complex.word.f2": 选择嵌套Tuple3的最后一个字段。
    • "complex.hadoopCitizen": 选择Hadoop IntWritable类型

    使用Selector Function定义键

    定义键的另一种方法是“键选择器”功能。键选择器函数将单个元素作为输入并返回元素的键。键可以是任何类型,并且可以从确定性计算中导出。
    以下示例显示了一个键选择器函数,它只返回一个对象的字段:

    // some ordinary POJO
    public class WC {public String word; public int count;}
    DataStream<WC> words = // [...]
    KeyedStream<WC> keyed = words
      .keyBy(new KeySelector<WC, String>() {
         public String getKey(WC wc) { return wc.word; }
       });
    

    指定转换函数


    大多数转换都需要用户定义的函数。本节列出了如何指定它们的不同方法

    实现接口

    最基本的方法是实现一个提供的接口:

    class MyMapFunction implements MapFunction<String, Integer> {
      public Integer map(String value) { return Integer.parseInt(value); }
    };
    data.map(new MyMapFunction());
    
    匿名类

    您可以将函数作为匿名类传递:

    data.map(new MapFunction<String, Integer> () {
      public Integer map(String value) { return Integer.parseInt(value); }
    });
    
    Java 8 Lambdas

    Flink支持Lambdas

    data.filter(s -> s.startsWith("http://"));
    data.reduce((i1,i2) -> i1 + i2);
    
    Rich 函数

    需要用户定义函数的所有转换都可以将Rich函数作为参数。例如,不是这样:

    class MyMapFunction implements MapFunction<String, Integer> {
      public Integer map(String value) { return Integer.parseInt(value); }
    };
    

    你可以这样写:

    class MyMapFunction extends RichMapFunction<String, Integer> {
      public Integer map(String value) { return Integer.parseInt(value); }
    };
    

    并像往常一样将函数传递给map函数

    data.map(new MyMapFunction());
    

    Rich函数也可以定义为匿名类:

    data.map (new RichMapFunction<String, Integer>() {
      public Integer map(String value) { return Integer.parseInt(value); }
    });
    

    除了用户定义的函数(map,reduce等)之外,Rich函数还提供了四种方法:open,close,getRuntimeContext和setRuntimeContext。这些用于参数化函数(请参阅将参数传递给函数),创建和完成本地状态,访问广播变量(请参阅广播变量)以及访问运行时信息(如累加器和计数器)(请参阅累加器和计数器)以及有关信息的信息。迭代(参见迭代)。

    支持的数据类型


    Flink对DataSet或DataStream中可以包含的元素类型设置了一些限制。原因是系统分析类型以确定有效的执行策略。
    有六种不同类别的数据类型:

    1. Java Tuples 和 Scala Case 类
    2. Java POJOs
    3. 原始类型
    4. 常规类
    5. Hadoop Writables
    6. Special Types

    Tuples

    Tuples是包含固定数量的具有各种类型的字段的复合类型。 Java API提供从Tuple1到Tuple25的类。Tuples的每个字段都可以是包含更多Tuple的任意Flink类型,从而产生嵌套元组。可以使用字段名称tuple.f4直接访问Tuple的字段,或使用通用getter方法tuple.getField(int position).字段索引从0开始。请注意,这与Scala元组形成对比,但它与Java的一般索引更为一致。

    DataStream<Tuple2<String, Integer>> wordCounts = env.fromElements(
        new Tuple2<String, Integer>("hello", 1),
        new Tuple2<String, Integer>("world", 2));
    
    wordCounts.map(new MapFunction<Tuple2<String, Integer>, Integer>() {
        @Override
        public Integer map(Tuple2<String, Integer> value) throws Exception {
            return value.f1;
        }
    });
    wordCounts.keyBy(0); // also valid .keyBy("f0")
    

    POJOs

    如果满足以下要求,则Flink将Java和Scala类视为特殊的POJO数据类型:

    • 类为public
    • 无参的public 构造函数
    • 所有的字段必须有getter和setter方法
    • Flink必须支持字段的类型。目前,Flink使用Avro序列化任意对象(例如Date)。

    Flink分析POJO类型的结构,即它了解POJO的字段。因此,POJO类型比一般类型更容易使用。此外,Flink可以比一般类型更有效地处理POJO。

    public class WordWithCount {
    
        public String word;
        public int count;
    
        public WordWithCount() {}
    
        public WordWithCount(String word, int count) {
            this.word = word;
            this.count = count;
        }
    }
    
    DataStream<WordWithCount> wordCounts = env.fromElements(
        new WordWithCount("hello", 1),
        new WordWithCount("world", 2));
    
    wordCounts.keyBy("word"); // key by field expression "word"
    

    原始类型

    Flink支持所有Java和Scala原语类型,如Integer,String和Double。

    一般类型

    Flink支持大多数Java和Scala类(API和自定义)。限制适用于包含无法序列化的字段的类,如文件指针,I / O流或其他本机资源。遵循Java Beans约定的类通常可以很好地工作。
    所有未标识为POJO类型的类(请参阅上面的POJO要求)都由Flink作为常规类类型处理。Flink将这些数据类型视为黑盒子,并且无法访问其内容(即,用于有效排序)。使用序列化框架Kryo对常规类型进行反序列化。

    值类型手动描述其序列化和反序列化。它们不是通过通用序列化框架,而是通过使用读取和写入方法实现org.apache.flinktypes.Value接口来为这些操作提供自定义代码。当通用序列化效率非常低时,使用值类型是合理的。一个示例是将元素的稀疏向量实现为数组的数据类型。知道数组大部分为零,可以对非零元素使用特殊编码,而通用序列化只需编写所有数组元素。
    org.apache.flinktypes.CopyableValue接口以类似的方式支持手动内部克隆逻辑。
    Flink带有与基本数据类型对应的预定义值类型。(ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue)这些Value类型充当基本数据类型的可变变体:它们的值可以更改,允许程序员重用对象并从垃圾收集器中减轻压力。

    Hadoop Writables

    您可以使用实现org.apache.hadoop.Writable接口的类型。 write()和readFields()方法中定义的序列化逻辑将用于序列化。

    特殊类型

    您可以使用特殊类型,包括Scala的Either,Option和Try。 Java API有自己的自定义Either实现。与Scala的Either类似,它代表两种可能类型的值,左或右。两者都可用于错误处理或需要输出两种不同类型记录的运算符。

    类型擦除和类型推断

    仅针对Java
    Java编译器在编译后抛弃了大部分泛型类型信息。这在Java中称为类型擦除。这意味着在运行时,对象的实例不再知道其泛型类型。例如,DataStream <String>和DataStream <Long>的实例与JVM看起来相同。
    Flink在准备执行程序时(当调用程序的主要方法时)需要类型信息。Flink Java API尝试重建以各种方式丢弃的类型信息,并将其显式存储在数据集和运算符中。您可以通过DataStream.getType()检索类型。该方法返回TypeInformation的一个实例,这是Flink表示类型的内部方式。
    类型推断有其局限性,在某些情况下需要编程人员的“合作”。这方面的示例是从集合创建数据集的方法,例如ExecutionEnvironment.fromCollection(),你可以在其中传递描述类型的参数。但是像MapFunction <I,O>这样的通用函数也可能需要额外的类型信息。
    ResultTypeQueryable接口可以通过输入格式和函数来实现,以明确告知API有关其返回类型的信息。调用函数的输入类型通常可以通过先前操作的结果类型来推断。

    累加器和计数器


    累加器是具有添加操作和最终累积结果的简单构造,可在作业结束后使用。
    最直接的累加器是一个计数器:您可以使用Accumulator.add(V值)方法递增它。在工作结束时,Flink将汇总(合并)所有部分结果并将结果发送给客户。在调试过程中,或者如果你想快速了解有关数据的更多信息,累加器非常有用。
    link目前有以下内置累加器。它们中的每一个都实现了Accumulator接口。

    • IntCounter, LongCounter and DoubleCounter: 有关使用计数器的示例,请参见下文。
    • Histogram: 离散数量的箱的直方图实现。在内部,它只是一个从Integer到Integer的映射。你可以使用它来计算值的分布,例如字数统计程序的每行字数分布。
    如何使用累加器

    首先,您必须在要使用它的用户定义转换函数中创建累加器对象(此处为计数器)。

    private IntCounter numLines = new IntCounter();
    

    其次,你必须注册累加器对象,通常在rich function的open()方法中。在这里你还可以定义名称。

    getRuntimeContext().addAccumulator("num-lines", this.numLines);
    

    你现在可以在运算符函数中的任何位置使用累加器,包括open()和close()方法。

    this.numLines.add(1);
    

    整个结果将存储在JobExecutionResult对象中,该对象是从执行环境的execute()方法返回的(当前这仅在执行等待作业完成时才有效)。

    myJobExecutionResult.getAccumulatorResult("num-lines")
    

    所有累加器每个作业共享一个命名空间。因此,您可以在作业的不同操作函数中使用相同的累加器。 Flink将在内部合并所有具有相同名称的累加器。

    关于累加器和迭代的注释:目前累加器的结果仅在整个作业结束后才可用。我们还计划在下一次迭代中使前一次迭代的结果可用。您可以使用聚合器来计算每次迭代统计信息,并根据此类统计信息确定迭代的终止。

    自定义累加器

    要实现自己的累加器,只需编写Accumulator接口的实现即可。如果您认为您的自定义累加器应与Flink一起提供,请随意创建拉取请求。
    你可以选择实现Accumulator 或者 SimpleAccumulator
    Accumulator<V,R> 最便捷:它为要添加的值定义类型V,为最终结果定义结果类型R.例如。对于直方图,V是数字,R是直方图。SimpleAccumulator 适用于两种类型相同的情况,比如计数器

    相关文章

      网友评论

          本文标题:译:Flink---基本API概念

          本文链接:https://www.haomeiwen.com/subject/izmeeqtx.html