美文网首页
Flink Basic API Concepts

Flink Basic API Concepts

作者: MiyoungCheng | 来源:发表于2017-07-10 10:22 被阅读0次

    Basic API Concepts

    Flink程序是实现基于分布式采集的转换程序(如:过滤器,映射,更新状态,连接,分组,定义窗口,聚合计算),

    集合最初从源头创建(读取文件,kafka, 本地,内存),最终的计算结果通过sinks返回,例如:把数据写入分布式文件,标准的输出(如:终端的命令行输出),flink运行在各种各样的环境中,独立运行,或嵌入到其他程序中。可以运行在本地的JVM中,或者包含有许多机器的集群中。

    根据数据源的类型(有边界数据源与无边界数据源),你可以写批处理(batch)程序或者数据流程序 ,写批处理程序用DataAPI,写数据流程序用DataStream程序,这些指南将介绍两种API常见的基本概念,但请参阅我们的数据流指南和批处理指南,以了解关于每个API编写程序的具体信息。

    注意:当在实际情况中如何使用API的例子中,我们将使用流数据执行环境(StreamingExecutionEnvironment)和流数据API。在使用DataSetAPI中,概念是相同的,只是需要执行环境(ExecutionEnvironment)和接口(DataSet)变化下。

    DataSet and DataStream

    Flink在处理数据是由有两个特殊的类DataSet、DataStream,你可以把他们看作不能改变并且包含重复的数据,在DataSet的情况下数据是有边界的,而对于DataStream来说数据的数量是无边界的。

    这些集合(collections)不同于传统的JAVA的集合(collections)在某些关键方面。首先他们是不可变的,一旦创建就不能添加和移除,你也不能简单的检查里面的元素。

    集合最初是通过在Flink的程序中添加一个源来创建的,通过使用如map、filter之类的API方法对源集合进行转换,从而派生出新的集合。

    Anatomy of a Flink Program

    Flink的程序程序看起来像普通的程序,可以改变数据的收集。每个程序有相同的基础部分组成:

    1.创建一个执行环境。

    2.加载(load)/创建初始数据

    3.指定对数据的转换方法。

    4.指定在什么地方放置计算的结果

    5.触发程序执行

    现在我们将对每一个步骤做一个概述,请参考相应的部分以获得更多的详细信息。注意所有的Java DataSet API的核心类能够被找到在org.apache.flink.api.java,同时Java DataStream API 的核心类在能被找到在org.apache.flink.streaming.api

    流的执行环境(StreamExecutionEnvironment)是所有Flink程序的基础。你可以获得一个使用这些StreamExecutionEnvironment静态方法:

    getExecutionEnvironment()

    createLocalEnvironment()

    createRemoteEnvironment(String host,intport,String...jarFiles)

    通常情况下,你仅会用到getExecutionEnvironment(),因为该方法会根据环境做一些正确的事情:如果您在IDE中或常规Java程序中执行你编写的程序,该将创建一个本地环境,该环境将在本地机器上执行你所编写的程序。如果你创建的是JAR程序,并且调用jar程序在命令行,Flink集群将执行你的main方法并且getExecutionEnvironment()将返回一个执行环境来执行你的程序在集群上。

    对于指定数据源,执行环境有几个方法可以使用各种各样的方式读取文件:可以逐行读取 CVS文件,使用完全自定义的数据输入格式。只需将文本文件作为序列读取,你可以使用:

    finalStreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();DataStreamtext=env.readTextFile("file:///path/to/file");

    这将为您提供一个数据流,然后您可以应用转换来创建新的派生数据流。

    您可以使用转换函数调用数据流的方法来实现转换。例如:下面这样Map 转换:

    DataStreaminput=...;

    DataStreamparsed=input.map(newMapFunction(){

    @OverridepublicIntegermap(String value){returnInteger.parseInt(value);}});

    这将创建新的数据流,把原始数据中每个String类型转化为Integer。

    一旦你获得一个最终结构的数据流,你能够写出结果到外部通过创建一个sink,下面是一个创建Sink的例子:

    writeAsText(String path)print()

    一旦您指定了完整的程序,您就需要通过调用StreamExecutionEnvironment方法的上的execute()触发程序执行,基于执行环境的类型,判断是在本地执行,还是提交到集群中执行。

    execute()方法返回一个JobExecutionResult结果,它包括一个执行时间和累加结果.

    请看数据流指南,了解关于数据流和Sink信息,以及关于数据流支持转换的更多深度信息。

    请参阅批处理指南,了解关于批处理数据来源和Sink信息,并了解关于数据集的支持转换的更多深度信息。

    Lazy Evaluation

    所有的Flink程序都是被惰性地执行的,当程序的main方法被执行时,数据加载和转换不会立即发生。而是每个操作被创建和添加到程序计划中,只有在执行环境中的execute()被显式调用时,操作才会实际上是执行。然而程序是在本地执行还是在集群上执行是基于执行环境的类型。

    lazy evaluation能够让你创建基于Flink作为整体的执行单元复杂的程序。

    Specifying Keys

    ,一些转换(join,coGroup,keyBy,groupBy)需要一个Key被定义基于数据集的项。其他的转换(Reduce,GroupReduce,Aggregate,Windows)允许在基于Key进行分组在被应用前

    一个DataSet被分组的代码:

    DataSet<...>input=// [...]DataSet<...>reduced=input.groupBy(/*define key here*/).reduceGroup(/*do something*/);

    在使用数据流时可以指定一个键

    DataStream<...>input=// [...]DataStream<...>windowed=input.keyBy(/*define key here*/).window(/*window specification*/);

    Flink的数据模型不是基于键值对的。因此你不需要物理的把数据弄成键值对,Keys是抽象的,它们被定义为用于指导分组操作符的实际数据的函数。

    在接下来的讨论中,我们将用到DataStreamAPI和keyBy,对于DataSet API你仅需要替换DataSet和groupBy

    Define keys for Tuples

    最简单的实例进行分组Tuples基于一个或多个字段上分组Tuple:

    KeyedStream,Tuple>keyed=input.keyBy(0)

    tuples被分组基于第一个字段

    DataStream>input=// [...]KeyedStream,Tuple>keyed=input.keyBy(0,1)

    我们分组Tuples基于组合的Key,第一个字段和第二个字段。

    嵌套Tuples:如果你有一个数据流的嵌套tuple像下面一样

    DataStream,String,Long>>ds;

    指定keyBy(0)将导致系统使用整体Tuple2作为key(使用Integer和Float作为Key),如果你想““导航””到内嵌的Tuple2您必须使用下面的字段表达式进行解释

    Define keys using Field Expressions

    可以使用String-based字段表达式来引用嵌套的字段并且定义Key用来grouping,sorting,joining,coGrouping。字段表达式可以很容易地选择(嵌套)复合类型的字段,比如Tuple和POJO类型。

    在下面的示例中,我们有一个WC POJO,它有两个字段“word”和“count”。按照字段词来分组,我们只是将它的名称传递给keyBy()函数。

    // some ordinary POJO (Plain old Java Object)publicclassWC{publicString word;publicintcount;}DataStreamwords=// [...]DataStreamwordCounts=words.keyBy("word").window(/*window specification*/);

    字段表达式语法:

    1.选择POJO字段根据字段的名字。例如,“user”指的是POJO类型的“user”字段。

    2.通过字段名或0-偏移字段索引来选择Tuple字段,例如,“f0”和“5”分别指的是Java Tuple类型的第一个和第六个字段。

    3.您可以在pojo和Tuples中选择嵌套的字段。例如:"user.zip"指的是zip被存在一个POJO类型为"user"字段中,任何内嵌和混合的POJO和Tuples 支持 如:"f1.user.zip"和"user.f3.1.zip".这种形式。

    4.您可以使用“*”选择完整的类型,这也适用于不是Tuples或POJO类型的类型。

    字段表达式例子:

    publicstaticclassWC{publicComplexNestedClass complex;//nested POJOprivateintcount;// getter / setter for private field (count)publicintgetCount(){returncount;}publicvoidsetCount(intc){this.count=c;}}publicstaticclassComplexNestedClass{publicInteger someNumber;publicfloatsomeFloat;publicTuple3word;publicIntWritable hadoopCitizen;}

    这些是上面示例代码的有效字段表达式:

    ."count": 在WC类中的count字段。

    "complex":递归地选择POJO类型复杂的字段的所有字段。

    "complex.word.f2":选择嵌套的Tuple3的最后一个字段。

    "complex.hadoopCitize":选择Hadoop IntWritable类型。

    Define keys using Key Selector Functions

    定义键的另一种方法是“key selector”函数。一个key选择器函数将一个元素作为输入,并返回元素的key。key可以是任意类型的,可以从任意计算中派生出来。

    下面的示例显示了一个键选择函数,它简单地返回一个对象的字段:

    // some ordinary POJOpublicclassWC{publicString word;publicintcount;}DataStreamwords=// [...]KeyedStreamkyed=words.keyBy(newKeySelector(){publicStringgetKey(WC wc){returnwc.word;}});

    Specifying Transformation Functions

    大多数转换需要用户自定义方法。本节列出了如何指定它们的不同方式

    Implementing an interface

    最基本的方法是实现一个提供的接口。

    classMyMapFunctionimplementsMapFunction{publicIntegermap(String value){returnInteger.parseInt(value);}});data.map(newMyMapFunction());

    Anonymous classes

    您可以作为匿名类传递一个函数:

    data.map(newMapFunction(){publicIntegermap(String value){returnInteger.parseInt(value);}});

    Java 8 Lambdas

    Flink还在Java API中支持Java 8。请参阅完整的Java 8指南。

    data.filter(s->s.startsWith("http://"));

    data.reduce((i1,i2)->i1+i2);

    Rich functions

    所有需要用户定义函数的转换都可以将其作为一个富函数。,例如: 替换

    classMyMapFunctionimplementsMapFunction{publicIntegermap(String value){returnInteger.parseInt(value);}});

    你写的

    classMyMapFunctionextendsRichMapFunction{publicIntegermap(String value){returnInteger.parseInt(value);}});

    然后像往常一样将方法传递给映射转换:

    data.map(newMyMapFunction());

    富函数也可以定义为一个匿名类:

    data.map(newRichMapFunction(){publicIntegermap(String value){returnInteger.parseInt(value);}});

    除了用户自定义的方法(map, reduce等)富函数还提供了四个方法:open,close,getRuntimeContext,setRuntimeContext

    这些对于参数化函数(seePassing Parameters to Functions)、创建和最终的局部状态,访问broadcast变量,存储如累加器和计数器之类的运行时信息(seeAccumulators and Counters)),以及迭代的信息(seeIterations),都是很有用的。

    Supported Data Types

    Flink对可能在DataSet或DataStream中的元素类型进行了一些限制。这样做的原因是系统分析这些类型来决定有效的执行策略

    有六种不同类型的数据类型:

    1.Java Tuples和Scala Case Classes

    2.Java POJOs

    3.Primitive Types

    4.Regular Classes

    5.Values

    6.Hadoop Writables

    7.Special Types

    Tuples and Case Classes

    Tuples包含有不同类型的固定数量的字段的复合类型。Java API提供了从Tuple1到Tuple25的类。Tuples的每个字段都可以是任意的Flink类型,包括进一步的Tuple,结果是嵌套Tuple。可以使用字段的名称直接访问像tuple.f4,或者使用通用的getter方法tuple.getField(int position),字段开始于0,这与Scala的Tuples形成了对比,但是它更符合Java的一般索引。

    DataStream>wordCounts=env.fromElements(newTuple2("hello",1),newTuple2("world",2));wordCounts.map(newMapFunction,Integer>(){@OverridepublicIntegermap(Tuple2value)throwsException{returnvalue.f1;}});wordCounts.keyBy(0);

    POJOs

    Java和Scala类被Flink当作一种特殊的POJO数据类型,如果它们满足以下要求:

    1.类必须是public

    2.它必须有一个没有参数的公共构造函数(默认构造函数)

    3.所有字段要么是public,要么必须通过getter和setter函数访问。对于一个名为foo的字段,getter和setter方法必须命名为getFoo()和setFoo()。

    4.一个Field类型必须由Flink支持。目前,Flink使用Avro来序列化任意对象(比如日期)。

    Flink分析了POJO类型的结构。它学习了一个POJO的字段。因此,POJO类型比一般类型更容易使用。此外,Flink可以比一般类型更有效地处理POJO。

    publicclassWordWithCount{publicString word;publicintcount;publicWordWithCount(){}publicWordWithCount(String word,intcount){this.word=word;this.count=count;}}

    DataStreamwordCounts=env.fromElements(newWordWithCount("hello",1),newWordWithCount("world",2));wordCounts.keyBy("word");// key by field expression "word"

    Primitive Types

    Flink支持所有的Java和Scala基本类型,比如Integer,String, andDouble.

    General Class Types

    Flink支持大多数Java和Scala类(API和自定义)。限制使用包含不能序列化字段的类,比如文件指针、输入/输出流或其他本地资源,

    遵循Java bean约定的类通常工作得很好。

    所有没有被确定为POJO类型的类(参见上面的POJO需求)都是由Flink作为一般类类型处理的。Flink将这些数据类型视为黑盒子,无法访问他们的内容(即:有效的排序)。一般类型使用Kryo的序列化框架来进行/序列化。

    Values

    值类型可以手动描述它们的序列化和反序列化。他们没有使用通用的序列化框架,而是通过实现org.apache.flinktype来为这些操作提供定制代码。使用读取和写入的方法的值接口,使用值类型是合理的,因为一般的序列化是非常低效的。例如,一个数据类型实现了作为数组的元素的稀疏向量。由于知道数组大部分为零,所以可以为非零元素使用特殊的编码,而一般的序列化则只需要编写所有的数组元素。

    org.apache.flinktypes.CopyableValue接口以类似的方式支持手动的内部克隆逻辑。

    Flink带着与基本数据类型对应的预定义值类型。(ByteValue,ShortValue,IntValue,LongValue,FloatValue,DoubleValue,StringValue,CharValue,BooleanValue)。这些值类型充当基本数据类型的可变变体:它们的值可以被修改,允许程序员重用对象并从垃圾收集器中释放压力。

    Hadoop Writables

    您可以使用实现org.apache.hadoop的类型。可写接口。在write()和readFields()方法中定义的序列化逻辑将用于序列化。

    Special Types

    您可以使用特殊类型,包括Scala的选项、选项和尝试。Java API也有自己的自定义实现。与Scala类似,它代表了一种两种可能类型的值,左或右。对于需要输出两种不同类型记录的错误处理或操作符,这两种方法都很有用。

    Type Erasure & Type Inference

    注意:本节只与Java相关。

    在编译之后,他的Java编译器会抛出大量的泛型类型信息。这在Java中被称为类型擦除。这意味着在运行时,对象的实例不再知道它的泛型类型。例如,对于JVM来说,DataStream和DataStream的实例看起来是一样的,Flink在准备执行程序的时候需要类型信息(当程序的主要方法被调用时)。Flink Java API试图重构以各种方式抛出的类型信息,并将其显式地存储在数据集和操作符中。您可以通过数据ream.gettype()检索类型。这个方法返回一个类型信息的实例,这是Flink的内部方式来表示类型。

    类型推断有其局限性,在某些情况下需要程序员的“合作”。示例集合创建数据集的方法,如ExecutionEnvironment.fromCollection(),在那里你可以传递一个参数描述类型。但是,像MapFunction小于I这样的泛型函数可能需要额外的类型信息。

    可以通过输入格式和函数来实现结果ttypequeryable接口,从而明确地告诉API它们的返回类型。函数调用的输入类型通常可以通过前一个操作的结果类型来推断。

    Accumulators & Counters

    累加器是一个简单的构造,有一个添加操作和一个最终积累的结果,在作业结束后可用。

    最简单的累加器是一个counter:你可以增加值通过Accumulator.add(V value)方法。在工作结束时,Flink将总结(合并)所有的部分结果,并将结果发送给客户。在调试过程中,积累器是有用的,或者如果您想要了解更多关于您的数据的信息。

    Flink目前有以下内置的累加器。每个都实现了累加器接口。

    IntCounter,LongCounterandDoubleCounter:下面是使用计数器的示例

    Histogram:一个用于离散数量的容器的直方图实现。在内部,它只是一个从整数到整数的映射。您可以使用它来计算值的分布,例如,一个单词计数程序的每一行字的分布。

    How to use accumulators:

    首先,您必须在用户定义的转换函数中创建一个累加器对象(这里是一个counter),您想要使用它。

    privateIntCounter numLines=newIntCounter();

    其次,必须注册累加器对象,通常在富函数的open()方法中,这里还定义了名称。

    getRuntimeContext().addAccumulator("num-lines",this.numLines);

    您现在可以在操作符函数的任何地方使用累加器,包括open()和close()方法。

    this.numLines.add(1);

    整个结果将存储在JobExecutionResult对象中,该对象从执行环境的execute()方法中返回(当前只有当执行等待作业完成时才会工作)。

    myJobExecutionResult.getAccumulatorResult("num-lines")

    所有的Accumulator都在每个作业中共享一个名称空间。因此,您可以在不同的操作符函数中使用相同的Accumulator。Flink将在内部将所有的Accumulator都合并在一起。

    关于accumulators和iterations的注意:目前,accumulators的结果只有在整个job结束后才可用。我们还计划在下一次迭代中使用前一个迭代的结果。您可以使用Aggregators来计算每次迭代的统计数据,并根据这些统计数据来终止迭代。

    Custom accumulators:

    要实现你自己的accumulators,你只需写下你的累加器接口的实现。如果你认为你的定制累加器应该和Flink一起运送,就可以创建一个拉请求。

    您可以选择实现累加器或simple累加器。

    Accumulator是最灵活的:它定义一个V的值来添加值,一个结果类型R用于最终的结果。对于直方图,V是一个数字,而R是一个直方图。SimpleAccumulator适用于两种类型相同的情况,例如用于计数器。

    相关文章

      网友评论

          本文标题:Flink Basic API Concepts

          本文链接:https://www.haomeiwen.com/subject/esdyhxtx.html