美文网首页
Flink-Streaming-overview

Flink-Streaming-overview

作者: 耳边的火 | 来源:发表于2019-04-29 12:34 被阅读0次

Flink中的流应用就是在数据流上应用各种转化(如:filter,update state,difine window,aggregation)。数据流有各种数据源创建而来(如:消息队列,socket流,文件等)。结果输出到sink,如写入文件或者标准输出。Flink程序可以在多种上下文中运行,standalone,内置在其他应用中等。应用可以在本地JVM中执行,也可以在集群的许多机器中执行。

示例程序


下面的程序是一个完成的应用,它演示了如何在web soscke上使用window统计5秒内的字数。你可以复制代码然后在你本地运行。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}

运行应用前,先使用 netcat 在命令行中开启输入流:

nc -lk 9999

输入一些单词就会返回新的结果。这些单词或作为字数统计应用的输入。如果你想看到统计值大于1,可以在5秒内一遍又一遍的输入相同的单词(如果你做不到,可以增加window的大小)

数据源 Data Source


Source指的是你的程序从哪里读取它的输入。你可以使用 StreamExecutionEnvironment.addSource(sourceFunction)在你的程序中添加数据源。Flink自带了一些实现好的数据源函数,淡然你可以实现 SourceFunction 来实现自定义的非并行的source或者实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 来实现并行的source。
StreamExecutionEnvironment有一些实现定义好的数据源方法:

基于文件的数据源:

  • readTextFile(path) - 读取text文件。也就是说使用 TextInputFormat 一行一行的读取数据。
  • readFile(fileInputFormat,path) - 使用给定的 input format读取文件
  • readFile(fileInputFormat,path,watchType,interval,pathFilter,typeInfo) - 这个方法在flink内部,被上面的两个方法所调用。它使用给定的fileInputFomat读取path中的文件。根据 watchType 的值,数据源会定期(interval 毫秒)监控path中的新数据(FileProcessingMode.PROCESS_CONTINUOUSLY),或者仅对当前path下的文件进行一次处理,然后退出(FileProcessingMode.PROCESS_ONCE)。使用 pathFilter ,用户可以排除不想要处理的文件。
    实现:
    在内部,Flink将读取文件分为两个子任务,分别叫做 目录监控 与 数据读取。每个任务都是单独运行的。目录监控是一个单线程的任务,而数据读取任务可以是多线程的并发任务。数据读取任务的并发度取决于job的并发度。目录监控的功能在于定期监控目录,发现需要被处理的文件,将它们分片 split 然后指定分片给下游的reader。reader会进行实际读取数据的操作。每一个split仅会被一个reader读取,而一个reader可能会读取多个split(依次读取)。
    重要说明:
    1.如果 watchType 设置为 FileProcessingMode.PROCESS_CONTINUOUSLY,当文件被修改后,它的内容会被全部重新进行处理。这会破坏“精确一次”的语义,因为向文件中追加数据,会导致整个文件进行重新处理。
    2.如果 watchType 设置为 FileProcessingMode.PROCESS_ONCE,source只会扫描path一次然后退出,而不会等到reader读取所有数据完毕后再退出。当然,reader会继续进行数据读取,直到所有文件内容都读取完毕。关闭source会导致之后不会再有checkpoint。这将导致故障恢复时,需要等待更长的时间,因为job会从上次checkpoint处会进行重新读取数据。

基于socket:

  • socketTextStream - 从socket读取数据。数据可以被 分隔符delimiter 分隔开

基于集合:

  • fromCollection(Collection) - 从java集合中创造数据流。所有集合中的数据必须是同样的类型
  • fromCollection(Iterator,Class) - 从iterator中创造数据流。class参数指定了iterator返回的数据的类型
  • fromElements(T ...) - 从给定的对象序列中创造数据流。所有对象必须是相同的类型
  • fromParallelCollection(SplitableIterator , Class) - 从iterator中并行的创造数据流。class参数指定了iterator返回的数据的类型
  • generateSequence(from,to) - 使用给定的interval并行的生成数字序列

自定义:

  • addSource - 使用source function。如,从Kafka中读取数据,你可以使用 addSource(new FlinkKafkaConsumer08<>(...)).

DataStream Transformations


查阅 operator 文档

Data Sink


Data Sink读取数据流,并将它们写入到file,socket,其他系统或者打印它们。Flink自带了一些output format,它们被封装到一些操作符中:

  • writeAsText() / TextOutputFormat - 将数据作为一整行string,写入文件。通过调用数据的toString方法
  • writeAsCsv(...) / CsvOutputFormat - 将 tuple 以逗号分隔,写入文件。行与行以及field之间的分隔符可以自定义。每一个field的值,是通过调用toString方法获取的
  • pring() / pringToErr() - 将数据的toString方法的值打印到口红纸条。可以选择前缀,在打印输出内容前先打印前缀。这能够区分不同的print的内容。如果并发度大于1,输出同样会打印一个task的标识符。
  • writeUsingOutputFormat() / FileOutputFormat - 使用自定义文件输出的基类与方法。支持自定义的 对象-字节 的转化。
  • writeToSocket - 根据 SerializationSchema 将数据写入socket
  • addSink - 调用传入的自定义 sink function。Flink通过实现 sink function可以与其他系统连接起来(如kafka)

