Spark Streaming学习

作者: shohokuooo | 来源:发表于2017-04-25 23:09 被阅读1684次

    以下内容主要基于Spark2.1.0版本的Spark Streaming内容学习得到。


    还是先把Maven的依赖加入进去:

    https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10

    Overview

    Spark Streaming是core Spark API的一个扩展,对实时数据流进行scalable, high-throughput, fault-tolerant流处理。数据可以从多个来源得到,比如:Kafka,Flume,Kinesis或者TCP socket,并提供高级别的函数诸如map,reduce,join和window这样复合的算法。最终处理后的数据可以通过文件系统、数据库和实时dashboards输出。事实上还支持Spark的机器学习图形处理算法在数据流上。

    接收的数据流来源以及输出的数据结果

    内部是按照如下方式处理的。Spark Streaming接收实时输入数据并将数据分成多个batches,然后Spark engine会产生最终的结果batches流。

    按照batch处理

    Spark Streaming提供一个高级别的抽象概念:discretized streamDStream),来代表一个连续的数据流。DStream既可以从Kafka、Flume和Kinesis输入数据流中创建,也可以从其他DStream上通过高级别的操作产生。在内部DStream是由一连串的RDDs来表示。


    A Quick Example

    在进一步学习Spark Streaming细节之前,可以先通过一个简单的例子看下大致的处理过程。下面这个例子就是从在TCP socket上监听数据服务器的文本数据并统计word个数。

    下面逐步分解Example中的各段语句的作用。

    创建一个StringContext,这也是所有Streaming功能的主入口,并设置batch间隔为1s 使用这个context创建一个DStream来表示来自TCP源的数据流。其中arg[0]为输入的入参hostname,在本地运行时为localhost;arg[1]为输入的第二个入参为端口号,比如9999 使用flatMap可以创建另一个DStream出来,目的是把源DStream的每条记录生成新DStream多条记录。其中FlatMapFunction的对象用于实现Transformation 将FlatMapFunction以lambda表达式的方式实现 然后将words这个DStream通过mapToPair和reduceByKey转换为对单词的统计的DStream变量 同样改成lambda表达式的方式实现 前面的代码只是创建了计算过程,但是只有start之后才开始执行。

    执行过程需要建立两个终端,其中一个运行Netcat作为数据服务器,另一个正常使用spark-submit来提交Maven package出来的jar。

    这是个示意,具体运行自己的application的时候不需要用example。

    基础概念

    Linking

    在Maven中加入Spark-Streaming的依赖就不再赘述了。不过如果在代码中使用Kafka、Flume或者Kinesis则需要额外加入他们的artifact到pom.xml的依赖中。最新的版本在Maven repository中查找。

    Kinesis的库在spark中没有找到

    初始化StreamingContext

    SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);

    JavaStreamingContext ssc = new JavaStreamingContext(conf,newDuration(1000));

    这个初始化类似上面example中的代码,不过在集群使用中更倾向于在spark-submit的命令行中输入具体的master:Spark、Mesos或者YARN等。也就是master = args[0]之类。需要主要的是JavaStreamingContext在内部会创建一个JavaSparkContext,可以通过ssc.sparkContext访问。

    batch的时长间隔是一个必须的配置入参。具体的细节可以在下面的调优章节中看到。

    还可以通过已经创建的JavaSparkContext获得:

    JavaSparkContext sc=...//existing JavaSparkContext

    JavaStreamingContext ssc= new JavaStreamingContext(sc,Durations.seconds(1));

    在创建好StreamingContext之后接下来要做的内容与example的相仿:

    1、定义一个输入源——DStream

    2、定义对DStream的计算:通过transformation输出DStream

    3、启动接收数据并处理:streamingContext.start()

    4、等待处理过程的停止信号:streamingContext.awaitTermination()

    5、代码中控制停止:streamingContext.stop()

    注意事项:

    1、一旦Streaming context被启动,那么就不能修改或者添加任何新的DStream的transformation。

    2、一旦Streaming context被停止,将不能被重启。

    3、同一时刻在一个JVM上只能有一个StreamingContext在运行。

    4、StreamingContext.stop()同时也停止了SparkContext。如果不想把SparkContext停止,需要在stop方法中设置可选参数ssc.stop(false);

    5、SparkContext可以多次创建StreamingContext,只要在创建新的之前老的StreamingContext已经被停止,而且没有停止SparkContext。

    Discretized Streams (DStreams)

    DStream是一连串的RDD,其中每个RDD都包含一个时间间隔中的数据。DStream既是输入的数据流,也是transformation后的处理过的数据流。下图是DStream的表示图示:

    其实对DStream的transformation操作就是对具体RDD的操作,比如前面那个example中的情况:

    Input DStreams and Receivers

    每一个输入的DStream(除了file stream)都与一个Receiver对象相关联。Receiver是从源接收数据并存入内存中。如果想在Application中并行接收多个Stream,那么就需要创建多个input DStream,这也将同时创建多个Receiver来接收多个数据stream。不过需要注意的一点是需要有足够的core来处理这些数据。

    注意事项:

    1、当在本地运行Spark Streaming程序,不要使用"local"/"local[1]"作为master参数。因为这表示只有本地的一个线程在运行这个任务,这时这个线程会运行input DStream所基于的Receiver,就没有额外的线程资源来进行数据的处理了。所以需要设置master参数为local[n],其中n > Receiver的个数。具体的如何设置master的参数参考Spark Properties

    2、同样在集群中运行Streaming任务也需要设置core的个数大于Receiver的个数。不然系统就只会接收数据而不能处理数据了。

    Basic Sources

    在前面的例子中,我们看到通过使用ssc.socketTextStream(...)来创建一个DStream接收从TCP socket来的数据。其实除了socket,StreamingContext API还可以将文件作为数据源来创建DStream。

    File Stream

    为了能从任何文件系统中读取数据使用HDFS API(HDFS、S3、NFS等),DStream的创建方式为:

    streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);

    Spark Streaming将会监视目录dataDirectory并且处理任何新在这个目录中创建的文件(不支持子目录)。需要注意的是:

    1、文件必须是统一的格式

    2、文件必须是mv到这个文件目录下的(move或者rename)

    3、一旦mv过去文件不能修改,所以如果一个文件被连续的修改,那么新加的内容将不会读取出来。

    对于简单的text文件,有一种简单的创建DStream的方法:streamingContext.textFileStream(dataDirectory). file stream不需要运行一个Receiver,所以也不需要分配core资源。

    Streams based on Custom Receivers

    DStream还可以创建处理用户自定义的Receiver接收到的数据流。详见Custom Receiver Guide

    Queue of RDDs as a Stream

    为了用测试数据测试Spark Streaming,还可以基于一个队列的RDD创建DStream,通过方法streamingContext.queueStream(queueOfRDDs)。其中每一个RDD被认为是DStream的一个batch数据。

    Advanced Sources

    Kafka:Spark Streaming 2.1.0 is compatible with Kafka broker versions 0.8.2.1 or higher. See the Kafka Integration Guide for more details.

    Flume:Spark Streaming 2.1.0 is compatible with Flume 1.6.0. See the Flume Integration Guide for more details.

    Kinesis:Spark Streaming 2.1.0 is compatible with Kinesis Client Library 1.2.1. See the Kinesis Integration Guide for more details.

    Custom Sources

    Input DStreams同样可以创建为用户自定义的数据源。所有需要做的事情就是实现一个自定义的Receiver用以从用户的源中接收数据并推给Spark。详见Custom Receiver Guide

    Receiver的可靠性

    基于可靠性有两种数据源一种是可靠的,另一种是不可靠的。可靠的是指诸如Kafka或者Flume这种有ACK反馈的,不可靠的则没有。

    可靠的Receiver:会在接收到数据后给源发送ACK

    不可靠的Receiver:不会给源发送ACK

    Transformations on DStreams

    类似于RDD,DStream也可以通过transformation对输入的DStream进行处理。下面列出了常用的一些:

    DStream的Transformation

    上面一些transformation将会重点介绍:

    UpdateStateByKey Operation

    流计算往往是7*24小时不间断的,所以需要中间保存一些状态。

    注意:使用updateStateByKey必须配置checkpoint目录,否则会报错

    Transform Operation

    transform主要是对一些RDD支持的Transformation而DStream中没有支持的做扩展。也类似与对DStream的每个RDD进行操作。

    这个代码是利用transform对join操作的实现

    Window Operations

    如下图所示,Spark Streaming提供了一个窗口计算,允许在滑动窗口内的数据进行Transformation。

    窗长是3个时间间隔,按照2个时间间隔进行滑动

    任何一个window operation都需要下面两个参数:

    window length - 窗长

    sliding interval - 窗滑动的时间间隔

    还是以上面那个数单词的case举例,假如现在有个需求是每隔10s就统计下最近30s时间内的单词数目。那么代码将写为:

    窗长和窗滑动的时间间隔都必须是batch时长的整数倍

    还有其他一些使用窗的Transformation:

    和windowLength以及slideInterval相关的Tranformation

    Join Operations

    最后需要重点说下在Spark Streaming中做多个不同类型的join是how easily

    Stream-stream joins

    stream之间可以非常简单的join在一起

    两个window stream相join

    Stream-dataset joins

    下面展示下一个window stream如何和一个Dataset进行join

    用到了transform

    DStream的输出

    output操作类似与RDD的action操作,只有output才会真正执行之前的Transformation操作。

    Design Patterns for using foreachRDD

    主要讲到实际运行时类的序列化和反序列化的设计,以及错误的使用会导致的一些问题。暂时不细说了。

    DataFrame and SQL Operations

    还可以在流数据中轻松使用DataFrame和SQL。首先必须要使用StreamingContext的SparkContext来创建一个SparkSession。下面就是一个将之前的word count的例子改成使用DataFrame和SQL的方式。每个RDD都被转成DataFrame,注册成一个临时的表并使用SQL。

    下面是具体的代码示例:

    代码第一段 以StreamingContext的sparkConf来创建SparkSession 代码第二段

    其中

    JavaRecord的定义

    MLlib Operations

    还可以轻松使用MLlib的机器学习算法。首先存在流机器学习算法(比如:Streaming Linear Regression或者Streaming KMeans等),这些算法可以同时学习流数据产生模型并把模型应用到流数据当中。除了这些算法,还有更多的算法可以从offline的历史数据中学习得到模型,然后将这些模型应用到流数据中。具体的可以看指导文档:MLlib

    相关文章

      网友评论

        本文标题:Spark Streaming学习

        本文链接:https://www.haomeiwen.com/subject/zhuvzttx.html