美文网首页
Structured Streaming 介绍(一)

Structured Streaming 介绍(一)

作者: Cherish_Qiang | 来源:发表于2017-08-01 22:05 被阅读310次

最近看了下structured streaming 的基本用法,大部分虽然是翻译官方文档,但是从翻译中也可以加深理解。

基本介绍和编程模型

Spark2.2.0 在7月12号发布,这个版本的Structured Streaming 抛掉了试验的标签,可以正式在生产环境使用。
Structured Streaming 是基于Spark SQL 引擎的流式计算引擎,将流式计算应用于DataFrame.随着数据不断地到达,Spark SQL引擎会以一种增量的方式来执行这些操作,并且持续更新计算结果。其基本概念就是将输入数据流作为
“Input Table”,每次新收到的数据会成为该表新的一行。


Input Table

每次针对数据的查询都会生成一个“Result Table”。每一次的
触发间隔(比如说1s),Input Table 新增的一行,最终都会在Result Table 进行更新。当result table 更新的
时候,我们可能会将改变的数据写入外部存储。

Result Table

Input Source

File source - 以文件流的形式读取目录中写入的文件。 支持的文件格式为text,csv,json,parquet。 有关更多最新列表,可以看下DataStreamReader界面的文档,并支持各种文件格式的选项。 请注意,文件必须是被移动到目录中的,比如用mv命令。
kafka source - 从kafka poll 数据,兼容 kafka broker 0.10.0 或更高版本。更多详情看
Kafka Integration Guide

Socket source (for testing )从socket 连接中读取 UTF8 数据,仅用于测试,不提供容错保证。

某些数据源是不支持容错的,因为它们不能保证在故障之后可以通过checkedpoint offsets 来重新消费数据。

Source Options Fault-tolerant Notes
File path:输入路径,适用所有格式 maxFilesPerTrigger:每次触发时,最大新文件数(默认:无最大) latestFirst:是否首先处理最新的文件,当有大量积压的文件时,有用(默认值:false) fileNameOnly:是否仅根据文件名而不是完整路径检查新文件(默认值:false)。将此设置为“true”,以下文件将被视为相同的文件,因为它们的文件名“dataset.txt”是相同的:· "file:///dataset.txt" "s3://a/dataset.txt" "s3n://a/b/dataset.txt" "s3a://a/b/c/dataset.txt" YES 支持glob路径,但不支持多个逗号分隔的 paths/globs.
socket host port NO ------
kafka 参见Kafka Integration Guide YES ------

三种不同的输出模式

Complete Mode - 更新后的整个Result Table将被写入外部存储。 由外部存储决定如何处理整个表的写入。
Append Mode - 在Result Table中,只有自上次触发后新增到result table中的数据将被写入外部存储。 这仅适用于不期望更改结果表中现有行的查询,也就是说,我们确定,result table中已有的数据是肯定不会被改变的,才使用这种模式。
Update Mode - 只有自上次触发以后在Result Table中更新的数据(包括新增的和修改的)将被写入外部存储(可用于Spark 2.1.1)。 这与完全模式不同,因为此模式仅输出自上次触发以来更改的行。 如果查询不包含聚合,它将等同于Append Mode。

简单例子

object WordCount {
 def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
        .appName("WordCount")
        .master("local")
        .getOrCreate()
val lines = spark
        .readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load()
import spark.implicits._
val words = lines.as[String].flatMap(_.split(" "))
val wordCount=  words .groupBy("value").count()
 //执行此代码后,流式计算将在后台启动。
 //qurey对象是该活动流查询的句柄
  //使用awaitTermination()等待查询的终止.
val qurey = wordCount.writeStream
        .outputMode(OutputMode.Complete())
        .trigger(Trigger.ProcessingTime(2))
        .format("console")
        .start()
qurey.awaitTermination()
   }
 }

lines 为DataFrame是input table,这个表包含了一个名为"value"的列,现在还没有开始收到任何数据,因为我们只是做了transformation操作。 接下来,我们使用.as [String]将DataFrame转换为String数据集,通过flatMap操作将每一行分割成多个单词。 最后,我们通过分组操作生成wordCounts DataFrame。通过start()方法开启流计算。

流数据生成的DataFrame经查询生成wordCounts与静态DataFrame完全相同。 但是,当该查询启动时,Spark将连续检查套接字连接中的新数据。 如果有新数据,Spark将运行一个“增量”查询,将以前的运行计数与新数据相结合,以计算更新的计数,如下所示。


Word Count Example

相关文章

网友评论

      本文标题:Structured Streaming 介绍(一)

      本文链接:https://www.haomeiwen.com/subject/oyldlxtx.html