美文网首页Spark
StructuredStreaming编程指南

StructuredStreaming编程指南

作者: 撸码小丑 | 来源:发表于2019-01-23 09:38 被阅读22次

    1、概述

    结构化流是一个基于Spark SQL引擎的可扩展、容错的流处理引擎。您可以用在静态数据上表示批处理计算的方式来表示流计算。Spark SQL引擎将负责递增和连续地运行它,并在流数据继续到达时更新最终结果。可以使用Scala、Java、Python或R中的DataSet/DataFrAPI API来表示流聚合、事件时间窗口、流到批连接等。在相同的优化Sql SQL引擎上执行计算。最后,系统通过检查点和提前写入日志来确保端到端的容错性。简而言之,结构化流提供了快速、可扩展、容错、端到端的一次性流处理,而用户无需考虑流。
    在内部,默认情况下,结构化流式查询使用微批处理引擎进行处理,该引擎将数据流作为一系列小批处理作业进行处理,从而实现端到端延迟,最短可达100毫秒,并且可以保证容错性。然而,自Spark 2.3以来,我们引入了一种新的低延迟处理模式,称为连续处理,它可以在至少一次保证的情况下实现低至1毫秒的端到端延迟。在查询中不更改数据集/数据帧操作的情况下,您可以根据应用程序要求选择模式。
    在本文中,将介绍编程模型和API。将主要使用默认的微批量处理模型来解释这些概念,然后讨论连续处理模型。首先,从一个简单的结构化流式查询示例开始——流式WordCount。

    2、快速入门

    假设你希望维护从侦听TCP Socket的数据服务器接收的文本数据的word count。让我们看看如何使用structured streaming来表达这一点。首先,我们必须导入必要的类并创建本地SparkSession,这是与Spark相关的所有功能的起点。

    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.streaming.StreamingQuery;
    
    import java.util.Arrays;
    import java.util.Iterator;
    
    SparkSession spark = SparkSession
      .builder()
      .appName("JavaStructuredNetworkWordCount")
      .getOrCreate();
    

    接下来,让我们创建一个流式DataFrame,它表示从侦听localhost:9999的服务器接收到的文本数据,并转换该DataFrame以计算单词字数。

    // Create DataFrame representing the stream of input lines from connection to localhost:9999
    Dataset<Row> lines = spark
      .readStream()
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load();
    
    // Split the lines into words
    Dataset<String> words = lines
      .as(Encoders.STRING())
      .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
    
    // Generate running word count
    Dataset<Row> wordCounts = words.groupBy("value").count();
    

    此行数据帧表示包含流式文本数据的无边界表。此表包含一列名为“value”的字符串,流式文本数据中的每一行将成为表中的一行。注意,由于我们只是在设置转换,所以目前还没有接收到任何数据,而且还没有启动转换。接下来,我们使用.as(encoders.string())将数据帧转换为字符串数据集,这样我们就可以应用flatmap操作将每一行拆分为多个单词。结果单词数据集包含所有单词。最后,我们通过对数据集中的唯一值进行groupby并对其进行计数来定义WordCounts数据帧。注意,这是一个流数据帧,它表示流的运行单词计数。

    我们现在已经设置了对流数据的查询。剩下的就是实际开始接收数据并计算计数。为此,我们将其设置为每次更新时向控制台打印完整的计数集(由outputmode("complete")指定)。然后使用start()启动流计算。

    // Start running the query that prints the running counts to the console
    StreamingQuery query = wordCounts.writeStream()
      .outputMode("complete")
      .format("console")
      .start();
    
    query.awaitTermination();
    

    执行此代码后,流计算将在后台启动。查询对象是该活动流查询的句柄,我们决定使用waitTermination()等待查询的终止,以防止查询活动时进程退出。

    要实际执行这个示例代码,首先需要使用

    $ nc -lk 9999
    

    然后,在另一个终端中,可以使用

    $ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
    

    然后,在运行netcat服务器的终端中键入的任何行都将被计数并每秒在屏幕上打印。像下面这样:

    # TERMINAL 1:
    # Running Netcat
    
    $ nc -lk 9999
    apache spark
    apache hadoop
    
    
    # TERMINAL 2: RUNNING JavaStructuredNetworkWordCount
    
    $ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
    
    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +------+-----+
    | value|count|
    +------+-----+
    |apache|    1|
    | spark|    1|
    +------+-----+
    
    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +------+-----+
    | value|count|
    +------+-----+
    |apache|    2|
    | spark|    1|
    |hadoop|    1|
    +------+-----+
    ...
    

    3、编程模型

    结构化流中的关键思想是将实时数据流视为一个不断追加的表。这导致了一个新的流处理模型,与批处理模型非常相似。您将把流计算表示为与静态表类似的标准批处理查询,spark将在无边界输入表上以增量查询的形式运行它。让我们更详细地了解这个模型。

    3.1、基本概念

    想象把流数据当成一个'Input Table',每个data item到来后都会追加到这个table里面。


    image.png

    对输入的查询将生成“结果表”。每一个触发间隔(比如说,每1秒),新的行都会附加到输入表中,这最终会更新结果表。每当结果表更新时,我们都希望将更改后的结果行写入外部接收器。

    image.png

    “输出”定义为写入外部存储器的内容。可以在不同的模式下定义输出:

    • 完成模式-整个更新的结果表将写入外部存储器。由存储连接器决定如何处理整个表的写入。
    • 追加模式-只有自上一个触发器以来追加到结果表中的新行才会写入外部存储器。这仅适用于结果表中不希望更改现有行的查询。
    • 更新模式-只有自上次触发器以来在结果表中更新的行才会写入外部存储器(从spark 2.1.1开始可用)。请注意,这与完整模式不同,因为此模式只输出自上一个触发器以来已更改的行。如果查询不包含聚合,则等同于追加模式。
      请注意,每个模式都适用于某些类型的查询。这将在后面详细讨论。
      为了说明这个模型的使用,让我们在上面的快速示例的上下文中理解这个模型。第一行数据框是输入表,最后一行字数数据框是结果表。请注意,在流式处理行DataFreame上生成单词计数的查询与静态DataFreame的查询完全相同。但是,当这个查询启动时,spark将不断检查来自socket连接的新数据。如果有新的数据,spark将运行一个“增量”查询,将以前运行的计数与新的数据结合起来,以计算更新的计数,如下所示。


      image.png

    请注意,结构化流并没有具体化整个表。它从流数据源中读取最新的可用数据,然后递增处理以更新结果,然后丢弃源数据。它只保留更新结果所需的最小中间状态数据(例如前面示例中的中间计数)。

    此模型与许多其他流处理引擎显著不同。许多流系统要求用户自己维护正在运行的聚合,因此必须考虑容错性和数据一致性(至少一次、最多一次或完全一次)。在这个模型中,spark负责在有新数据时更新结果表,从而减少用户对结果表的推理。作为一个例子,让我们看看这个模型如何处理基于事件时间的处理和延迟到达的数据。

    3.2、处理事件时间和延迟数据

    事件时间是嵌入到数据本身中的时间。对于许多应用程序,您可能希望在此事件时间上进行操作。例如,如果您希望获得每分钟由物联网设备生成的事件数,那么您可能希望使用生成数据的时间(即数据中的事件时间),而不是Spark接收数据的时间。这个事件时间很自然地用这个模型表示——设备中的每个事件都是表中的一行,而事件时间是行中的一列值。这允许基于窗口的聚合(例如,每分钟事件数)只是事件时间列上特殊类型的分组和聚合-每个时间窗口都是一个组,并且每一行可以属于多个窗口/组。因此,这种基于事件时间窗口的聚合查询既可以在静态数据集(例如,从收集的设备事件日志中)上定义,也可以在数据流上定义,从而使用户的生活更加方便。

    此外,该模型根据事件时间自然地处理比预期晚到达的数据。由于Spark正在更新结果表,因此它可以完全控制在有延迟数据时更新旧聚合,以及清除旧聚合以限制中间状态数据的大小。自Spark2.1以来,我们支持做标记,允许用户指定最新数据的阈值,并允许引擎相应地清除旧状态。稍后将在窗口操作部分中更详细地解释这些内容。

    3.3、容错语义

    只交付一次端到端语义是结构化流设计背后的关键目标之一。为了实现这一点,spark设计了结构化的流媒体源、接收器和执行引擎,以便可靠地跟踪处理的确切进度,以便通过重新启动和/或重新处理来处理任何类型的故障。假设每个流源都有偏移量(类似于Kafka偏移量或Kinesis序列号),以跟踪流中的读取位置。引擎使用检查点和提前写入日志来记录每个触发器中正在处理的数据的偏移范围。流水槽设计成等量处理后处理。同时,使用可重放源和等量汇点,结构化流可以确保在任何故障下端到端的语义都是一次性的。

    4、使用DataSet和DataFrames的API

    由于Spark 2.0,DataSet和DataFrame可以表示静态的有界数据,也可以表示流式的无界数据。与静态DataSet和DataFrame类似,你可以使用公共入口点SparkSession从流源创建流DataSets和DataFrames,并将它们作为静态DataSet和DataFrame应用于相同的操作。

    4.1、创建流式DataSets和DataFrames

    Streaming DataFrames可以通过DataStreamReader接口创建。与创建静态DataFrame的读取接口类似,您可以指定源的详细信息—数据格式、模式、选项等。

    • Input Sources
      有一些内置资源。
      1.File source -文件源-读取以数据流形式写入目录的文件。支持的文件格式有文本、csv、json、orc、parquet。注意,文件必须原子地放置在给定的目录中,在大多数文件系统中,可以通过文件移动操作来实现。
      2.Kafka source -卡夫卡来源-从卡夫卡读取数据。它与Kafka经纪人0.10.0或更高版本兼容。有关详细信息,请参阅《卡夫卡集成指南》。
      3.Socket source -套接字源(用于测试)-从套接字连接读取utf8文本数据。侦听服务器套接字位于驱动程序处。请注意,这只能用于测试,因为这不提供端到端的容错保证。
      4.Rate source-速率源(用于测试)-以每秒指定的行数生成数据,每个输出行包含时间戳和值。其中timestamp是包含消息调度时间的timestamp类型,value是包含消息计数的long类型,从0开始作为第一行。此源用于测试和基准测试。

    有些源不具有容错性,因为它们不能保证在失败后可以使用检查点偏移量重播数据。请参见前面关于容错语义的部分。以下是Spark中所有来源的详细信息。


    image.png

    下面是一些例子。

    SparkSession spark = ...
    
    // Read text from socket
    Dataset<Row> socketDF = spark
      .readStream()
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load();
    
    socketDF.isStreaming();    // Returns True for DataFrames that have streaming sources
    
    socketDF.printSchema();
    
    // Read all the csv files written atomically in a directory
    StructType userSchema = new StructType().add("name", "string").add("age", "integer");
    Dataset<Row> csvDF = spark
      .readStream()
      .option("sep", ";")
      .schema(userSchema)      // Specify schema of the csv files
      .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
    

    这些示例生成未类型化的流式DataFrame,这意味着在编译时不检查DataFrame的架构,只在提交查询时在运行时检查。一些操作(如map、flatmap等)需要在编译时知道类型。为此,可以使用与静态数据帧相同的方法将这些非类型化流数据帧转换为类型化流数据集。有关详细信息,请参阅《SQL编程指南》。此外,有关支持的流媒体源的更多详细信息将在文档的后面讨论。

    4.2、流式DataFrames/Datasets的模式推断和划分

    您可以对流式DataFrames/Datasets应用各种操作—从非类型化、类似SQL的操作(例如select、where、groupby)到类似RDD的类型化操作(例如map、filter、flatmap)。有关详细信息,请参阅《SQL编程指南》。让我们来看几个您可以使用的示例操作。

    • 基本操作--Selection, Projection, Aggregation
      DataFrame/Dataset上的大多数常见操作都支持流式处理。本节稍后将讨论一些不受支持的操作。
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.sql.*;
    import org.apache.spark.sql.expressions.javalang.typed;
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
    
    public class DeviceData {
      private String device;
      private String deviceType;
      private Double signal;
      private java.sql.Date time;
      ...
      // Getter and setter methods for each field
    }
    
    Dataset<Row> df = ...;    // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
    Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data
    
    // Select the devices which have signal more than 10
    df.select("device").where("signal > 10"); // using untyped APIs
    ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
      .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());
    
    // Running count of the number of updates for each device type
    df.groupBy("deviceType").count(); // using untyped API
    
    // Running average signal for each device type
    ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
      .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
    

    还可以将流式数据帧/数据集注册为临时视图,然后对其应用SQL命令。

    df.createOrReplaceTempView("updates");
    spark.sql("select count(*) from updates");  // returns another streaming DF
    

    注意,可以使用df.isStreaming来标识数据帧/数据集是否具有流数据。

    df.isStreaming()
    
    4.3、Window Operations on Event Time

    滑动事件时间窗口上的聚合对于结构化流非常简单,并且与分组聚合非常相似。在分组聚合中,为用户指定的分组列中的每个唯一值维护聚合值(例如计数)。对于基于窗口的聚合,将为行的事件时间所在的每个窗口维护聚合值。让我们用一个例子来理解这一点。

    假设我们的快速示例被修改了,流现在包含了行以及生成行的时间。我们不需要运行单词计数,而是希望在10分钟的窗口内对单词进行计数,每5分钟更新一次。也就是说,单词在10分钟窗口12:00-12:10、12:05-12:15、12:10-12:20等时间段内接收的单词中计数。请注意,12:00-12:10表示12:00之后但12:10之前到达的数据。现在,考虑一下12:07收到的一个词。这个词应该增加对应于两个窗口12:00-12:10和12:05-12:15的计数。因此,计数将由分组键(即字)和窗口(可以从事件时间计算)这两个参数索引。

    结果表如下所示。


    image.png

    由于此窗口化与分组类似,因此在代码中,可以使用groupby()和window()操作来表示窗口化聚合。

    Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
    
    // Group the data by window and word and compute the count of each group
    Dataset<Row> windowedCounts = words.groupBy(
      functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
      words.col("word")
    ).count();
    
    • 处理后期数据和水印
      现在考虑一下如果其中一个事件延迟到达应用程序会发生什么。例如,在12:04(即事件时间)生成的单词可以在12:11被应用程序接收。应用程序应使用时间12:04而不是12:11更新窗口12:00-12:10的旧计数。这在基于窗口的分组中自然发生——结构化流可以长时间保持部分聚合的中间状态,以便后期数据可以正确更新旧窗口的聚合,如下图所示。


      image.png

      但是,要运行这个查询几天,系统必须绑定它在内存状态中累积的中间量。这意味着系统需要知道何时可以从内存状态中除去旧聚合,因为应用程序将不再接收该聚合的延迟数据。为了实现这一点,在Spark2.1中,我们引入了水印技术,它允许引擎自动跟踪数据中的当前事件时间,并尝试相应地清除旧状态。您可以通过指定事件时间列和阈值来定义查询的水印,该阈值说明数据在事件时间方面的预计延迟时间。对于从时间t开始的特定窗口,引擎将保持状态并允许延迟数据更新状态,直到(引擎看到的最大事件时间-延迟阈值>t)。换句话说,阈值内的延迟数据将被聚合,但超过阈值的数据将开始下降(有关确切的保证,请参阅本节后面的部分)。让我们用一个例子来理解这一点。我们可以很容易地使用withwatermark()在前面的示例中定义水印,如下所示。

    Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
    
    // Group the data by window and word and compute the count of each group
    Dataset<Row> windowedCounts = words
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
            words.col("word"))
        .count();
    

    在本例中,我们定义了查询的水印“timestamp”列的值,还定义了“10分钟”作为允许数据延迟的阈值。如果在更新输出模式下运行此查询(稍后在输出模式部分中讨论),则引擎将继续更新结果表中窗口的计数,直到窗口比水印旧,而水印比“timestamp”列中的当前事件时间落后10分钟。这是一个例子。


    image.png

    如图所示,引擎跟踪的最大事件时间是蓝色虚线,每个触发器开始时设置为(最大事件时间-“10分钟”)的水印是红线。例如,当引擎观察数据(12:14,dog)时,它将下一个触发器的水印设置为12:04。这个水印允许引擎在额外的10分钟内保持中间状态,以便计算延迟的数据。例如,数据(12:09,cat)出现故障和延迟,并落在Windows 12:00-12:10和12:05-12:15中。由于它仍在触发器中的水印12:04之前,因此引擎仍将中间计数保持为状态,并正确更新相关窗口的计数。但是,当水印更新到12:11时,窗口的中间状态(12:00-12:10)被清除,所有后续数据(例如(12:04,驴))被视为“太晚”,因此被忽略。请注意,在每个触发器之后,更新的计数(即紫色行)都会写入sink作为触发器输出,这由更新模式决定。

    某些接收器(如文件)可能不支持更新模式所需的细粒度更新。为了使用它们,我们还支持附加模式,其中只有最终计数被写入sink。如下所示。
    请注意,在非流式数据集中使用withwatermark是不起作用的。由于水印不应以任何方式影响任何批查询,因此我们将直接忽略它。


    image.png

    与之前的更新模式类似,引擎为每个窗口保持中间计数。但是,部分计数不会更新到结果表,也不会写入接收器。引擎等待“10分钟”计算延迟日期,然后将窗口的中间状态<水印,并将最终计数附加到结果表/接收器。例如,只有在水印更新为12:11之后,才会将窗口12:00-12:10的最终计数追加到结果表中。

    • 水印清除聚合状态的条件
      需要注意的是,水印必须满足以下条件才能清除聚合查询中的状态(从spark 2.1.1开始,以后可能会更改)。
      1.输出模式必须是追加或更新。完整模式要求保留所有聚合数据,因此不能使用水印删除中间状态。有关每个输出模式语义的详细说明,请参阅输出模式部分。
      2.聚合必须具有事件时间列或事件时间列上的窗口。
      3.必须在与聚合中使用的时间戳列相同的列上调用WithWatermark。例如,df.withWatermark(“time”,“1 min”).groupby(“time2”).count()在追加输出模式下无效,因为水印是在聚合列的不同列上定义的。
      4.必须在聚合之前调用WithWatermark才能使用水印详细信息。例如,df.groupby(“time”).count().withWatermark(“time”,“1 min”)在追加输出模式下无效。

    • 水印聚合的语义保证
      水印延迟(用水印设置)为“2小时”,保证引擎不会丢弃任何延迟时间小于2小时的数据。也就是说,任何比最新处理的数据晚2小时(就事件时间而言)以内的数据都保证被聚合。
      但是,担保只在一个方向上是严格的。延迟超过2小时的数据不一定会被删除;它可能会被聚合,也可能不会被聚合。数据越晚,引擎处理数据的可能性就越小。

    4.4、Join Operations

    结构化流支持将流数据集/数据帧与静态数据集/数据帧以及另一个流数据集/数据帧连接起来。流连接的结果是递增生成的,类似于上一节中的流聚合结果。在本节中,我们将探讨在上述情况下支持的连接类型(即内部、外部等)。请注意,在所有支持的联接类型中,使用流数据集/数据帧进行联接的结果将与使用流中包含相同数据的静态数据集/数据帧时的结果完全相同。

    • Stream-static Joins
      自从Spark2.0引入以来,结构化流支持流和静态数据帧/数据集之间的连接(内部连接和某些类型的外部连接)。下面是一个简单的例子。
    Dataset<Row> staticDf = spark.read(). ...;
    Dataset<Row> streamingDf = spark.readStream(). ...;
    streamingDf.join(staticDf, "type");         // inner equi-join with a static DF
    streamingDf.join(staticDf, "type", "right_join");  // right outer join with a static DF
    

    注意,流静态连接不是有状态的,因此不需要状态管理。但是,还不支持几种类型的流静态外部联接。这些类型在后面会有详细的介绍。

    • Stream-stream Joins
      在Spark2.3中,我们增加了对流流连接的支持,也就是说,您可以连接两个流数据集/数据帧。在两个数据流之间生成连接结果的挑战在于,在任何时间点,数据集的视图对于连接的两边都是不完整的,这使得在输入之间查找匹配更加困难。从一个输入流接收到的任何行都可以与将来的任何行匹配,但仍将从另一个输入流接收到该行。因此,对于这两个输入流,我们将过去的输入缓冲为流状态,这样我们可以将未来的每个输入与过去的输入匹配,并相应地生成联接的结果。此外,与流聚合类似,我们自动处理延迟的无序数据,并可以使用水印限制状态。让我们讨论支持的流连接的不同类型以及如何使用它们。
      Inner Joins with optional Watermarking
      支持任何类型的列上的内部联接以及任何类型的联接条件。但是,当流运行时,流状态的大小将无限期地增长,因为必须保存所有过去的输入,因为任何新输入都可以与过去的任何输入匹配。为了避免无边界状态,您必须定义额外的连接条件,以便使不确定的旧输入不能与将来的输入匹配,因此可以从状态中清除。换言之,您将不得不在联接中执行以下附加步骤。
      1.在两个输入上定义水印延迟,以便引擎知道输入的延迟程度(类似于流聚合)
      2.在两个输入之间定义一个事件时间约束,这样引擎就可以计算出一个输入的旧行何时不需要(即不满足时间约束)与另一个输入匹配。这个约束可以用两种方法之一定义。
      1.时间范围联接条件(例如,在RightTime和RightTime之间的LeftTime上联接+间隔1小时),
      2.在事件时间窗口上加入(例如…在LeftTimeWindow上加入=RightTimeWindow)。

    让我们用一个例子来理解这一点。

    假设我们想将一个广告印象流(显示广告时)与另一个用户点击广告流连接起来,以便在印象导致可货币化点击时进行关联。要允许此流连接中的状态清理,您必须指定水印延迟和时间约束,如下所示。
    1.水印延迟:例如,事件时间中的印痕和相应的点击可能延迟/无序,分别最多2小时和3小时。
    2.事件时间范围条件:例如,在相应的印象后0秒到1小时的时间范围内可能会发生一次单击。

    代码应该是这样的。

    import static org.apache.spark.sql.functions.expr
    
    Dataset<Row> impressions = spark.readStream(). ...
    Dataset<Row> clicks = spark.readStream(). ...
    
    // Apply watermarks on event-time columns
    Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
    Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
    
    // Join with event-time constraints
    impressionsWithWatermark.join(
      clicksWithWatermark,
      expr(
        "clickAdId = impressionAdId AND " +
        "clickTime >= impressionTime AND " +
        "clickTime <= impressionTime + interval 1 hour ")
    );
    

    带水印的流内部连接的语义保证
    这类似于在聚合上添加水印提供的保证。水印延迟“2小时”保证引擎不会丢弃任何延迟时间小于2小时的数据。但延迟超过2小时的数据可能会被处理,也可能不会被处理。

    Outer Joins with Watermarking
    对于内部联接,水印+事件时间约束是可选的,而对于左侧和右侧外部联接,则必须指定它们。这是因为为了在外部联接中生成空结果,引擎必须知道将来何时输入行将与任何内容不匹配。因此,必须指定水印+事件时间约束以生成正确的结果。因此,具有外部联接的查询看起来很像前面的广告货币化示例,只是有一个额外的参数将其指定为外部联接。

    impressionsWithWatermark.join(
      clicksWithWatermark,
      expr(
        "clickAdId = impressionAdId AND " +
        "clickTime >= impressionTime AND " +
        "clickTime <= impressionTime + interval 1 hour "),
      "leftOuter"                 // can be "inner", "leftOuter", "rightOuter"
    );
    

    带水印的流外部连接的语义保证
    外部联接与内部联接在水印延迟以及是否删除数据方面具有相同的保证。
    告诫
    关于外部结果是如何产生的,有几个重要的特征需要注意:
    1.外部空结果将根据指定的水印延迟和时间范围条件生成延迟。这是因为引擎必须等待那么长的时间,以确保没有匹配,将来也不会有更多的匹配。

    2.在微批量引擎的当前实现中,水印是在微批量结束时进行的,下一个微批量使用更新后的水印来清除状态并输出外部结果。由于我们只在需要处理新数据时触发一个微批处理,因此如果流中没有接收到新数据,则外部结果的生成可能会延迟。简而言之,如果被联接的两个输入流中的任何一个在一段时间内没有接收数据,则外部(两种情况下,左或右)输出可能会延迟。

    流式查询中联接的支持列表

    有关支持的联接的其他详细信息

    • 联接可以级联,也就是说,您可以执行df1.join(df2,…).join(df3,…).join(df4,…)。
    • 从spark 2.3开始,只有当查询处于追加输出模式时,才能使用联接。还不支持其他输出模式。
    • 从spark 2.3开始,在连接之前不能使用其他非映射类操作。以下是一些无法使用的示例。
      1。在联接之前不能使用流聚合。
      2。在联接之前,不能在更新模式下使用MapGroupsWithState和FlatmapGroupsWithState。
    4.5、流式重复数据消除

    您可以使用事件中的唯一标识符来消除数据流中的重复记录。这与使用唯一标识符列的静态重复数据消除完全相同。查询将存储以前记录中所需的数据量,以便筛选重复记录。与聚合类似,您可以使用带或不带水印的重复数据消除。

    • 使用水印-如果重复记录到达的时间有上限,则可以在事件时间列上定义水印,并使用guid和事件时间列进行重复数据消除。查询将使用水印从过去的记录中删除旧的状态数据,这些记录不希望再得到任何重复数据。这限制了查询必须维护的状态量。
    • 没有水印-由于重复记录可能到达的时间没有界限,查询将所有过去记录的数据存储为状态。
    Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...
    
    // Without watermark using guid column
    streamingDf.dropDuplicates("guid");
    
    // With watermark using guid and eventTime columns
    streamingDf
      .withWatermark("eventTime", "10 seconds")
      .dropDuplicates("guid", "eventTime");
    
    4.6、处理多个水印的策略

    流查询可以有多个联合或连接在一起的输入流。每个输入流可以有一个不同的延迟数据阈值,对于有状态的操作,这些阈值需要被容忍。在每个输入流上使用withWatermarks("eventtime",delay)指定这些阈值。例如,考虑使用inputstream1和inputstream2之间的流连接进行查询。

    inputStream1.withWatermark(“eventTime1”, “1 hour”) .join( inputStream2.withWatermark(“eventTime2”, “2 hours”), joinCondition)
    

    在执行查询时,结构化流单独跟踪每个输入流中看到的最大事件时间,根据相应的延迟计算水印,并选择一个带有它们的全局水印用于状态操作。默认情况下,选择最小值作为全局水印,因为这样可以确保如果其中一个流落后于另一个流(例如,其中一个流由于上游故障而停止接收数据),则不会意外地将任何数据拖得太晚。换句话说,全局水印将以最慢流的速度安全移动,查询输出将相应延迟。
    但是,在某些情况下,您可能希望获得更快的结果,即使这意味着从最慢的流中删除数据。由于Spark 2.4,可以通过将SQL配置spark.sql.streaming.multipleWatermarkPolicy to max (default is min),将多水印策略设置为选择最大值作为全局水印。这使全局水印以最快的流速度移动。但是,作为一个副作用,来自较慢流的数据将被大量丢弃。因此,明智地使用这个配置。

    4.7、任意状态操作

    许多用例需要比聚合更高级的有状态操作。例如,在许多用例中,您必须从事件的数据流中跟踪会话。为了进行这种会话化,必须将任意类型的数据保存为状态,并使用每个触发器中的数据流事件对状态执行任意操作。由于spark 2.2,可以使用操作mapGroupsWithState 和更强大的操作flatMapGroupsWithState来完成此操作。这两个操作都允许您对分组数据集应用用户定义的代码以更新用户定义的状态。

    4.8、不支持的操作

    流式数据帧/数据集不支持一些数据帧/数据集操作。其中一些如下。

    • 流数据集尚不支持多个流聚合(即流数据集中的聚合链)。

    • 流数据集不支持限制行和取前n行。

    • 不支持对流数据集执行不同的操作。

    • 只有在聚合之后并且处于完全输出模式时,流数据集才支持排序操作。

    • 不支持流数据集上的几种类型的外部联接。有关更多详细信息,请参阅“连接操作”部分中的支持矩阵。

    此外,还有一些数据集方法不适用于流数据集。它们是将立即运行查询并返回结果的操作,这对流数据集没有意义。相反,这些功能可以通过显式启动流式查询来完成(请参见下一节)。

    • count()-无法从流数据集中返回单个计数。相反,使用ds.groupby().count()返回包含运行计数的流数据集。

    • foreach()-改为使用ds.writestream.foreach(…)(参见下一节)。

    • show()-而是使用控制台接收器(参见下一节)。

    如果您尝试这些操作中的任何一个,您将看到类似“流式数据帧/数据集不支持操作xyz”的分析异常。虽然其中一些可能在未来的Spark版本中得到支持,但还有一些基本上难以有效地在流数据上实现。例如,不支持对输入流进行排序,因为它需要跟踪流中接收的所有数据。因此,从根本上说,这很难有效地执行。

    5、开始流式查询

    一旦定义了最终结果数据帧/数据集,剩下的就是开始流计算。要做到这一点,您必须使用DataStreamWriter ,通过 Dataset.writeStream(),您必须在此接口中指定以下一个或多个选项。

    • Details of the output sink:输出接收器的详细信息:数据格式、位置等。

    • Output mode: 输出模式:指定写入输出接收器的内容。

    • Query name:查询名称:可以选择指定查询的唯一名称以进行标识。

    • Trigger interval: 触发间隔:可以选择指定触发间隔。如果未指定,系统将在上一次处理完成后立即检查新数据的可用性。如果由于前一个处理未完成而错过触发时间,则系统将立即触发处理。

    • Checkpoint location:检查点位置:对于一些可以保证端到端容错的输出接收器,指定系统写入所有检查点信息的位置。这应该是HDFS兼容的容错文件系统中的一个目录。下一节将更详细地讨论检查点的语义。

    5.1、Output Modes输出模式

    有几种类型的输出模式。

    • Append mode (default) -附加模式(默认)-这是默认模式,其中只有自上一个触发器以来添加到结果表的新行才会输出到接收器。只有添加到结果表的行永远不会更改的查询才支持此功能。因此,此模式保证每行只输出一次(假设容错接收器)。例如,只有select、where、map、flatmap、filter、join等的查询将支持追加模式。

    • Complete mode -完全模式-每次触发后,整个结果表都将输出到接收器。聚合查询支持此功能。

    • Update mode - 更新模式-(从spark 2.1.1开始可用)只有结果表中自上次触发器以来更新的行将输出到接收器。更多信息将添加到将来的版本中。

    不同类型的流式查询支持不同的输出模式。这是兼容性矩阵。


    5.2、Output Sinks

    有几种类型的内置输出接收器。

    • File sink - Stores the output to a directory.
    writeStream
        .format("parquet")        // can be "orc", "json", "csv", etc.
        .option("path", "path/to/destination/dir")
        .start()
    
    • Kafka sink - Stores the output to one or more topics in Kafka.
    writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("topic", "updates")
        .start()
    
    • foreach sink-对输出中的记录执行任意计算。有关详细信息,请参阅本节后面的内容。
    writeStream
        .foreach(...)
        .start()
    
    • Console sink (for debugging) -控制台接收器(用于调试)-每次有触发器时都将输出打印到控制台/stdout。支持附加和完整输出模式。这应该用于在低数据量上进行调试,因为在每次触发之后,整个输出都被收集并存储在驱动程序的内存中。
    writeStream
        .format("console")
        .start()
    
    • Memory sink (for debugging)存表存储在内存中。支持附加和完整输出模式。当整个输出被收集并存储在驱动程序内存中时,这应该用于在低数据量上进行调试。因此,谨慎使用。
    writeStream
        .format("memory")
        .queryName("tableName")
        .start()
    

    有些接收器不能容错,因为它们不能保证输出的持久性,并且仅用于调试目的。请参见前面关于容错语义的部分。以下是spark中所有水槽sinks的细节。



    注意,必须调用start()才能实际开始执行查询。这将返回一个streamingquery对象,该对象是连续运行执行的句柄。您可以使用这个对象来管理查询,我们将在下一小节中讨论这个问题。现在,让我们用几个例子来理解这一切。

    // ========== DF with no aggregations ==========
    Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");
    
    // Print new data to console
    noAggDF
      .writeStream()
      .format("console")
      .start();
    
    // Write new data to Parquet files
    noAggDF
      .writeStream()
      .format("parquet")
      .option("checkpointLocation", "path/to/checkpoint/dir")
      .option("path", "path/to/destination/dir")
      .start();
    
    // ========== DF with aggregation ==========
    Dataset<Row> aggDF = df.groupBy("device").count();
    
    // Print updated aggregations to console
    aggDF
      .writeStream()
      .outputMode("complete")
      .format("console")
      .start();
    
    // Have all the aggregates in an in-memory table
    aggDF
      .writeStream()
      .queryName("aggregates")    // this query name will be the table name
      .outputMode("complete")
      .format("memory")
      .start();
    
    spark.sql("select * from aggregates").show();   // interactively query in-memory table
    

    使用foreach和foreachbatch
    foreach和foreachbatch操作允许您对流式查询的输出应用任意操作和写入逻辑。它们有稍微不同的用例——虽然foreach允许在每一行上自定义写入逻辑,但是foreach batch允许在每个微批的输出上执行任意操作和自定义逻辑。让我们更详细地了解它们的用法。
    ForeachBatch
    foreachbatch(…)允许您指定对流式查询的每个微批的输出数据执行的函数。自Spark 2.4以来,这在scala、Java和Python中得到了支持。它需要两个参数:一个数据帧或数据集,该数据帧或数据集具有微批的输出数据和微批的唯一ID。

    streamingDatasetOfString.writeStream().foreachBatch(
      new VoidFunction2<Dataset<String>, Long> {
        public void call(Dataset<String> dataset, Long batchId) {
          // Transform and write batchDF
        }    
      }
    ).start();
    

    使用foreachbatch,可以执行以下操作。

    • 重用现有的批处理数据源-对于许多存储系统,可能还没有可用的流接收器,但可能已经存在用于批处理查询的数据编写器。使用foreachbatch,可以在每个微批的输出上使用批处理数据编写器。
    • 写入多个位置-如果要将流式查询的输出写入多个位置,则只需多次写入输出数据帧/数据集。但是,每次尝试写入都会导致重新计算输出数据(包括可能重新读取输入数据)。为了避免重新计算,应该缓存输出数据帧/数据集,将其写入多个位置,然后取消缓存。这是一个大纲。
    streamingdf.writestream.forachbatch{(batchdf:dataframe,batchid:long)=>
    batchdf.persist()
    batchdf.write.format(…)/位置1 
    batchdf.write.format(…)/位置2 
    batchdf.unpersist()
    }
    
    • 应用其他数据帧操作-流式数据帧中不支持许多数据帧和数据集操作,因为Spark在这些情况下不支持生成增量计划。使用foreachbatch,可以对每个微批处理输出应用其中一些操作。但是,您必须对自己执行该操作的端到端语义进行推理。

    注:

    • 默认情况下,foreachbatch至少提供一次写入保证。但是,您可以使用提供给函数的batchID作为消除重复输出并获得一次性保证的方法。
    • foreachbatch不使用连续处理模式,因为它从根本上依赖于流式查询的微批处理执行。如果以连续模式写入数据,请改用foreach。

    Foreach
    如果foreach batch不是一个选项(例如,相应的批数据编写器不存在,或者不存在连续处理模式),则可以使用foreach表示自定义编写器逻辑。具体来说,您可以将数据写入逻辑划分为三种方法:open, process, and close. 。自从Scale 2.4以来,Foreach在Scala、Java和Python中可用。
    In Java, you have to extend the class ForeachWriter

    streamingDatasetOfString.writeStream().foreach(
      new ForeachWriter[String] {
    
        @Override public boolean open(long partitionId, long version) {
          // Open connection
        }
    
        @Override public void process(String record) {
          // Write string to connection
        }
    
        @Override public void close(Throwable errorOrNull) {
          // Close the connection
        }
      }
    ).start();
    

    执行语义:当启动流式查询时,spark以以下方式调用函数或对象的方法:

    • 此对象的单个副本负责查询中单个任务生成的所有数据。换句话说,一个实例负责处理以分布式方式生成的数据的一个分区。

    • 此对象必须是可序列化的,因为每个任务都将获得所提供对象的新的序列化反序列化副本。因此,强烈建议对写入数据进行任何初始化(例如。打开连接或启动事务)是在调用open()方法之后完成的,这意味着任务已准备好生成数据。

    • 方法的生命周期如下:
      For each partition with partition_id:

      • For each batch/epoch of streaming data with epoch_id:
        • Method open(partitionId, epochId) is called.
        • If open(…) returns true, for each row in the partition and batch/epoch, method process(row) is called.
        • Method close(error) is called with error (if any) seen while processing rows.
    • 如果存在open()方法并成功返回(与返回值无关),则调用close()方法(如果存在),除非jvm或python进程在中间崩溃。

    • 注意:当失败导致重新处理某些输入数据时,open()方法中的partitionId and epochId可用于对生成的数据进行重复数据消除。这取决于查询的执行模式。如果流式查询是在微批处理模式下执行的,那么由一个唯一元组(partition_id, epoch_id)表示的每个分区都保证具有相同的数据。因此,(partition_id,epoch_id)可以用于消除重复和/或事务性提交数据,并实现一次性保证。但是,如果流式查询是在连续模式下执行的,则此保证不适用,因此不应用于重复数据消除。

    5.3、触发器

    流式查询的触发器设置定义了流式数据处理的时间,无论该查询是作为具有固定批处理间隔的微批处理查询还是作为连续处理查询执行。以下是支持的不同类型的触发器。



    下面是一些实例

    import org.apache.spark.sql.streaming.Trigger
    
    // Default trigger (runs micro-batch as soon as it can)
    df.writeStream
      .format("console")
      .start();
    
    // ProcessingTime trigger with two-seconds micro-batch interval
    df.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("2 seconds"))
      .start();
    
    // One-time trigger
    df.writeStream
      .format("console")
      .trigger(Trigger.Once())
      .start();
    
    // Continuous trigger with one-second checkpointing interval
    df.writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start();
    

    6、管理流式查询

    启动查询时创建的StreamingQuery可用于监视和管理查询。

    StreamingQuery query = df.writeStream().format("console").start();   // get the query object
    
    query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data
    
    query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart
    
    query.name();        // get the name of the auto-generated or user-specified name
    
    query.explain();   // print detailed explanations of the query
    
    query.stop();      // stop the query
    
    query.awaitTermination();   // block until query is terminated, with stop() or with error
    
    query.exception();       // the exception if the query has been terminated with error
    
    query.recentProgress();  // an array of the most recent progress updates for this query
    
    query.lastProgress();    // the most recent progress update of this streaming query
    

    您可以在单个SparkSession中启动任意数量的查询。它们都将同时运行,共享集群资源。您可以使用sparkSession.streams()来获得用于管理当前活动查询的StreamingQueryManager

    SparkSession spark = ...
    
    spark.streams().active();    // get the list of currently active streaming queries
    
    spark.streams().get(id);   // get a query object by its unique id
    
    spark.streams().awaitAnyTermination();   // block until any one of them terminates
    

    7、监控流式查询

    有多种方法可以监视活动的流式查询。您可以使用Spark的Dropwizard度量支持将度量推送到外部系统,也可以通过编程方式访问它们。

    7.1、交互读取度量值

    您可以使用streamingQuery.lastProgress() and streamingQuery.status().lastProgress()直接获取活动查询的当前状态和度量值。lastprogress()返回StreamingQueryProgress in scala and java和一个在python中具有相同字段的字典。它包含有关流最后一个触发器中的进度的所有信息—处理了哪些数据、处理速率、延迟等。还有streamingQuery.recentProgress,它返回最后几个进度的数组。

    另外,streamingQuery.status()返回StreamingQueryStatus inscala and java和一个在python中具有相同字段的字典。它提供有关查询正在立即执行的操作的信息—触发器是否处于活动状态、数据是否正在处理等。

    下面是几个例子。

    StreamingQuery query = ...
    
    System.out.println(query.lastProgress());
    /* Will print something like the following.
    
    {
      "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
      "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
      "name" : "MyQuery",
      "timestamp" : "2016-12-14T18:45:24.873Z",
      "numInputRows" : 10,
      "inputRowsPerSecond" : 120.0,
      "processedRowsPerSecond" : 200.0,
      "durationMs" : {
        "triggerExecution" : 3,
        "getOffset" : 2
      },
      "eventTime" : {
        "watermark" : "2016-12-14T18:45:24.873Z"
      },
      "stateOperators" : [ ],
      "sources" : [ {
        "description" : "KafkaSource[Subscribe[topic-0]]",
        "startOffset" : {
          "topic-0" : {
            "2" : 0,
            "4" : 1,
            "1" : 1,
            "3" : 1,
            "0" : 1
          }
        },
        "endOffset" : {
          "topic-0" : {
            "2" : 0,
            "4" : 115,
            "1" : 134,
            "3" : 21,
            "0" : 534
          }
        },
        "numInputRows" : 10,
        "inputRowsPerSecond" : 120.0,
        "processedRowsPerSecond" : 200.0
      } ],
      "sink" : {
        "description" : "MemorySink"
      }
    }
    */
    
    
    System.out.println(query.status());
    /*  Will print something like the following.
    {
      "message" : "Waiting for data to arrive",
      "isDataAvailable" : false,
      "isTriggerActive" : false
    }
    */
    
    7.2、使用异步API以编程方式报告度量

    还可以通过附加streamingQueryListener(scala/java docs)异步监视与sparkSession关联的所有查询。一旦使用SparkSession.streams.addListener()附加了customStreamingQueryListener对象,在启动和停止查询以及在活动查询中取得进展时,您将得到回调。下面是一个例子,

    SparkSession spark = ...
    
    spark.streams().addListener(new StreamingQueryListener() {
        @Override
        public void onQueryStarted(QueryStartedEvent queryStarted) {
            System.out.println("Query started: " + queryStarted.id());
        }
        @Override
        public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
            System.out.println("Query terminated: " + queryTerminated.id());
        }
        @Override
        public void onQueryProgress(QueryProgressEvent queryProgress) {
            System.out.println("Query made progress: " + queryProgress.progress());
        }
    });
    
    7.3、使用DropWizard报告度量值

    Spark支持使用DropWizard库报告度量。要同时报告结构化流式查询的指标,必须在SparkSession中显式启用configurationsPark.sql.streaming.metricsEnabled。

    spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
    // or
    spark.sql("SET spark.sql.streaming.metricsEnabled=true");
    

    启用此配置后在SparkSession中启动的所有查询都将通过DropWizard向配置的Versinkshave报告度量(例如Ganglia、Graphite、JMX等)。

    8、使用检查点从失败中恢复

    如果出现故障或有意关闭,您可以恢复以前查询的进度和状态,并在停止的地方继续。这是使用检查点和提前写入日志完成的。您可以使用检查点位置配置查询,查询将把所有进度信息(即每个触发器中处理的偏移范围)和正在运行的聚合(如快速示例中的字数)保存到检查点位置。此检查点位置必须是HDFS兼容文件系统中的路径,并且可以在启动查询时在DatastreamWriter中设置为选项。

    aggDF
      .writeStream()
      .outputMode("complete")
      .option("checkpointLocation", "path/to/HDFS/dir")
      .format("memory")
      .start();
    

    9、流式查询更改后的恢复语义

    在从同一检查点位置重新启动之间,流式查询中允许哪些更改存在限制。以下是一些不允许的更改,或者更改的效果没有很好的定义。对于所有人:

    • termallowed意味着您可以进行指定的更改,但其效果的语义是否定义良好取决于查询和更改。

    • termNot allowed意味着您不应执行指定的更改,因为重新启动的查询可能会因不可预知的错误而失败。sdfre显示了使用sparksession.readstream生成的流式数据帧/数据集。

    变更的类型

    • 输入源的编号或类型(即不同的源)发生更改:这是不允许的。

    • 输入源参数的更改:是否允许,更改的语义是否定义良好,取决于源和查询。下面是几个例子。

      • 允许添加/删除/修改速率限制:spark.readstream.format(“kafka”).option(“subscribe”,“topic”)tospark.readstream.format(“kafka”).option(“subscribe”,“topic”).option(“maxoffsetspertrigger”,…)

      • 通常不允许更改订阅的主题/文件,因为结果是不可预测的:spark.readstream.format(“kafka”).option(“subscribe”,“topic”)tosbark.readstream.format(“kafka”).option(“subscribe”,“newtopic”)。

    • 输出接收器类型的更改:允许在几个特定接收器组合之间进行更改。这需要逐个验证。下面是几个例子。

      • 允许文件接收器到Kafka接收器。卡夫卡只能看到新的数据。

      • 不允许Kafka接收器到文件接收器。

      • 卡夫卡水槽改为foreach,反之亦然。

    • 输出接收器参数的更改:是否允许,更改的语义是否定义良好,取决于接收器和查询。下面是几个例子。

      • 不允许更改文件接收器的输出目录:sdf.writestream.format(“parquet”).option(“path”,“/somepath”)to sdf.writestream.format(“parquet”).option(“path”,“/anotherpath”)。

      • 允许更改输出主题:sdf.writestream.format(“kafka”).option(“topic”,“sometopic”)to sdf.writestream.format(“kafka”).option(“topic”,“anothertopic”)。

      • 允许对用户定义的foreach接收器(即foreachwritercode)进行更改,但更改的语义取决于代码。

    • *投影/过滤/类似地图的操作中的更改:某些情况下是允许的。例如:

      • 允许添加/删除筛选器:sdf.selectexpr(“a”)to sdf.where(…).selectexpr(“a”).filter(…)。

      • 允许更改具有相同输出架构的投影:sdf.selectexpr(“stringcolumn as json”).writestreamtosdf.selectexpr(“anotherStringcolumn as json”).writestream

      • 有条件地允许使用不同输出架构的投影中的更改:sdf.selectexpr(“a”).writestreamtosdf.selectexpr(“b”)。只有在输出接收器允许架构从“a”更改为“b”时,才允许使用writestream。

    • 状态操作中的更改:流式查询中的某些操作需要维护状态数据,以便持续更新结果。结构化流自动检查状态数据到容错存储(例如,HDFS、AWS S3、Azure Blob存储)并在重新启动后将其还原。但是,这假定状态数据的架构在重新启动时保持不变。这意味着在重新启动之间不允许对流式查询的有状态操作进行任何更改(即添加、删除或架构修改)。以下是在重新启动之间不应更改其架构的有状态操作列表,以确保状态恢复:

      • 流聚合:例如,sdf.groupby(“a”).agg(…)。不允许对分组键或聚合的数量或类型进行任何更改。

      • 流式重复数据消除:例如,sdf.dropduplicates(“A”)。不允许对分组键或聚合的数量或类型进行任何更改。

      • 流流连接:例如,sdf1.join(sdf2,…)(即,两个输入都是用sparksession.readstream生成的)。不允许在架构或同等联接列中进行更改。不允许更改联接类型(外部或内部)。连接条件中的其他更改定义错误。

      • 任意状态操作:例如,sdf.groupbykey(…).mapgroupswithstate(…)orsdf.groupbykey(…).flatmapgroupswithstate(…)。不允许更改用户定义状态的架构和超时类型。允许在用户定义状态映射函数内进行任何更改,但更改的语义效果取决于用户定义的逻辑。如果您真的想支持状态架构更改,那么您可以使用支持架构迁移的编码/解码方案将复杂的状态数据结构显式编码/解码为字节。例如,如果将状态保存为avro编码的字节,则可以在查询重新启动之间自由更改avro状态模式,因为二进制状态将始终成功还原。

    10、连续的工作

    [实验]
    连续处理是Spark 2.3中引入的一种新的、实验性的流式执行模式,它允许低(~1 ms)端到端延迟,并至少保证一次容错。将其与默认的微批量处理引擎进行比较,后者可以实现一次完全保证,但最多只能实现约100毫秒的延迟。对于某些类型的查询(在下面讨论),您可以选择在不修改应用程序逻辑的情况下执行它们的模式(即,不更改数据帧/数据集操作)。
    要在连续处理模式下运行受支持的查询,只需指定一个具有所需检查点间隔的连续触发器作为参数。例如,

    import org.apache.spark.sql.streaming.Trigger;
    
    spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("topic", "topic1")
      .trigger(Trigger.Continuous("1 second"))  // only change in query
      .start();
    

    检查点间隔为1秒意味着连续处理引擎将每秒记录查询的进度。生成的检查点的格式与微批处理引擎兼容,因此任何查询都可以用任何触发器重新启动。例如,支持的以微批处理模式启动的查询可以在连续模式下重新启动,反之亦然。请注意,任何时候切换到连续模式时,至少会得到一次容错保证。

    10.1、支持的查询

    从spark 2.3开始,在连续处理模式中只支持以下类型的查询。

    • Operations操作:在连续模式下只支持类似地图的数据集/数据帧操作, only projections (select,map,flatMap,mapPartitions, etc.) and selections (where,filter, etc.)。
      • 除了聚合函数(因为还不支持聚合)、current_timestamp() and current_date()之外,所有SQL函数都受支持(使用时间的确定性计算具有挑战性)。
    • Sources:
      kafka来源:支持所有选项。
      Rate 来源:适用于测试。只有在连续模式下支持的选项才是numPartitionsAndRowsPerSecond。
    • Sinks:
      kafka水槽:支持所有选项。
      内存接收器:用于调试。
      控制台接收器:便于调试。支持所有选项。注意,控制台将打印在连续触发器中指定的每个检查点间隔。

    有关详细信息,请参阅输入源和输出下沉部分。尽管控制台接收器适合测试,但可以最好地观察到以Kafka为源和接收器的端到端低延迟处理,因为这允许引擎在输入主题中输入数据可用的毫秒内处理数据并使结果在输出主题中可用。

    10.2、告诫
    • 连续处理引擎启动多个长时间运行的任务,这些任务不断地从源读取数据、处理数据并不断地向接收器写入数据。查询所需的任务数取决于查询可以并行从源读取的分区数。因此,在开始连续处理查询之前,必须确保集群中有足够的核心来并行执行所有任务。例如,如果您正在读取具有10个分区的Kafka主题,那么集群必须至少有10个核心才能使查询取得进展。
    • 停止连续处理流可能会产生虚假的任务终止警告。这些可以被安全地忽略。
    • 当前没有失败任务的自动重试。任何失败都将导致查询停止,需要从检查点手动重新启动查询。

    相关文章

      网友评论

        本文标题:StructuredStreaming编程指南

        本文链接:https://www.haomeiwen.com/subject/gnakjqtx.html