美文网首页Spark 3.0.0 官方文档翻译
05 Spark Streaming Programming G

05 Spark Streaming Programming G

作者: Whaatfor | 来源:发表于2020-07-21 10:57 被阅读0次

    转载请注明出处,谢谢合作~

    该篇中的示例暂时只有 Scala 版本~

    Spark Streaming 编程指南

    概述

    Spark Streaming 是 Spark 核心 API 的扩展,提供了可扩展、高吞吐且容错的对实时流数据的处理能力。数据来源可以是 Kafka,Kinesis,或者 TCP sockets 等许多类型。数据可以使用复杂的算法逻辑进行计算,这些逻辑可以通过像 mapreducejoinwindow 这样的高阶函数来定义。最后,计算结果可以输出到文件系统,数据库,以及实时仪表盘。其实,你还可以将机器学习(machine learning)和图计算(graph processing)的算法应用在数据流上。

    Spark Streaming

    在内部,Spark Streaming 的工作机制如下。Spark Streaming 接收实时数据流,然后将这些数据切分成多个小的批次,最终通过 Spark 引擎的计算生成批次形式的结果数据流。

    Spark Streaming

    Spark Streaming 提出了一个叫做离散化流(discretized stream 或者 DStream)的高阶抽象概念,来表示连续的流式数据。DStream 可以通过像 Kafka 和 Kinesis 这样的数据源来创建,也可以通过对其他 DStream 应用高阶算子来创建。在内部,一个 DStream 被表示为一系列的 RDDs

    本篇文档介绍了如何使用 DStream 来编写 Spark Streaming 程序,可以使用 Scala,Java 或者 Python 语言。

    注意:有一些 API Python 与其他语言不同,或者不支持。(暂时只提供 Scala 版本的代码,所以忽略了这一句:Throughout this guide, you will find the tag Python API highlighting these differences.)


    快速示例

    在深入学习如何编写 Spark Streaming 应用程序之前,让我们先来看看一个简单 Spark Streaming 应用程序的是什么样的。假如需求是统计文本行中的单词计数,文本数据来源是一个 TCP socket,则步骤如下。

    首先,导入 Spark Streaming 相关的类和一些 StreamingContext 中的隐式转化,其中包含一些与其他的类(比如说 DStream)相关的有用的方法。StreamingContext 类是 Spark Streaming 程序的编程入口,创建一个本地模式的拥有两个线程的 StreamingContext 对象,并设置批次间隔为 1 秒。

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    
    // Create a local StreamingContext with two working thread and batch interval of 1 second.
    // The master requires 2 cores to prevent a starvation scenario.
    
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    

    通过这个上下文对象,可以创建一个代表来源于 TCP socket 流式数据的的 DStream,需要配置主机名(例如 localhost)和端口(例如 9999)。

    // Create a DStream that will connect to hostname:port, like localhost:9999
    val lines = ssc.socketTextStream("localhost", 9999)
    

    DStream 对象 lines 代表了从数据源到来的流式数据,其中的每一条数据都是一个文本行。接下来,以空格为分隔符将文本行切割成单词。

    // Split each line into words
    val words = lines.flatMap(_.split(" "))
    

    flatMap 是个一对多的 DStream 算子,将源头 DStream 中的每条数据生成多条新数据,创建一个新的 DStream。在此,每个文本行都会被切割成多个单词,生成的单词流由 DStream 对象 words 表示。然后来统计这些单词的数量。

    import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
    // Count each word in each batch
    val pairs = words.map(word => (word, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    
    // Print the first ten elements of each RDD generated in this DStream to the console
    wordCounts.print()
    

    DStream 对象 words 被映射(一对一转换)为一个 (word, 1) 键值对类型的 DStream,再通过聚合运算获取这些单词在每个批次的数据中出现的频率。最后,wordCounts.print() 会每秒钟打印统计值中的一部分。

    注意,当代码执行到这一行的时候,Spark Streaming 只是定义了程序启动后的计算逻辑,真正的执行还没有开始。启动流式处理任务需要在计算逻辑都配置好之后,调用

    ssc.start()             // Start the computation
    ssc.awaitTermination()  // Wait for the computation to terminate
    

    完整的代码参见 Spark Streaming example NetworkWordCount

    如果你已经下载(downloaded)并安装(built)了 Spark,可以通过下面的方式来运行该示例程序。需要先启动 Netcat 来作为数据提供方:

    $ nc -lk 9999
    

    之后,在另一个终端,启动示例程序:

    $ ./bin/run-example streaming.NetworkWordCount localhost 9999
    

    接下来,在 Netcat 所在的终端中输入的所有文本行会被计算,每秒钟输出到控制台一次,就像下面这样。

    # TERMINAL 1:
    # Running Netcat
    # TERMINAL 2: RUNNING NetworkWordCount
    $ nc -lk 9999 hello world ... $ ./bin/run-example streaming.NetworkWordCount localhost 9999
    ...
    -------------------------------------------
    Time: 1357008430000 ms
    -------------------------------------------
    (hello,1) (world,1)
    ...

    基础概念

    接下来,先放下简单的示例程序,来看看 Spark Streaming 中的基础概念。

    启用 Spark Streaming

    跟 Spark 核心 API 一样,Spark Streaming 也可以通过 Maven 配置。编写 Spark Streaming 应用程序需要将下面的依赖添加到 SBT 或者 Maven 项目中。

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
        <scope>provided</scope>
    </dependency>
    
    libraryDependencies += "org.apache.spark" % "spark-streaming_2.12" % "3.0.0" % "provided"
    

    对于从其他未收录在 Spark Streaming 核心 API 的数据源接入数据,比如 Kafka 和 Kinesis,需要额外添加相应的 spark-streaming-xyz_2.12 依赖。例如,常见的数据源如下。

    Source Artifact
    Kafka spark-streaming-kafka-0-10_2.12
    Kinesis spark-streaming-kinesis-asl_2.12 [Amazon Software License]

    最新支持的数据源及相关版本参见 [Maven repository](https://search.maven.org/#search|ga|1|g%3A"org.apache.spark" AND v%3A"3.0.0")。


    初始化 StreamingContext

    开始编写一个 Spark Streaming 应用程序时,必须首先创建一个 StreamingContext 对象,它是 Spark Streaming 应用程序的入口。

    可以通过 SparkConf 对象来创建 StreamingContext 对象。

    import org.apache.spark._
    import org.apache.spark.streaming._
    
    val conf = new SparkConf().setAppName(appName).setMaster(master)
    val ssc = new StreamingContext(conf, Seconds(1))
    

    参数 appName 指定应用程序的名称,会显示在集群 UI 界面上。参数 master 的可选值有 Spark, Mesos, Kubernetes or YARN cluster URL,或者一个表示本地模式的 「local[*]」 字符串。在实践中,当应用程序运行在一个集群上时,通常不把参数 master 硬编码到程序中,而是通过脚本(launch the application with spark-submit)的参数指定。不过对于测试环境和单元测试可以使用「local[*]」来运行 Spark Streaming 应用程序。注意,在初始化 StreamingContext 对象时,内部会创建一个 SparkContext 对象(Spark 应用的编程入口),可以通过 ssc.sparkContext 访问。

    必须指定批次数据的时间间隔,改参数需要根据应用程序的延迟容忍度和可用的集群资源来定夺。详情参见 Performance Tuning 章节。

    还可以通过一个现有的 SparkContext 对象来创建 StreamingContext 对象。

    import org.apache.spark.streaming._
    
    val sc = ...                // existing SparkContext
    val ssc = new StreamingContext(sc, Seconds(1))
    

    在上下文环境定义好之后,必须执行以下步骤。

    1. 定义输入数据源来创建 DStream。
    2. 通过应用转换和输出算子来定义 DStream 的流式计算逻辑。
    3. 通过 streamingContext.start() 方法启动程序。
    4. 执行 streamingContext.awaitTermination() 方法等待程序停止运行(手动或者出现异常)。
    5. 应用程序可以通过 streamingContext.stop() 方法手动停止。
    注意事项
    • 一旦一个 StreamingContext 启动之后,不再接受新的流式计算逻辑。
    • 一个 StreamingContext 停止之后可以被重启。
    • 在一个 JVM 中只能同时存在一个 StreamingContext 对象。
    • 调用 StreamingContext 对象的 stop() 会同时停止 SparkContext 对象。如果只需要停止 StreamingContext 对象,可以传递 stop() 参数 stopSparkContext 为 false。
    • 一个 SparkContext 对象可以被重用创建多个 StreamingContext 对象,创建新的 StreamingContext 对象时需要之前的 StreamingContext 对象已经停止(不停止 SparkContext 对象)。

    离散化流(Discretized Stream)

    Discretized StreamDStream)是 Spark Streaming 提出的基础抽象概念。DStream 代表连续的流式数据,无论是从数据源接入的数据流,还是从一个 DStream 转换计算而来的新数据流。在内部,一个 DStream 由一系列连续的 RDD 组成,RDD 是 Spark 中最基本的抽象概念,表示一个不可变的,分布式的数据集(详情参见 Spark Programming Guide)。DStream 中每一个 RDD 都包含了某一时间段接收到的数据,入下图所示。

    Spark Streaming

    每一个应用于 DStream 的算子都会被转移到底层的 RDD 上。例如,在前面的示例(earlier example)中,将流式的文本行数据切割成单词,DStream 对象 linesflatMap 算子被应用到了其中的每个 RDD 上,生成了 DStream 对象 words 的 RDD。如下图所示。

    Spark Streaming

    RDD 的转换逻辑由 Spark 引擎进行计算。DStream 算子为开发者提供了便捷的高阶 API,隐藏了底层大部分的细节。DStream 算子将会在后面的章节中讨论。


    输入 DStream 和接收器

    输入 DStream 是代表从流式数据源中接收到的流式数据。在快速示例中(quick example),lines 是一个输入 DStream,代表了从 Netcat 服务中接收到的流式数据。每一种输入 DStream(除了文件流,将在后面的章节中讨论)都关联一个接收器对象 Receiver (Scala doc, Java doc) ,接收器从流式数据源中接收数据,并存储在 Spark 的内存中之后进行计算。

    Spark Streaming 提供了两种类型的内置数据源。

    Spark Streaming provides two categories of built-in streaming sources.

    • 基础数据源: 内置在 StreamingContext API 中的数据源。例如,文件系统和 socket 连接。
    • 高级数据源: 比如 Kafka,Kinesis 等等,需要额外的依赖支持,参见 linking 章节。

    后面会讨论每种类型中的一部分数据源。

    注意,如果需要并行接收多个数据流,可以创建多个输入 DStream(更多的介绍参见 Performance Tuning 章节),这种方式会创建多个接收器同时接收多个数据流。但是请注意 Spark 中的 worker/executor 是一个长时间运行的任务,那么每一个接收器都会占用分配给 Spark Streaming 任务资源中的一个核心。所以,请记住在实践中需要为 Spark Streaming 应用程序提供足够的核数(或者本地模式下的线程数)来接收数据和处理数据。

    注意事项
    • 当以本地模式运行 Spark Streaming 应用程序时,请不要使用「local」或者「local[1]」作为 master URL,这样的配额制意味着只有一个线程会被用来参与计算。如果使用了基于接收器的输入 DStream(比如 socket,kafka 等等),那么唯一的一个线程将会被用来运行接收器,那么就没有线程来处理接收到的数据了。所以,当以本地模式运行时,请使用「local[n]」作为 master URL,其中 n 大于接收器的数量(如何设置 master 参见 Spark Properties)。
    • 将上述逻辑拓展到集群模式,给 Spark Streaming 应用程序分配的核数必须大于接收器的数量,否则程序只会接收数据,但是并不计算。

    基础数据源

    从前面的示例(quick example)中的 ssc.socketTextStream(...),我们已经了解了如何将 TCP socket 作为数据源,创建相应的数据接收器。除了 socket,StreamingContext API 还提供了从文件流中创建 DStream 作为输入的方法。

    文件流

    对于从兼容 HDFS API 的文件系统(即,HDFS, S3, NFS 等等)中读取文件数据,可以通过 StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass] 方法创建 DStream。

    文件流数据源不需要创建接收器,所以没必要为程序提供多余的核心来接收文件数据。

    streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
    

    对于简单的文本文件,最容易的方法是 StreamingContext.textFileStream(dataDirectory)

    streamingContext.textFileStream(dataDirectory)
    
    如何监控目录

    Spark Streaming 会监控数据目录 dataDirectory,并计算所有该目录中创建的文件。

    • 一个简单的目录就可以被监控,例如 "hdfs://namenode:8040/logs/"。所有该路径下的文件一旦被发现,就会被计算。
    • 可以使用通配符(POSIX glob pattern),例如 "hdfs://namenode:8040/logs/2017/*"。此时,DStream 中会包含名称匹配该模式的所有目录下的文件。也就是说,这是一个目录匹配模式,而不是目录中文件的匹配模式。
    • 所有的文件都必须是相同的数据格式。
    • 文件是否属于某一时间段取决于其修改时间,而不是创建时间。
    • 一旦参与计算,在当前时间窗口内对文件进行的修改将不会导致该文件被重复读取。即,更新操作被忽略了
    • 一个目录下的文件越多,扫描该目录所花费的时间就越长——即使没有文件被修改。
    • 如果在指定的路径中使用了通配符,比如 "hdfs://namenode:8040/logs/2016-*",将某个不匹配的目录改名为匹配的目录,会将该目录添加到监控的目录中。该目录中只有修改时间在当前窗口内的文件才会被计算。
    • 调用 FileSystem.setTimes() 方法可以修改一个文件的时间戳,可以通过这种方式让它归属于之后的窗口,即使文件内容没有变化。
    使用对象存储文件系统作为数据源

    像 HDFS 这样的「完备的」文件系统倾向于将文件输出流创建的时刻设置为文件的修改时间。当文件输出流被打开后,即使数据还没有完全写入完成,它也可能会被添加到 DStream 中——在这之后对文件的更新部分将会被忽略。即,修改可能会丢失,数据可能会被丢弃。

    为了保证某个时间窗口内的所有的更新都可以被计算,请将文件写入到一个没被监控的目录,之后,当文件写入完成时,立即将其更名为监控目录下的文件。如果被重命名之后的文件的创建时间在当前时间窗口内出现在了扫描目录下,新的数据将会被计算。(如果写的时间太长,错过了当前时间窗口,这文件如果不更新修改时间,就读取不到了)

    相反,像 Amazon S3 和 Azure Storage 这样的对象存储文件系统,通常会在数据被复制之后才设置修改时间。此外,重命名对象文件会将文件修改时间设置为 rename() 操作发生的时间,所以可能不会被归属于其最初创建时间所在时间窗口中。

    对于对象存储系统中的文件需要谨慎的测试,来验证存储系统的时间戳行为与 Spark Streaming 期望的保持一致。在对象存储文件系统直接将文件写入目标目录可能是将其作为流式数据看待的比较合理的策略。

    对此话题的更多详情,参见 Hadoop Filesystem Specification

    基于自定义接收器的数据流

    DStream 可以通过自定义的接收器来输入数据。详情参见 Custom Receiver Guide

    将 RDD 队列作为数据流

    如果使用测试数据测试 Spark Streaming 应用程序,可以通过 streamingContext.queueStream(queueOfRDDs) 方法基于一个 RDD 队列创建 Dstream。每一个队列中的 RDD 都会被当做 DStream 中的一个批次,并像流一个被计算。

    更多关于 socket 和文件的数据流,参见 API 文档,Scala 版本的 StreamingContext,Java 版本的 JavaStreamingContext 和 Python 版本的 StreamingContext

    高级数据源

    Python API:在 Spark 3.0.0 中,除了以上数据源,Python API 还支持了 Kafka 和 Kinesis。

    这一类数据源需要非 Spark 自带的依赖库,其中一些的依赖比较繁琐(比如 Kafka)。所以,为了最大限度的避免版本冲突的问题,创建这类数据源 DStream 的依赖被分离出来,在使用时需要显示引入(linked)。

    注意,这些高级数据源在 Spark shell 中无法使用,所以使用这些数据源的程序无法在 Spark shell 中测试。如果真的需要在 Spark shell 中使用这些数据源,必须下载相关的依赖并把它们加入到 classpath 中。

    其中一些高级数据源如下所示。

    自定义数据源

    Python API: 目前 Python 还不支持自定义数据源。

    输入 DStream 还可以通过自定义数据源创建。需要实现一个自定义的接收器(将在下一章节讨论),该接收器可以从自定义数据源中读取数据并导入到 Spark 中。详情参见 Custom Receiver Guide

    接收器的可靠性

    依据可靠性可以将数据源分成两种。有些数据源允许接收到的数据被确认,如果系统正确的确认了从这些可靠的数据源中读取的数据,则可以保证没有数据会丢失。所以有以下两种接收器。

    1. 可靠的接收器——一个可靠的接收器能够在数据被接收并存储到 Spark 之后正确的向可靠数据源发送确认消息。
    2. 不可靠的接收器——一个不可靠的接收器无法向数据源发送确认消息。这种接收器适用于不支持数据确认的数据源,或者不需要确认消息的数据源。

    有关如果编写可靠的接收器的详情参见 Custom Receiver Guide


    DStream 的 Transfformation 算子

    与 RDD 类似,可以对输入 DStream 的数据进行转换操作,DStream 支持许多 Spark RDD 中可用的 Transfformation 算子。常见的 Transfformation 算子如下。

    Transformation Meaning
    map(func) 把 DStream 中每个元素经过函数 func 的映射结果生成一个新的 DStream。
    flatMap(func) 与 map 相似,但是每一个输入值都可以被映射成 0 或者多个新的元素。
    filter(func) 把 DStream 中每个元素经过过滤函数 func 的计算,返回 true 的元素生成一个新的 DStream。
    repartition(numPartitions) 对 DStream 进行重分区操作,通过增加或者减少分区改变 DStream 的计算并行度。
    union(otherStream) 合并两个 DStream 成为一个。
    count() 将 DStream 中每个 RDD 中的元素计数生成一个包含单个计数值的 RDD,返回这些 RDD 构成新的 DStream。
    reduce(func) 返回一个由单值 RDD 构成的新的 DStream,每个 RDD 中的单值都由源 DStream 中的每个 RDD 的元素应用聚合函数 func 聚合而来,该聚合函数需要符合交换律和结合律,从而计算可以并行执行。
    countByValue() 在一个类型为 K 的 DStream 上调用该算子时,返回一个 (K, Long) 键值对类型的 DStream,代表每个 RDD 中不同键的计数值。
    reduceByKey(func, [numTasks]) 在一个类型为 (K, V) 键值对的 DStream上调用该算子时,返回一个新的类型为 (K, V) 键值对的 DStream,其中每个键的聚合值由聚合函数计 func 算而来。注意:默认情况下,该算子使用 Spark 默认的并行任务数(本地模式是 2,集群模式由参数 spark.default.parallelism 控制)来进行分组聚合。可以通过参数 numTasks 自定义并行度。
    join(otherStream, [numTasks]) 在两个类型分别为 (K, V) 和 (K, W) 的 DStream 上调用该算子时,返回一个新的类型为 (K, (V, W)) 的 DStream,其中包含对每个键的元素组合。
    cogroup(otherStream, [numTasks]) 在两个类型分别为 (K, V) 和 (K, W) 的 DStream 上调用该算子时,返回一个类型为 (K, Seq[V], Seq[W]) 元组的 DStream。
    transform(func) 返回一个新的 DStream,其中的 RDD 由源 DStream 中的 RDD 应用函数计func 算而来。该算子用来自定义 DStream 中 RDD 的计算。
    updateStateByKey(func) 返回一个新的有状态 Dstream,其中每个键值的新状态值由指定的函数计算之前的状态和新的数据得来。该算子可以用来维护自定义状态数据。

    其中一些算子值得深入探讨。

    UpdateStateByKey 算子

    updateStateByKey 算子可以根据新接收的数据更新自定义的状态,使用该算子需要以下步骤。

    1. 定义状态——状态可以是自定义的数据类型。
    2. 定义状态更新函数——指定根据已有状态值和新接收数据计算新状态值的函数。

    在每个批次中,Spark 会对所有存在的键值应用状态更新函数,无论该键值是否有新的数据到来。如果更新函数返回 None,那么该键值对将会被清理。

    让我们来看一个示例,假设需要维护一个文本数据流中每个单词的计数值,在此,计数值就是状态,是一个整数。则定义状态更新函数:

    def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
        val newCount = ...  // add the new values with the previous running count to get the new count
        Some(newCount)
    }
    

    该函数可以应用于一个包含单词的数据流 DStream(比如前面示例 earlier example 中的 pairs 对象是一个包含数据 (word, 1) 键值对的 DStream)。

    val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
    

    对其中的每个单词都会调用状态更新函数,参数 newValues 是一个值都为 1 的序列,而参数 runningCount 中包含了之前的计数值。

    注意,使用 updateStateByKey 算子需要设置检查点目录,详情参见 checkpointing 章节。

    Transform 算子

    transform 算子(以及像 transformWith 这样的变体算子)可以指定一个自定义的 RDD 到 RDD 的函数,作用于 DStream 上的每个 RDD。该算子可以用来使用那些没有暴露在 DStream API 中的 RDD 算子。例如,将 DStream 中的每个 RDD 和另一个静态 RDD 进行连接操作是没有暴露在 DStream API 中的,可以通过 transform 算子轻松的实现。这种方式可以带来很大的想象空间,例如,可以将输入的流式数据和预计算好的垃圾数据(可能也是 Spark 生成的)进行连接操作而达到实时数据清洗的目的。

    val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
    
    val cleanedDStream = wordCounts.transform { rdd =>
      rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
      ...
    }
    

    注意,在每个批次中都会调用指定的函数,这种机制可以实现基于时间的 RDD 算子,即,RDD 算子,分区数,广播变量等等可以在不同的批次中发生改变。

    Window 算子

    Spark Streaming 也支持窗口计算,可以在一个滑动窗口中执行转换操作,如下图所示。

    Spark Streaming

    如图所示,当窗口在源 DStream 上滑动时,落入窗口的 RDD 会被融合在一起来生成 windowed DStream 的 RDD。在上面的图中,算子被应用于最近 3 个时间单元的数据,并且每次滑动 2 个时间单元。所以每个窗口算子都需要指定两个参数。

    • 窗口时间长度 - 窗口的持续时间(图中是 3)。
    • 滑动时间间隔 - 触发窗口算子计算的时间滑动距离(图中是 2)。

    这两个参数必须是源 DStream 批次时间间隔的整数倍(图中是 1).

    接下来通过一个示例表述一下。考虑扩展前面的示例,每 10 秒钟统计一次前 30 秒的单词计数,为此需要在类型为 (word, 1) 的 DStream 对象 pairs 上使用 reduceByKey 算子,对过去的 30 秒接收的数据做计算。在滑动窗口场景下可以使用 reduceByKeyAndWindow 算子。

    // Reduce last 30 seconds of data, every 10 seconds
    val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
    

    常见的窗口算子如下所示,所有的这些算子都需要两个参数——窗口长度滑动距离

    Transformation Meaning
    window(windowLength, slideInterval) 根据源 DStream 返回一个新的基于窗口批次计算的 DStream。
    countByWindow(windowLength, slideInterval) 返回流式滑动窗口中数据条目的计数值。
    reduceByWindow(func, windowLength, slideInterval) 返回一个单值流(每个批次只包含一个值),该值由聚合函数 func 根据滑动窗口中的数据计算而来。聚合函数需要满足交换律和结合律,使其能够并行计算得到正确的结果。
    reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 在一个类型为 (K, V) 的 DStream 上调用该算子时,返回一个新的类型为 (K, V) 的DStream,其中的数据为滑动窗口中每个不同的键值和相应的数据根据聚合函数 func 计算而来聚合值。注意:默认情况下,该算子使用 Spark 默认的并行任务数(本地模式是 2,集群模式由参数 spark.default.parallelism 控制)来进行分组聚合。可以通过参数 numTasks 自定义并行度。
    reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) reduceByKeyAndWindow() 算子的一个更高效的版本,每个滑动窗口中的聚合值通过之前窗口的聚合值增量计算而来。该算子的实现方式是聚合进入滑动窗口的新数据,并「逆向聚合」离开该窗口的老数据。例如,「增加」和「减少」窗口内键值的计数。然而,该算子只适用于聚合函数存在相应的「逆向聚合」函数(参数 invFunc)的场景。跟 reduceByKeyAndWindow 算子一样,可以通过可选的参数 numTasks 自定义聚合计算并行度。注意在使用该算子时必须设置检查点(checkpointing)。
    countByValueAndWindow(windowLength, slideInterval, [numTasks]) 在一个类型为 (K, V) 的 DStream 上调用该算子时,返回一个 (K, Long) 键值对类型的 DStream,代表滑动窗口中不同键的计数值。跟 reduceByKeyAndWindow 算子一样,可以通过可选的参数 numTasks 自定义聚合计算并行度。

    Join 算子

    最后,需要重点介绍在 Spark Streaming 中进行不同类型的连接计算是多么方便。

    Stream-stream join

    一个数据和可以轻松的和另一个数据流进行连接。

    val stream1: DStream[String, String] = ...
    val stream2: DStream[String, String] = ...
    val joinedStream = stream1.join(stream2)
    

    这里,在每个批次中,stream1 生成的 RDD 会和 stream2 生成的 RDD 进行连接。还可以进行其他类型的连接操作,比如,leftOuterJoinrightOuterJoinfullOuterJoin。此外,在窗口之间进行连接运算也很有用,这也很简单。

    val windowedStream1 = stream1.window(Seconds(20))
    val windowedStream2 = stream2.window(Minutes(1))
    val joinedStream = windowedStream1.join(windowedStream2)
    
    Stream-dataset join

    这种连接操作已经在前面的 DStream.transform 算子中介绍过了,这里再提供一个连接窗口数据流和静态数据集的例子。

    val dataset: RDD[String, String] = ...
    val windowedStream = stream.window(Seconds(20))...
    val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
    

    实际上,还可以动态的调整被连接的静态数据集。提供给 transform 算子的函数每个批次都会触发一次计算,所以会使用 dataset 对象当前指向的那个静态数据集。

    完整的 DStream 转换操作算子列表详见 API 文档。Scala 版本参见 DStreamPairDStreamFunctions,Java 版本参见 JavaDStreamJavaPairDStream,Python 版本参见 DStream


    DStream 的输出算子

    输出算子可以将 DStream 中的数据存储到像数据库或者文件系统这样的外部系统中。由于输出算子真正的让转换逻辑计算得来的数据输出到外部系统,它们才触发所有转换操作的实际计算(类似 RDD 中的 action 算子)。目前,有以下几种输出算子:

    Output Operation Meaning
    print() 在 driver 端打印 DStream 中每个批次数据的前 10 个到控制台。该算子适用于开发和调试场景。
    Python API 在 Python API 中请使用 pprint() 方法。
    saveAsTextFiles(prefix, [suffix]) 将 DStream 中的数据存储为文本文件。每个批次的文件名由定义的前缀和可选的后缀生成:"prefix-TIME_IN_MS[.suffix]"
    saveAsObjectFiles(prefix, [suffix]) 将 DStream 中的数据存储为序列化后的 Java 对象,文件格式为 SequenceFiles。每个批次的文件名由定义的前缀和可选的后缀生成:"prefix-TIME_IN_MS[.suffix]"
    Python API 该算子在 Python API 中不可用。
    saveAsHadoopFiles(prefix, [suffix]) 将 DStream 中的数据存储为 Hadoop 文件。每个批次的文件名由定义的前缀和可选的后缀生成:"prefix-TIME_IN_MS[.suffix]"
    Python API 该算子在 Python API 中不可用。
    foreachRDD(func) 该算子是最通用的输出算子,对每个数据流中的 RDD 调用函数 func。函数中可以将 RDD 中的数据存储到外部系统,比如存储为文件,通过网络发送或者写入到数据库。注意函数 func 在 driver 端执行,通常调用 RDD 的 action 算子来触发计算。

    foreachRDD 算子的设计模式

    dstream.foreachRDD 算子是一个功能很强大的工具,将数据发送到外部存储系统。然而,重要的是如何正确且高效的运用这个工具。有一些常见的错误应该避免。

    通常存储数据到外部系统都需要一个连接对象(例如,连接到远程服务的 TCP 连接),该连接用来向外部系统发送数据。为此,开发者可能会不经意间在 driver 端创建一个连接,之后尝试在 executor 中使用这个连接。

    dstream.foreachRDD { rdd =>
      val connection = createNewConnection()  // executed at the driver
      rdd.foreach { record =>
        connection.send(record) // executed at the worker
      }
    }
    

    这种方式是错误的,因为 driver 需要将连接对象序列化之后发送到 executor 去。而像这样的连接一般不会在不同的节点之间传输,这种做法通常会发生序列化错误(连接对象无法序列化),初始化错误(连接对象没有在 executor 端初始化)等等。正确的做法是在 executor 端创建连接对象。

    然而,这有可能导致另一个常见的错误——为每一条数据都创建一个连接。例如:

    dstream.foreachRDD { rdd =>
      rdd.foreach { record =>
        val connection = createNewConnection()
        connection.send(record)
        connection.close()
      }
    }
    

    一般来说,创建一个连接对象的开销很大,所以为每条数据都创建和销毁连接对象会造成不必要的高负载,显著的降低程序的吞吐量。更好的解决方案是使用 rdd.foreachPartition 算子,在一个 RDD 分区中创建一个连接对象,使用该对象服务该分区中的所有数据。

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        val connection = createNewConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        connection.close()
      }
    }
    

    这种方式将创建连接对象的开销分摊给了分区中的所有数据。

    最后,还可以进一步优化,在多个批次的 RDD 之间复用连接对象。可以通过维护一个静态的连接池,在不同的 RDD 之间重用连接对象,进一步降低开销。

    dstream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionOfRecords =>
        // ConnectionPool is a static, lazily initialized pool of connections
        val connection = ConnectionPool.getConnection()
        partitionOfRecords.foreach(record => connection.send(record))
        ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
      }
    }
    

    注意,连接池中的连接对象应该在需要使用时创建,并设置空闲超时时间。这是发送数据到外部系统最高效的方式。

    其他注意事项
    • DStream 是惰性计算的,有输出算子触发,就像 RDD 由 action 算子触发一样。尤其是,DStream 中的 RDD action 算子触发接收到的数据的计算。所以,如果应用程序没有输出算子,或者 dstream.foreachRDD() 算子中没有 RDD action 算子,那么将没有计算被执行。程序将接收数据,然后丢弃之。
    • 默认情况下,输出算子按照代码中被定义的顺序逐个执行。

    DataFrame 和 SQL 算子

    可以在流式数据中轻松的使用 DataFrames and SQL 算子,为此必须通过 StreamingContext 所用的 SparkContext 对象创建一个 SparkSession 对象。另外为了失败重启,必须创建一个懒加载的单例 SparkSession 对象,参见下面的例子。这个示例修改了前面的示例程序(word count example),通过 DataFrame 和 SQL 进行单词计数。每一个 RDD 都被转换成了一个 DataFrame,并注册为一张临时表,再通过 SQL 语句查询。

    /** DataFrame operations inside your streaming program */
    
    val words: DStream[String] = ...
    
    words.foreachRDD { rdd =>
    
      // Get the singleton instance of SparkSession
      val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._
    
      // Convert RDD[String] to DataFrame
      val wordsDataFrame = rdd.toDF("word")
    
      // Create a temporary view
      wordsDataFrame.createOrReplaceTempView("words")
    
      // Do word count on DataFrame using SQL and print it
      val wordCountsDataFrame = 
        spark.sql("select word, count(*) as total from words group by word")
      wordCountsDataFrame.show()
    }
    

    完整的代码参见 source code

    还可以在另一个线程中对流式数据中定义的临时表执行 SQL 查询(即,异步执行),但是需要确保 StreamingContext 保留了足够的数据供异步查询使用。否则,对于不知晓异步查询存在的 StreamingContext 来说,有可能在一步查询结束之前就清理了老的数据。例如,如果需要查询最后一个批次,但是查询需要耗时 5 分钟,请调用 streamingContext.remember(Minutes(5)) 方法。

    关于 DataFrame 的其他信息参见 DataFrames and SQL


    机器学习库算子

    还可以使用机器学习库(MLlib)提供的机器学习算法。首先,目前已经提供了一些流式数据机器学习算法(例如,Streaming Linear Regression, Streaming KMeans 等等),这些算法可以同时对流式数据进行训练,就好像将模型应用在了流式数据上。除了这些,对于其他更多的机器学习算法,可以先对一个模型进行离线计算(即,使用历史数据),之后在应用到流式数据上。详情参见 MLlib


    缓存 / 持久化

    和 RDD 类似,DStream 也允许开发者将流式数据缓存在内存中。在 DStream 对象上调用 persist() 方法会自动缓存每个 RDD 的数据到内存中。这种方式适用于流式数据会被多次计算的场景(多个算子计算相同的数据)。对于像 reduceByWindowreduceByKeyAndWindow 这样的基于窗口的算子,以及像 updateStateByKey 这样的有状态算子,缓存是默认开启的。所以,窗口算子生成的 DStream 会自动将数据缓存在内存中,开发者不需要调用 persist() 方法。

    对于从网络中接收数据的输入流(比如,Kafka,socket 等等),默认的持久化级别是将数据复制到两个节点,来进行容错。

    注意,跟 RDD 不同,DStream 默认的持久化级别是将序列化后的数据保存在内存中。更多的讨论参见 Performance Tuning 章节。关于不同持久化级别的更多信息参见 Spark Programming Guide


    检查点机制

    一个流式应用程序通常会一直运行,所以必须可以处理与计算逻辑无关的失败场景(例如,系统宕机,JVM 异常退出等等)。为了使之成为可能,Spark Streaming 任务需要保存足够的信息到容错的存储系统中,从而使其可以从故障中恢复。有两种数据会被存储到检查点。

    • 元信息存储 - 存储与流式应用程序计算逻辑相关的信息到容错的存储系统,比如 HDFS。这些信息用来重失败的节点中恢复流式应用程序段 driver 数据(后面会详细讨论。元信息包括:
      • 配置信息 - 创建流式应用程序的配置项。
      • DStream 算子 - 流式应用程序中定义的算子集合。
      • 未完成的批次 - 未完成的已加入执行队列的批次。
    • 数据存储 - 存储生成的 RDD 到可靠的存储系统。对于一些有状态的算子,存储这些数据是有必要的。这类算子生成的 RDD 依赖于之前批次的 RDD,导致依赖链随着时间而增长。为了避免恢复时无限制的时间膨胀 (与时间成比例的依赖链),RDD 的中间计算状态会周期性的存储到可靠的系统中(比如 HDFS)来削减依赖链。

    总的来说,存储元信息到检查点是为了恢复 driver 数据,而存储 RDD 数据到检查点是为了恢复有状态算子的数据。

    何时启用检查点

    对于下列场景,必须启用检查点机制:

    • 使用有状态算子 - 如果应用程序中使用了 updateStateByKey 或者 reduceByKeyAndWindow (使用逆向聚合函数)算子,则必须提供检查点路径来周期性的储存 RDD 数据。
    • 从失败的 driver 中恢复应用程序 - 存储到检查点的应用程序元信息用来恢复 driver 数据。

    注意,没有用到上面提到的有状态算子的简单流式应用程序可以不开启检查点。还是可以从 driver 故障中部分恢复应用程序的运行(部分已接收但是还未来得及处理的数据会丢失)。这通常是可以接受的,有很多 Spark Streaming 应用程序以这种方式运行。未来可能会提供对非 Hadoop 系统的支持。

    如何配置检查点

    可以通过配置一个容错的,可靠的文件系统(例如,HDFS,S3 等等)路径来开启检查点机制,相关信息将会存储到该路径。请调用 streamingContext.checkpoint(checkpointDirectory) 方法来配置检查点,之后便可以使用前面提到的有状态算子。另外,如果想让应用程序从 driver 故障中恢复,需要对运算程序进行下面的改造。

    • 当程序第一次启动时,会创建一个新 StreamingContext 对象,定义了所有的计算逻辑之后调用 start() 方法。
    • 当程序从故障中重启之后,会根据检查点路径的检查点数据创建 StreamingContext 对象。

    这样的行为可以通过使用 StreamingContext.getOrCreate 方法轻易的实现,如下所示。

    // Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(...)   // new context
      val lines = ssc.socketTextStream(...) // create DStreams
      ...
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
      ssc
    }
    
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    
    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context. ...
    
    // Start the context
    context.start()
    context.awaitTermination()
    

    如果检查点路径 checkpointDirectory 存在,则会从检查点数据中重新创建 StreamingContext 对象。如果检查点路径不存在(即,第一次启动程序),那么会调用 functionToCreateContext 方法创建一个新的 StreamingContext 对象。参见 Scala 示例代码 RecoverableNetworkWordCount,该示例将从网络中接收到的文本行进行单词计数之后存储到文件中。

    除了使用 getOrCreate 方法,还需要保证 driver 进程会在遇到故障后自动重启,这只能通过部署应用程序的基础环境来显示。更多讨论参见 Deployment 章节。

    注意,存储 RDD 数据到检查点会造成存储到外部系统的开销,可能会增加相应的 RDD 的计算时间。所以,检查点触发的时间间隔需要谨慎设置。对于短时间的批次(比如 1 秒),对每个批次都触发检查点机制会显著降低吞吐量。相反,过长的检查点触发时间间隔会导致数据血缘和子任务的状态膨胀,对计算有不利的影响。对于需要检查点的有状态算子,默认的检查点触发时间间隔是批次时间间隔的整数倍,最小值为 10 秒,可以通过 dstream.checkpoint(checkpointInterval) 方法设置该参数。通常情况下,可以尝试将检查点触发时间间隔设置为滑动时间间隔的 5 到 10 倍。


    累加器,广播变量和检查点

    在 Spark Streaming 中,累加器(Accumulators)和广播变量(Broadcast variables)无法从检查点中恢复。如果开启了检查点机制,同时使用了累加器或者广播变量,必须创建相应的惰性初始化的单例对象,来保证在 driver 从故障中重启之后可以被重新初始化,如下所示。

    object WordBlacklist {
    
      @volatile private var instance: Broadcast[Seq[String]] = null
    
      def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
        if (instance == null) {
          synchronized {
            if (instance == null) {
              val wordBlacklist = Seq("a", "b", "c")
              instance = sc.broadcast(wordBlacklist)
            }
          }
        }
        instance
      }
    }
    
    object DroppedWordsCounter {
    
      @volatile private var instance: LongAccumulator = null
    
      def getInstance(sc: SparkContext): LongAccumulator = {
        if (instance == null) {
          synchronized {
            if (instance == null) {
              instance = sc.longAccumulator("WordsInBlacklistCounter")
            }
          }
        }
        instance
      }
    }
    
    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      // Get or register the blacklist Broadcast
      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
      // Get or register the droppedWordsCounter Accumulator
      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
      // Use blacklist to drop words and use droppedWordsCounter to count them
      val counts = rdd.filter { case (word, count) =>
        if (blacklist.value.contains(word)) {
          droppedWordsCounter.add(count)
          false
        } else {
          true
        }
      }.collect().mkString("[", ", ", "]")
      val output = "Counts at time " + time + " " + counts
    })
    

    完整的代码参见 source code


    部署应用程序

    本章节讨论部署 Spark Streaming 应用程序的步骤。

    前置条件

    运行 Spark Streaming 应用程序需要以下几个步骤。

    • 由集群管理器管理的集群 - 这是 Spark 应用程序最基础的环境,详情参见 deployment guide
    • 打包好的应用程序 JAR 文件 - 必须将应用程序编译成一个 JAR 文件。如果使用 spark-submit 脚本启动应用程序,则不需要提供 Spark 或者 Spark Streaming 相关的依赖。然而,如果用到了 Kafka 之类的高级数据源(advanced sources),则必须将相应的依赖打包进应用程序 JAR 文件。例如,使用到 KafkaUtils 的应用程序必须将依赖 spark-streaming-kafka-0-10_2.12 及其相关依赖加入到应用程序 JAR 文件中。
    • 为 executor 配置足够的内存 - 由于接收到的数据必须存储在内存,executor 必须拥有足够的内存来保存接收到的数据。注意,如果使用了窗口大小为 10 分钟的算子,那么系统必须至少可以将 10 分钟的数据存储到内存中。所以所需要的内存取决于应用程序的算子如何执行计算。
    • 配置检查点路径 - 如果流式应用程序需要,则必须配置一个兼容 Hadoop API 的容错存储系统(例如,HDFS,S3 等等)路径作为检查点目录,使得流式应用程序可以从故障中恢复。详情参见 checkpointing 章节。
    • 配置应用程序 driver 的自动重启 - 为了使 driver 进程可以从故障中恢复,运行流式引用程序的部署环境必须监控 driver 进程,并在其失败后重新启动。不同的集群管理器(cluster managers)采用不同的工具来实现。
      • Spark Standalone - 一个 Spark 应用程序 driver 可以被提交到 Spark Standalone 集群(参见 cluster deploy mode)上执行,即,应用程序的 driver 端运行在某一个 worker 节点上。另外,Standalone 集群管理器可以监控 driver 进程,无论是 driver 异常退出还是其所在节点宕机,都能够重新加载该 driver 程序。详情参见 Spark Standalone guide
      • YARN - Yarn 支持相似的机制来自动重启一个应用程序。详情参见 Yarn 官方文档。
      • Mesos - Marathon 已经被用来在 Mesos 上实现该方案。
    • 配置预写日志(WAL) - 自 Spark 1.2 开始,引入了预写日志(WAL)机制来实现强容错保证。一旦开启该功能,所有接收器接收到的数据都会写入检查点目录,作为预写日志。这种方式避免了 driver 重启造成的数据丢失,可以保证没有数据丢失(详情参见 Fault-tolerance Semantics 章节)。该机制可以通过配置参数(configuration parameterspark.streaming.receiver.writeAheadLog.enabletrue 来开启。然而,这种强容错语义牺牲了单个接收器的吞吐量。可以通过并行运行多个接收器(more receivers in parallel)来规避这一弊端。另外,建议在开启 WAL 机制时禁用接收数据的副本机制,毕竟预写日志已经将数据存储在了支持副本的存储系统。可以通过设置输入流的存储级别为 StorageLevel.MEMORY_AND_DISK_SER 来关闭副本机制。当使用 S3 存储预写日志时,请记得开启 spark.streaming.driver.writeAheadLog.closeFileAfterWritespark.streaming.receiver.writeAheadLog.closeFileAfterWrite。详情参见 Spark Streaming Configuration。注意,即使开启了 I/O 加密,Spark 也不会加密存储到预写日志的文件。如果需要对预写日志进行加密,应该将日志写入原生支持数据加密的存储系统中。
    • 设置最大接收速率 - 如果集群资源不能够支持应用程序处理数据的速度跟上接收器接收数据的速度,接收器可以通过设置以为 records / sec 单位的最大速率值来限制接收速率。参见适用于接收器的配置参数(configuration parametersspark.streaming.receiver.maxRate 和适用于 Direct Kafka 的配置参数 spark.streaming.kafka.maxRatePerPartition。在 Spark 1.5 中引入了反压机制,不需要再对接收速率进行限制,因为 Spark 会自动计算速率限值,并在计算条件发生改变是动态调整。反压机制可以通过将配置参数(configuration parametersspark.streaming.backpressure.enabled 设置为 true 来开启。

    更新应用程序代码

    如果一个正在运行的 Spark Streaming 应用程序需要更新代码,那么有两种可行的方案。

    • 启动更新后的 Spark Streaming 应用程序,和之前的应用程序同时运行。一旦新的应用程序(和老的应用程序接收相同的数据)热身完毕,准备好长期运行,老的程序就可以停止了。注意,这种方案只适用于数据源支持发送相同的数据到不同目的地(即,老的和新的应用程序)的场景。
    • 优雅的停止现有的应用程序(参见的 StreamingContext.stop(...) 或者 JavaStreamingContext.stop(...) 的优雅停止选项),保证在程序停止前已经计算完了所有接收到的数据。之后启动更新后的应用程序,从之前的应用程序停止的地方开始接收数据。注意,这种方案只适用于支持缓存的数据源(比如 Kafka),需要在旧的应用程序停止之后更新后的应用程序启动之前缓存数据。新的程序读取之前程序的检查点信息,继续运行。检查点信息包含了序列化后的 Scala/Java/Python 对象,新的程序反序列化这些对象可能会出错。此时,可以为新的应用程序配置一个不同的检查点路径或者删除之前的检查点路径。

    监控应用程序

    除了 Spark 自身的监控能力(monitoring capabilities),还有一些 Spark Streaming 特有的功能。在使用 StreamingContext 时,应用程序界面(Spark web UI)上会显示额外的 Streaming Tab 页,展示了正在运行的接收器的统计数据(接收器是否启动,接收到的数据总数,接收器异常等等)和完整的批次信息(批次处理事件,队列延迟等等)。这些可以用来监控流式应用程序的执行情况。

    界面上的下面两个指标尤其重要:

    • Processing Time - 计算每个批次数据的耗时。
    • Scheduling Delay - 一个批次在队列中等等前面的批次计算完成的时间。

    如果批次数据的计算时间总是大于批次间隔时间,同时/或者队列延迟时间持续增长,则表明系统处理数据的速度跟不上接收数据的速度,计算进度滞后了。此时,可以考虑降低(reducing)批次处理时间。

    还可以通过 StreamingListener 接口来监控 Spark Streaming 应用程序,能够从中获得接收器的状态和数据处理时间。注意,该接口是一个开发者 API,在将来可能会提供更多的任务执行相关信息。



    性能调优

    让在集群上运行的 Spark Streaming 应用程序达到最高的性能需要一点调优操作。本章节介绍了一些可以优化应用程序性能表现的一些配置参数。在高层面,需要关注两件事情:

    1. 通过高效的利用集群资源,来降低每个批次数据的处理时间。
    2. 设置合理的批次大小,让数据到来之后尽快被处理(即,数据处理的速度跟上数据抽取的速度)。

    减少批次计算时间

    Spark 中有一些优化措施能够最小化每个批次数据的计算时间。更详细的讨论参见 Tuning Guide。本章节重点介绍其中最重要的一些措施。

    数据接收的并行度

    从网络中接收到的数据(比如 Kafka,socket 等等)需要被反序列化之后存储到 Spark 管理的内存中。如果数据接收成为了系统瓶颈,就需要考虑并行化数据接收。注意,每一个输入 DStream 创建一个单独的接收器(运行在某个 worker 节点上)来接收数据,所以接收多个数据流就需要创建多个输入 DStream,并配置接收数据源中不同的数据分区。例如,一个接收两个 Kafka 主题数据的输入 DStream 可以被分割成两个输入 DStream,每个接收一个 Kafka 主题的数据。这样就会运行两个接收器,达到并行接收数据的效果,从而提升吞吐量。多个输入 DStream 可以被合并成一个 DStream,之后的计算逻辑可以应用到这个单一的 DStream 上,如下所示。

    val numStreams = 5
    val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
    val unifiedStream = streamingContext.union(kafkaStreams)
    unifiedStream.print()
    

    另一个需要注意的参数时接收器的数据分块时间间隔,有配置参数(configuration parameterspark.streaming.blockInterval 控制。对于大多数接收器来说,接收到的数据在存储到内存之前首先被合并成数据块。每个批次中的数据块的数量决定了计算(类 map 算子)这些接收数据的子任务数。每个接收器在每个批次数据中的子任务数大概是(批次时间间隔/数据分块时间间隔)。例如,如果批次时间间隔为 2 秒,数据分块时间间隔为 200 毫秒,那么一个批次将会生成 10 个子任务。如果子任务数量太少(即,少于各个计算节点的核数),那么可用的资源不会被全部用来计算数据,导致效率低下。可以通过减少数据分块时间间隔来增加每个批次的子任务数量。然而,推荐的最小数据分块时间间隔是 50 毫秒,再低的话调度子任务的开销也会成为一个问题。

    另一种多输入流解决方案是显示将输入流充分区(使用 inputStream.repartition(<number of partitions>) 方法)。这种方式将接收到的数据分发到集群中不同的节点上进行计算。

    对于直接接入数据流,参见 Spark Streaming + Kafka Integration Guide

    数据处理的并行度

    如果某个阶段的并行子任务数量不够多,可能会导致集群资源没有被充分利用。例如,对于像 reduceByKeyreduceByKeyAndWindow 这样的分布式聚合算子,默认的并行度由配置参数(configuration propertyspark.default.parallelism 控制,可以将并行度当做参数传递给聚合算子(参见 PairDStreamFunctions 文档),或者配置参数(configuration propertyspark.default.parallelism 来改变默认值。

    数据序列化

    可以通过调整序列化格式来降低数据序列化的开销。在流式计算场景下,有两种数据会被序列化。

    • 输入数据:默认情况下,接收器接收到的数据存储在 executor 的内存中,持久化级别是 StorageLevel.MEMORY_AND_DISK_SER_2。即,数据被序列化成字节来缓解 GC 压力,并备份到其他节点一份来进行容错。同时,数据首先被存储到内存,在内存不足以装下所有的输入数据时溢写到磁盘。这种序列化方式明显存在性能损耗——接收器必须反序列化接收到的数据,之后重新根据 Spark 序列化格式重新序列化数据。
    • 流式计算算子生成的持久化 RDD:流式计算生成的 RDD 可能会持久化到内存中。例如,窗口算子会将数据持久化到内存,因为有些数据可能会被多次计算。然而,与 Spark Core API 的默认持久化级别 StorageLevel.MEMORY_ONLY_SER 不同,流式计算生成的 RDD 的持久化级别是 StorageLevel.MEMORY_ONLY_SER(即,序列化后),来最小化 GC 的开销。

    在以上两种场景下,使用 Kryo 序列化器都可以同时降低 CPU 和内存的开销。详情参见 Spark Tuning Guide。对于 Kryo,需要注册用户自定义的类,并且禁用对象引用追踪(参见 Configuration Guide 中有关 Kryo 的配置)。

    当流式计算任务需要保存的数据量不是很大的时候,将数据以未序列化的格式保持久化到内存中是可行的,而且不会造成过大的 GC 开销。例如,如果用几秒钟当做批次时间间隔,同时没有使用窗口算子,可以显示的将持久化级别设置为不序列化的类型。这样会降低由序列化导致的 CPU 开销,从而在 GC 压力不大的情况下提升性能。

    子任务调度开销

    如果每秒钟调度的子任务过多(比如,一秒钟有 50 或者更多),那么调度子任务到其他节点的开销就会很明显,并增加数据计算延迟,达不到亚秒级。此类开销可以通过下面的方式降低:

    • 执行模式:以独立集群的方式运行 Spark 或者采用粗粒度的 Mesos 模式在调度子任务时会比细粒度的 Mesos 模式用时更短。详情参见 Running on Mesos guide

    上述措施可以将批次数据处理时间降低到 100 毫秒的级别,实现亚秒级的计算延迟。


    设置合理的批次间隔

    为了让 Spark Streaming 应用程序能够在集群上稳定运行,程序计算数据的速度应该跟得上数据到来的速度。也就是说,批次数据的计算速度应该跟得上批次数据生成的速度。通过在流式应用界面看板上监控(monitoring)处理时间可以判断是否达到了这项指标,其中的批次数据处理时间应该小于批次时间间隔。

    出于流式计算机制的缘故,对于特定的集群资源池,流式应用程序所采用的批次时间间隔会对程序可以维持的数据计算速率有显著的影响。例如,考虑前面的 WordCountNetwork 示例,对于某一数据接收速率,程序可能可以维持 2 秒一个批次的数据生成速率,但是 500 毫秒就不行。所以批次时间间隔需要合理设置,使之能够支撑生产环境下的数据生成速率。

    测试时设置一个保守的批次时间间隔(比如说,5-10 秒)和较低的数据生成速率是衡量合理批次大小的一个不错的办法,来验证程序的计算速度是否能跟得上数据生成的速度,可以通过每个批次端到端的数据延迟来观察(查看 Spark driver 日志中的「Total delay」 或者使用 StreamingListener 接口)。如果数据延迟的时间和批次时间间隔相匹配,那么程序是稳定的。否则,如果数据延迟持续增加,就意味着计算速度跟不上数据生成速度,程序就是不稳定的。一旦测试出了比较稳定的配置,可以继续尝试增加数据生成速率同时/或者减少批次时间间隔。注意,数据生成速率偶然激增导致的数据延迟临时变高是可以接受的,只要之后延迟回落到低水平就行(即,低于批次时间间隔)。


    内存调优

    关于内存调优和控制 Spark 应用程序的 GC 表现在文档 Tuning Guide 中已经有了详细的讨论。强烈建议阅读该文档。在本章节,只讨论一些针对 Spark Streaming 应用程序的调优参数。

    Spark Streaming 应用程序需要的集群内存资源在很大程度上取决于计算逻辑中使用的转换算子。例如,如果使用了 10 分钟大小的窗口算子,那么集群必须提供足够的内存使其能容纳 10 分钟内接收到的数据,或者在使用 updateStateByKey 算子时生成了大量的键值,那么需要的内存就比较多。相反,如果只是简单的 map-filter-store 算子(ETL),需要的内存就比较少。

    通常情况下,由于接收器接收到的数据默认的持久化级别是 StorageLevel.MEMORY_AND_DISK_SER_2,内存装不下的数据会溢写到磁盘,这样会影响 Spark Streaming 任务的性能,所以建议提供足够的内存供程序使用。最好的方式是在小范围测试程序的内存使用情况并作出相应的评估。

    内存调优的另一个关键点是垃圾回收。由于流式计算任务要求低延迟,JVM 因为垃圾回收发生长时间的暂定是无法接受的。

    有一些参数可以帮助调优内存使用和 GC 开销。

    • DStream 的持久化级别:像之前 Data Serialization 章节提到的那样,输入数据和 RDD 数据默认是以序列化后的字节形式保存的,相比未序列化的格式而言,这种方式同时降低了内存使用和 GC 开销。启用 Kryo 序列化进一步减少了序列化后的数据大小和内存空间。在进一步减少内存消耗可以通过压缩来实现(参见 Spark 配置参数 spark.rdd.compress),但是代价是 CPU 的开销变大了。
    • 清理老数据:默认情况下,所有的输入数据和由 DStream 算子生成的持久化 RDD 会被自动清理。Spark Streaming 程序会自己决定何时进行数据清理。例如,如果使用了 10 分钟为大小的窗口算子,那么 Spark Streaming 会保存最近 10 分钟的数据,丢弃更早的数据。通过设置参数 streamingContext.remember 也可以让数据被保存更久。
    • CMS 垃圾回收器:强烈建议使用并发执行的标记-清除 GC 模式,这样可以将 GC 相关的暂定维持在一个低的水平。尽管并发 GC 模式会降低系统的吞吐量,依旧建议使用这种模式来实现更稳定的批次数据处理时间。请确保在 driver 端(在 spark-submit 脚本中适用 --driver-java-options 参数)和 executor 端(使用 Spark configuration spark.executor.extraJavaOptions)都设置了 CMS GC 参数。
    • 其他提示:进一步降低 GC 开销还可以做以下尝试。
      • 使用堆外内存(OFF_HEAP)持久化 RDD 数据,详情参见 Spark Programming Guide
      • 采用更多数量的小内存 executor,这样可以在每个 JVM 堆内存上缓解 GC 压力。

    重要须知
    • 一个 DStream 和一个接收器相关联,如果需要并行读取数据,就要创建多个接收器,和多个 DStream。一个接收器会在一个 executor 中运行,占用一个核。请保证除了接收器占用的核心之外,还有足够的核来支撑数据计算,即 spark.cores.max 参数需要将接收器占用的核也考虑在内。分配接收器到 executor 的路由是以轮询的方式进行的。
    • 当数据从数据源中接收到之后,接收器会创建多个数据块,每个数据块时间间隔(毫秒)内会生成一个新的数据块。在一个批次时间间隔内会生成 N 个数据块,其中 N = 批次时间间隔 / 数据块时间间隔。这些数据块由当前 executor 的 BlockManager 分发到其他 executor 的 BlockManager。之后,运行在 driver 端的 Network Input Tracker 会获悉这些数据块的位置,进行下一步的计算。
    • 在 driver 端,一个批次时间间隔内生成的数据块会用来创建一个 RDD,这些数据块就是 RDD 的分区,每个分区都在 Spark 中都是一个子任务。数据块时间间隔等于批次时间间隔意味着创建的 RDD 只会有一个单独的分区,大概率会在本地执行计算。
    • 计算这些数据块的 map 子任务会在存储该数据块的 executor(接收该数据块的 executor,和另一个备份该数据块的 executor)上执行,跟数据块时间间隔无关,除非进行了非本地的任务调度。更大的数据块时间间隔意味着更大的数据块,将参数 spark.locality.wait 设置成一个较大的值会增加在本地计算数据块的概率。需要在数据块时间间隔和 spark.locality.wait 两个参数之间寻求一个平衡点,来保证大的数据块尽量都在本地进行计算。
    • 除了依靠批次时间间隔嗯哼数据块时间间隔,还可以通过调用 inputDstream.repartition(n) 方法指定分区数。该方法会将数据重新洗牌,随机分散到 n 个分区。是的,是为了更高的并行度,尽管代价是 shuffle。一个 RDD 的计算由 driver 端的作业调度器(jobscheduler)来调度,在某一给定时刻,只有一个作业(job)会被激活。所以,如果一个作业正在执行,其他的就要排队等待。
    • 如果存在两个 DStream,那么会构建两个 RDD,并生成两个作业任务,它们会一个计算完成之后再计算另一个。为了避免多余的调度开销,可以将两个 DStream 融合成一个。这样可以保证两个 DStream 中的 RDD 会合并成一个单独的 unionRDD,也就只会生成一个作业。然而,这些 RDD 的分区并不会受影响。
    • 如果批次数据计算时间大于批次时间间隔,那么很明显接收器所使用的内存将会慢慢膨胀,最终抛出异常(通常是 BlockNotFoundException)。目前,并没有什么办法可以暂停接收器。请使用 Spark 配置参数 spark.streaming.receiver.maxRate 来限制接收器的接收速率。


    容错语义

    在本章节,会讨论在 Spark Streaming 任务计算失败时的行为。

    背景介绍

    为了理解 Spark Streaming 提供的容错语义,先来回顾一下 Spark 有关 RDD 的容错语义。

    1. RDD 是一个不可变的,可以被确定性的重新计算,分布式的数据集。每个 RDD 都记得自己的数据血缘,也就是上游的从输入源经过不同算子生成的 RDD,这些算子满足确定性计算的要求。
    2. 如果由于节点故障导致 RDD 中的某一个分区计算结果丢失,那么该分区可以通过数据血缘追溯到上游容错数据集,重新计算该分区。
    3. 假设所有的 RDD 转换操作都是确定性的,最后那个 RDD 的计算结果将永远是相同的,无论 Spark 集群发生什么样的故障。

    Spark 操作的数据来源于容错的文件系统,比如 HDFS 或者 S3。所以,从容错数据中生成的 RDD 也是容错的。然而,这与 Spark Streaming 的计算场景不同,Spark Streaming 处理的数据通常是来源于网络(除了文件数据源生成的 fileStream)。为了对所有生成的 RDD 实现相同的容错属性,接收到的数据都会在其他节点上的 executor 中做备份(默认复制因子是 2)。这样在故障发生时就会有两种类型的数据需要被恢复:

    1. 接收到的数据并且已生成副本 - 这样的数据在一个节点故障时可以通过其副本所在的节点恢复。
    2. 接收到的数据在缓冲区中但暂未生成副本 - 由于备份还没有完成,恢复这些数据唯一的方式就是从数据源重新获取。

    此外,还有两种类型的故障需要考虑:

    1. Worker 节点故障 - 任何一个运行 executor 的节点都有可能故障,故障节点内存中的数据都会丢失。如果某些接收器运行在这些节点上,接收到的存在缓冲区的数据也会丢失。
    2. Driver 节点故障 - 如果运行 Spark Streaming 应用程序 driver 的节点发生故障,显然 SparkContext 对象会失联,所有 executor 存储在内存中的数据都会丢失。

    基于以上认知,让我们来理解 Spark Streaming 的容错语义。

    定义

    流式计算系统的容错语义通常根据一条数据会被计算多少次来决定。对于所有可能的发生条件,一个流式系统可能提供三种类型的保证。

    1. At most once:每条数据最多只会被计算一次。
    2. At least once:每条数据可能会被计算一次或者多次,这种语义比 at-most once 有更有力的保障,可以确保不会有数据丢失,但是可能存在数据重复。
    3. Exactly once:每条数据只会被计算一次——没有数据会丢失,也没有数据会被计算多次。显然这是三种语义中最有力的保证。

    基本语义

    在任何的流式计算系统中,宽泛来讲,处理数据有三个步骤。

    1. 接收数据:通过接收器或其他方式从数据源抽取数据。
    2. 计算数据:通过 DStream 和 RDD 的算子对数据进行转换计算。
    3. 输出数据:最终转换后的数据被输出到外部存储系统,比如文件系统,数据库,数据看板等等。

    如果一个流式计算任务必须满足端到端的精准一次(exactly-once)容错语义,那么上面的每个步骤都应该提供精准一次的语义保证。即,每一条数据都必须只被接收一次,计算一次,输出到外部系统一次。接下来在 Spark Streaming 的计算场景中介绍三个步骤的语义。

    1. 接收数据:不同的数据源提供了不同的语义保证,会在下个子章节中讨论。
    2. 计算数据:依托于 RDD 提供的予以保证,所有已经接收到的数据只会被计算一次。即使中间过程有故障发生,只要接收到的数据还能够被重新访问,最终的 RDD 总是会包含相同的数据。
    3. 输出数据:输出算子默认情况下提供至少一次(at-least once)的容错语义保证,这取决于输出算子的类型(是否幂等)和下游存储系统的语义保证(是否支持事务)。但是用户可以自己实现事务机制达到精准一次(exactly-once)容错语义。本章节的后面会有更多讨论。

    接收数据的容错语义

    不同的数据源提供不同的容错语义保证,可以是至少一次(at-least once)或者精准一次(exactly once)。下面展开说明。

    文件数据源

    如果所有的输入数据都已经存储在了像 HDFS 这样的容错存储系统,Spark Streaming 就能够从任何类型的故障中恢复,重新计算所有的数据。这种方式提供了精准一次(exactly-once)的容错语义,意味着所有的数据都只会被计算一次,即使出现异常。

    基于接收器的数据源

    对于基于接收器的数据源,容错语义取决于故障的类型和接收器的类型。根据之前的介绍(earlier),有两种类型的接收器。

    1. 可靠的接收器 - 此类接收器在数据备份到其他节点之后才会向数据源发送确认消息。如果接收器出现异常,数据源将不会再收到存储在缓冲区的数据(暂未备份)的确认消息。所以,当接收器重启之后,数据源会重新发送这些数据,不会有数据丢失。
    2. 不可靠的接收器 - 此类接收器不需要发送确认消息,所以在接收器出现异常(节点故障或者 driver 出现异常)之后可能会丢失数据。

    根据采用的接收器的类型可以实现以下语义。如果 worker 节点发生故障,可靠的接收器不会发生数据丢失;对于不可靠的接收器,已接收但是还没有备份的数据可能会丢失。如果 driver 节点发生故障,除了前面提到的数据丢失,所有的内存中的接收到的和备份过的数据都会丢失,还会影响有状态算子的计算结果。

    为了避免已接收到数据不被丢失,Spark 1.2 引入了预写日志机制(write ahead logs),将接收到的数据存储到容错的系统中。如果开启预写日志机制(write-ahead logs enabled)同时使用可靠的接收器,则不会有数据丢失。在语义方面,这种方式提供至少一次的容错语义保证。

    下面的表格总结了容错语义:

    部署场景 Worker 节点故障 Driver 故障
    Spark 1.1 及更早版本
    Spark 1.2 及之后的版本同时未开启预写日志机制
    采用不可靠的接收器会导致存储在缓冲区的数据丢失,采用可靠的接收器提供至少一次的容错语义保证。 采用不可靠的接收器会导致历史接收到的数据丢失,所有类型的接收器无法提供明确的容错予以保证。
    Spark 1.2 及之后的版本同时开启预写日志机制 采用可靠的接收器不会有数据丢失,提供至少一次的容错语义保证。 采用可靠的接收器或者文件数据源不会有数据丢失,提供至少一次的容错语义保证。

    Kafka Direct API 数据源

    在 Spark 1.3 版本中引入了新的 Kafka Direct API,可以保证 Kafka 数据只会被 Spark Streaming 接收一次。基于此,如果同时使用满足精确一次语义的输出算子,就可以实现端到端的精确一次容错语义保证。更多详情参见 Kafka Integration Guide

    输出算子的容错语义

    输出算子(比如 foreachRDD)拥有至少一次(at-least once)的容错语义保证,即,在有故障发生的情况下计算结果可能被输出到外部系统不止一次。尽管在使用 saveAs***Files 算子将结果输出到文件系统时,这种方式是可以接受的(因为同样的数据在文件系统中会直接被覆写),对于其他的输出算子需要更多的努力来实现精准一次的容错语义保证。有以下两种方式。

    • 幂等更新:多次尝试写入只会出现一份相同的数据。例如,saveAs***Files 算子总是将相同的数据写出到生成的文件中。

    • 事务更新:所有的更新操作都具有事务性,所以更新操作可以原子性的只执行一次。可以通过下面的方式来实现。

      • 使用批次计算时间(可以在 foreachRDD 算子中获取)和 RDD 的分区索引来创建一个标识符,该标识符唯一标记了流式应用程序中的一个数据块。

      • 通过该唯一标识符将数据块事务性的(即,精准一次,原子性的)更新到外部系统。即,如果该标识符还没有被提交,则一起提交该分区数据和标识符。否则,如果已经提交,跳过此次的更新操作。

        dstream.foreachRDD { (rdd, time) =>
          rdd.foreachPartition { partitionIterator =>
            val partitionId = TaskContext.get.partitionId()
            val uniqueId = generateUniqueId(time.milliseconds, partitionId)
            // use this uniqueId to transactionally commit the data in partitionIterator
          }
        }
        


    接下来干点啥呢

    相关文章

      网友评论

        本文标题:05 Spark Streaming Programming G

        本文链接:https://www.haomeiwen.com/subject/vztikktx.html