注意的是 write*() 方法主要用于调试的目的。它们没有参与flink的checkpoint过程,这就意味着使用这些函数是“at-least-once”至少一次语义。数据如何写入目标系统是由OutputFormat决定的,也就是说发送到OutputFormat的数据并不一定会立即写入目标系统(如批量写入情况)。因此,在遇到故障时,这些数据有可能会丢失。
为了稳定地,精确一致的将流数据写入问加你系统,建议使用 flink-connector-filesystem。当然,如果自定义了sink function,通过 addSink 添加该自定义的sink,也可以参与flink的checkpoint过程,保持 exactly-once 语义。

Iterator


迭代流程序实现了step function,并且内置在 IterativeStream中。由于DataStream程序可能不会停止,因此iteration中不会有最大数量限制。你需要定义流中的哪些数据需要继续迭代,哪些数据可以发送到下游的操作符,你可以使用split或者filter实现。下面我们使用 filter 来演示。首先,我们定义一个 IterativeStream :

IterativeStream<Integer> iteration = input.iterate();

然后,我们定义在循环中,需要对数据流做哪些操作(下面我们就简单的使用map作为演示)

DataStream<Integer> iterationBody = iteration.map(/* this is executed many times */);

为了定义迭代器何时关闭,可以调用 IterativeStream 的 closeWith(feedbackStream) 方法。传入 closeWith() 的数据流会再次进入迭代器,放到迭代器的head。一个常用的模式是,使用filter将流的一部分重新放入迭代器,而另一部分下发到下游操作符这些filter可以定义“终止”的逻辑,也就是一个数据可以不再进入迭代器,而是被转发到下游操作符。

iteration.closeWith(iterationBody.filter(/* one part of the stream */));
DataStream<Integer> output = iterationBody.filter(/* some other part of the stream */);

例如,下面的程序就是对数据进行减1操作,直到为0:

DataStream<Long> someIntegers = env.generateSequence(0, 1000);

IterativeStream<Long> iteration = someIntegers.iterate();

DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
  @Override
  public Long map(Long value) throws Exception {
    return value - 1 ;
  }
});

DataStream<Long> stillGreaterThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value > 0);
  }
});

iteration.closeWith(stillGreaterThanZero);

DataStream<Long> lessThanZero = minusOne.filter(new FilterFunction<Long>() {
  @Override
  public boolean filter(Long value) throws Exception {
    return (value <= 0);
  }
});

Execution Parameter 执行参数


StreamExecutionEnvironment 包括 ExecutionConfig ,它允许设置运行时所需的job配置。
请参阅 execution configuration 获取更多参数的解释。下面的参数仅属于 DataStream API:

  • setAutoWatermarkInterval(long milliseconds) : 设置 watermark 发射的间隔。你可以公共 long getAutoWatermartkInterval() 获取当前的值。

故障容忍

查阅 State & Checkpointing

控制延迟


默认情况下,数据在网络间传输时,并不是一个一个的传输(造成不必要的网络拥堵),而是缓存后一起传输。buffer的大小可以在Flink 的配置文件中配置。尽管这种方式可以优化吞吐率,但是当输入流的速度不够快时,会造成延迟问题。为了平衡吞吐率和延迟,你可以使用 env.setBufferTimeout(timeoutMillis) 来设置最大等待时间。超过这个时间后,即便buffer没有填满,也要发出去。默认值为100ms。

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
env.setBufferTimeout(timeoutMillis);

env.generateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);

为了最大化吞吐率,设置 setBufferTimeout(-1) 会移除超时设置,仅当buffer填满后才发送。为了最小化延迟,设置超时的值接近0(如 5 或 10 毫秒)。应该避免设置值为0,因为这会导致服务性能下降。

调试 Debugging

在提交任务到分布式集群运行前,最好确认程序可以按预期运行。因此,实现一个数据分析应用,通常是一个增量的过程:检查结果,调试,优化。
Flink提供了本地IDE调试的功能,简化了数据分析应用的开发。包括加载测试数据,收集结果数据。这一部分会显示如何简化flink程序的开发,便于测试调试程序。

本地运行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

DataStream<String> lines = env.addSource(/* some source */);
// build your program

env.execute();
加载测试数据
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Create a DataStream from a list of elements
DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5);

// Create a DataStream from any Java collection
List<Tuple2<String, Integer>> data = ...
DataStream<Tuple2<String, Integer>> myTuples = env.fromCollection(data);

// Create a DataStream from an Iterator
Iterator<Long> longIt = ...
DataStream<Long> myLongs = env.fromCollection(longIt, Long.class);

注意:需要提供数据烈性,iterator要实现 Serializable。不能并发执行

迭代Sink
import org.apache.flink.streaming.experimental.DataStreamUtils

DataStream<Tuple2<String, Integer>> myResult = ...
Iterator<Tuple2<String, Integer>> myOutput = DataStreamUtils.collect(myResult)

相关文章

  • Flink-Streaming-overview

    Flink中的流应用就是在数据流上应用各种转化(如:filter,update state,difine wind...

网友评论

      本文标题:Flink-Streaming-overview

      本文链接:https://www.haomeiwen.com/subject/ybmenqtx.html