美文网首页Flink专题
Flink DataStream API编程指南

Flink DataStream API编程指南

作者: 尼小摩 | 来源:发表于2019-01-31 16:22 被阅读116次

    Flink中的DataStream程序是实现数据流转换的常规程序(例如:filtering, updating state, defining windows, aggregating)。数据流最初是从各种来源创建的 (例如 message queues, socket streams, files)。 结果通过sink返回, 通过sink可以将数据写入文件或者是标准输出(例如:命令行终端), Flink程序可以运行在各种上下文环境中,standalone 或者嵌入到其他程序中。可以在本地的JVM中运行,也可以在集群中运行。

    有关Flink API的基本概念,请参阅 basic concepts

    为了创建你自己的Flink DataStream程序,建议从剖析Flink程序开始,逐步添加您自己的stream transformations。其余部分请参考其他操作和高级特性章节。

    实例程序(Example Program)

    下面是一个完整的流式窗口word count应用程序,每个5秒会统计来自web socket中的数据。

    package com.demon.app
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    
    /**
      * @Auther: fc.w
      * @Date: 2019/1/31
      */
    object ExampleProgramApp {
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val text = env.socketTextStream("localhost", 9999)
    
        val counts = text.flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
          .map((_, 1))
          .keyBy(0)
          .timeWindow(Time.seconds(5))
          .sum(1)
    
        counts.print()
    
        env.execute("Window Stream WordCount")
      }
    
    }
    
    

    要运行示例程序,首先需要从终端启动输入流:

    nc -lk 9999
    

    仅需要输入一些单词然后点击回车键,就会输出到应用程序中。如果想看大于1的单词统计,在5秒内反复输入相同的单词(如果你输入不了那么快,可以增加时间窗口的长度)

    数据源(Data Sources)

    Sources是程序读取数据的入口,可以通过StreamExecutionEnvironment.addSource(sourceFunction) 添加一个source源到程序中。Flink中包含了已实现的源函数,但也可以通过实现SourceFunction 接口自定义非并行源函数,或者通过实现ParallelSourceFunction接口或继承RichParallelSourceFunction来扩展自定义并行源函数。

    Flink中有几个预定义的流源,可以StreamExecutionEnvironment访问:

    基于文件:
    • readTextFile(path): 逐行读取文本文件,遵守TextInputFormat规范的文件,并将其作为字符串返回。
    • readFile(fileInputFormat, path): 按照指定文件的输入格式读取(一次)文件。
    • readFile(fileInputFormat, path, watchType, interval, pathFilter): 这个方法是前两个的内部调用,根据给定的fileInputFormat读取路径中的文件。根据所提供的watchType(每隔多少ms)定期监控新数据的路径(fileprocessingmode.process_continue), 或者一次性处理当前路径中的数据并退出(FileProcessingMode.PROCESS_ONCE),使用pathFilter,用户可以进一步排除正在处理的文件。

    实现:

    在底层,Flink将文件读取过程分为两个子任务,即目录监控和数据读取。这些子任务都是单独实现的。监控是由一个单一的实现(并行度= 1)而非并行任务,数据读取是由多个并行运行的任务执行的。后者的并行度等于作业的并行度。 单个监控任务的作用是根据watchType定期或仅扫描一次来监控目录,扫描要处理的文件,将他们分割,并将这些split后的数据分配给下游读取器。每个split仅有一个读取器读取,而 a reader可以逐个地阅读多个拆分。

    注意事项:

    1. 如果watchType设置为FileProcessingMode.PROCESS_CONTINUOUSLY,当文件被修改时,它的内容将被完全重新处理。这将破坏“exactly-once”语义,因为在文件末尾追加数据将导致对其所有内容进行重新处理。

    2. 如果watchType设置为FileProcessingMode.PROCESS_ONCE,Source 文件扫描路径一次并退出,无需等待读取器完成对文件内容的读取。当然读取器将继续读取,直到读取所有文件内容。关闭Source将导致此后不再有checkpoints。这可能会导致节点失败后恢复速度变慢,因为作业将从最后一个检查点恢复读取。

    基于scoket:

    • socketTextStream -从socket读取。元素可以用分隔符分隔。

    基于Collection:

    • fromCollection(Seq):从Java.util.Collection创建数据流。集合中所有元素必须是相同的类型。
    • fromCollection(Iterator): 从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
    • fromElements(elements: _*): 从给定的对象序列中创建数据流。所有对象必须是相同的类型。
    • fromParallelCollection(SplittableIterator) :并行地从迭代器创建数据流, 该类指定迭代器返回的元素的数据类型。
    • generateSequence(from, to) :在给定的区间内并行生成数字序列。
    • addSource(): 附加一个新的源函数,例如,要从Apache Kafka读取数据,可以使用addSource(new FlinkKafkaConsumer08<>(…))。有关详细信息,请参见连接器

    DataStream Transformations

    关于可用的流转换,请参考operators

    Data Sinks

    Data sinks消费DataStreams然后转发给files, sockets, external systems和print。Flink提供了各种内置的输出格式,这些格式封装在DataStreams操作之后:

    • writeAsText() / TextOutputFormat: 以字符串的形式逐行写入元素。字符串是通过调用每个元素的toString()方法获得的。
    • writeAsCsv(...) / CsvOutputFormat :将元组写入以逗号分隔的value文件。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。
    • print() / printToErr() :在标准输出/标准错误流上print每个元素的toString()值。还可以选择在输出之前增加prefix(msg)来帮助区分不同的打印调用。如果并行度大于1,输出还将加上生成输出的任务的标识符。
    • writeUsingOutputFormat() / FileOutputFormat: 方法和基类自定义文件输出,支持自定义对象到字节的转换。
    • writeToSocket: 根据SerializationSchema将元素写入Socket。
    • addSink: 调用自定义sink函数,Flink附带了到其他系统(如Apache Kafka)的连接器,这些连接器实现了sink函数。

    注意,DataStream上的write*()方法主要用于调试。他们没有参与Flink的检查点,这意味着这些函数通常具有at-least-once的语义。目标系统的数据刷新依赖于OutputFormat的实现。这意味着并非所有发送到OutputFormat的元素都立即显示在目标系统中。此外,在失败的情况下,这些记录可能会丢失。

    为了可靠、精确地将流一次传递到文件系统中,使用flink-connector-filesystem。另外,通过.addsink(…)方法的自定义实现可以参与Flink的检查点,实现精确的一次语义。

    Iterations

    迭代流程序实现一个步骤函数并将其嵌入IterativeStream中。因为一个DataStream程序可能永远不会结束,因此没有最大迭代次数。相反,您需要指定流的哪些部分反馈给迭代,哪部分使用split 转换或filter被转发到下游。在这里,我们有一个Iteration示例,其中主要部分(重复计算的部分)是一个简单的map()转换,通过使用filter将元素转发到下游。

    val iteratedStream = someDataStream.iterate(
      iteration => {
        val iterationBody = iteration.map(/* this is executed many times */)
        (iterationBody.filter(/* one part of the stream */), iterationBody.filter(/* some other part of the stream */))
    })
    

    例如下面的程序,它不断地从一系列整数中减去1,直到0:

    package com.demon.app
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    
    /**
      * @Auther: fc.w
      * @Date: 2019/1/31
      */
    object ExampleProgramApp {
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val someIntegers = env.generateSequence(0, 1000)
        val iteratedStream = someIntegers.iterate(iteration => {
          val minusOne = iteration.map(v => v - 1)
          val stillGreaterThanZero = minusOne.filter(_ > 0)
          val lessThanZero = minusOne.filter(_ <= 0)
          (stillGreaterThanZero, lessThanZero)
        })
        iteratedStream.print()
    
        env.execute("Window Stream WordCount")
      }
    
    }
    
    

    执行参数

    StreamExecutionEnvironment包含ExecutionConfig,为Job设置指定配置值。
    有关更多的参数说明,请参考 execution configuration这些参数主要适用于DataStream API:

    • setAutoWatermarkInterval(long milliseconds): 设置Watermark自动触发间隔,也可以通过getAutoWatermarkInterval()获取一个long值。

    容错

    State & Checkpointing描述如何启用和配置Flink的检查点机制。

    Controlling Latency(控制延迟)

    默认情况下,元素不会在网络上逐个传输(这会导致不必要的网络流量),而是被缓冲。缓冲区的大小(实际上是在机器之间传输的)可以在Flink配置文件中设置。虽然这种方法可以很好地优化吞吐量,但当传入流的速度不够快时,它可能会导致延迟问题。为了控制吞吐量和延迟,可以在执行环境(或单个操作符)上使用env.setBufferTimeout(timeoutMillis)设置缓冲区被填满的最大等待时间,在此之后,即使缓冲区没有满,也会自动发送缓冲区。此超时的默认值是100 ms。
    用法:

    val env: LocalStreamEnvironment = StreamExecutionEnvironment.createLocalEnvironment
    env.setBufferTimeout(timeoutMillis)
    
    env.generateSequence(1,10).map(myMap).setBufferTimeout(timeoutMillis)
    
    package com.demon.app
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    
    /**
      * @Auther: fc.w
      * @Date: 2019/1/31
      */
    object ExampleProgramApp {
    
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        /* 分别可以在执行环境和单个操作上使用 */ 
        // 执行环境
        env.setBufferTimeout(100)
    
        val someIntegers = env.generateSequence(0, 1000)
        val iteratedStream = someIntegers.iterate(iteration => {
          val minusOne = iteration.map(v => v - 1)
          val stillGreaterThanZero = minusOne.filter(_ > 0)
          val lessThanZero = minusOne.filter(_ <= 0)
          (stillGreaterThanZero, lessThanZero)
        })
        // 单个操作
        iteratedStream.setBufferTimeout(100)
    
        env.execute("Window Stream WordCount")
      }
    
    }
    
    

    为了最大限度地提高吞吐量,设置setBufferTimeout(-1),它将移除超时,缓冲区只有在满时才刷新。
    为了最小化延迟,将超时设置为接近0的值(例如5或10 ms)。应该避免缓冲区超时为0,因为这会导致严重的性能下降。

    故障排除(Debugging)

    在分布式集群中运行流程序之前,最好确保所实现的算法能够正常工作。因此,数据分析程序通常是检查结果、调试和改进的增量过程。

    Flink提供了简化数据分析程序开发过程的特性。支持IDE中的本地调试、测试数据的注入和结果数据的收集。
    本节给出了一些简化Flink程序开发示例。

    本地执行环境(Local Execution Environment)

    LocalStreamEnvironment在它创建的JVM进程中启动Flink系统。如果从IDE启动LocalEnvironment,可以在代码中设置断点并轻松调试程序。
    创建并使用LocalEnvironment如下所示:

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    
    val lines = env.addSource(/* some source */)
    // build your program
    
    env.execute()
    

    收集数据来源(Collection Data Sources)

    Flink提供了由Java集合支持的特殊数据源,以简化测试。一旦对程序进行了测试,sources和sinks就可以轻松地替换或读写外部系统的sources和sinks。

    采集数据源的使用方法如下:

    val env = StreamExecutionEnvironment.createLocalEnvironment()
    
    // Create a DataStream from a list of elements
    val myInts = env.fromElements(1, 2, 3, 4, 5)
    
    // Create a DataStream from any Collection
    val data: Seq[(String, Int)] = ...
    val myTuples = env.fromCollection(data)
    
    // Create a DataStream from an Iterator
    val longIt: Iterator[Long] = ...
    val myLongs = env.fromCollection(longIt)
    

    注意:当前,集合数据源要求数据类型和迭代器实现Serializable。此外,收集数据源不能并行执行(并行度= 1)。

    迭代器数据接收(Iterator Data Sink)

    Flink还提供了一个接收器,用于DataStream结果的收集和测试。它可以使用如下:

    import org.apache.flink.streaming.experimental.DataStreamUtils
    import scala.collection.JavaConverters.asScalaIteratorConverter
    
    val myResult: DataStream[(String, Int)] = ...
    val myOutput: Iterator[(String, Int)] = DataStreamUtils.collect(myResult.javaStream).asScala
    

    注意:Flink 1.5.0中删除了Flink-streaming-contrib模块。它的类已经被转移到flink-streaming-java和flink-streaming-scala中。

    接下来:

    操作符(Operators): 可用流操作符的规范。
    事件时间(Event Time): Flink的时间概念介绍。
    状态和容错(State & Fault Tolerance): 说明如何开发有状态应用程序。
    连接器(Connectors): 可用输入和输出连接器的描述。

    相关文章

      网友评论

        本文标题:Flink DataStream API编程指南

        本文链接:https://www.haomeiwen.com/subject/zgtlsqtx.html