美文网首页
spark学习笔记3-StructuredStreaming

spark学习笔记3-StructuredStreaming

作者: 我是老薛 | 来源:发表于2019-02-27 19:56 被阅读0次

本文是对Spark的核心外围组件之一的Structured Streaming的一个学习总结,本文共包含如下几部分的内容:

  • 概述
  • Structured Streaming的结构化流失处理
  • 简单实例
  • 小结

参考资料:

1、如果要对Spark的基本知识有所了解,可参考文档《spark学习笔记1-基础部分》

2、如果要对Spark SQL的基本知识有所了解,可参考文档《spark学习笔记2-Spark SQL》

一、概述

相比于我们熟悉的Spark streaming组件,Structured Streaming是Spark2.0版本提出的新的实时流框架(2.0和2.1是实验版本,从Spark2.2开始为稳定版本),它的API设计比Spark streaming更加简单易用,功能也更加强大。从Spark-2.X版本后,Spark streaming就进入维护模式。

在介绍Structured Streaming之前,我们先来介绍下Spark streaming。Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。它支持从多种数据源获取数据,包括Kafk、Flume、以及TCP socket等,从数据源获取数据之后,可以使用诸如map、reduce和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统和数据库等系统中。其处理的数据流图如下图:


Spark Streaming接收流数据,并根据一定的时间间隔拆分成一批批batch数据,用抽象接口DStream表示(DStream可以看成是一组RDD序列,每个batch对应一个RDD),然后通过Spark Engine处理这些batch数据,最终得到处理后的一批批结果数据。其处理机制如下图:

相比Spark Streaming, Structured Streaming有如下特点:

1、如Spark Streaming一样能支持多种数据源的输入和输出。

2、支持以结构化的方式操作流式数据,能够像使用Spark SQL处理离线的批处理一样,处理流数据,代码更简洁,写法更简单。

3、基于Event-Time,相比于Spark Streaming的Processing-Time更精确,更符合业务场景。

4、解决了Spark Streaming存在的代码升级,DAG图变化引起的任务失败,无法断点续传的硬伤问题。

下面章节我们来详细介绍Structured Streaming的特点和使用。

二、StructuredStreaming的结构化流式处理

Structured Streaming将实时流抽象成一张无边界的表(Unbounded Table),输入的每一条数据(new data)当成输入表的一个新行(new rows),同时将流式计算的结果映射为另外一张表,完全以结构化的方式去操作流式数据。如下图所示:


输入的流数据是以batch为单位被处理,每个batch会触发一次流式计算(如sql查询,开发人员预先定义好的),计算结果被更新到Result Table。每个batch代表一个触发间隔,默认设置为1秒,这样每1秒从输入源读取数据到Input Table,然后触发Query计算,将结果写入Result Table。如下图所示:


上面流程中Result Table中的信息会被输出到外部数据源中,如文件、kafka、关系数据库中,在测试阶段最简单的方式可输出到控制台上。一共有三种Output模式:

1、Append模式:仅仅从上次触发计算到当前新增的行会被输出。仅仅支持行数据插入结果表后不进行更改的query操作。。

2、Complete模式: 每次触发都会将整个结果表输出。这个是针对聚合操作的。。

3、Update模式:仅仅是自上次触发之后结果表有变更的行会输出。

上面介绍了structured Streaming采用结构化流失处理的基本流程和机制,下面章节我们通过一个具体的例子实际感受下。

三、简单实例

这个例子,输入信息来自一个socket服务器,当spark客户端程序(这里指的是利用structured Streaming的API编写的程序)连接到该socket后,socket服务器就会随机的发送数据,然后spark客户端就会获取数据进行处理。

我们先用java来编写一个用于测试的socket服务器程序,代码如下:

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;

public class SocketExample {
    public static void main(String[] args) {
        try {
            ServerSocket server = new ServerSocket(9999);
            System.out.println("启动服务器,等待客户端连接....");
            Socket client = server.accept();
            System.out.println("客户端已连接到服务器");

            for (int i = 0; i < 10000; i++) {
                BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(client.getOutputStream()));
                Random rand = new Random();
                StringBuffer data = new StringBuffer();
                for(int k =0;k<6;k++){
                    data.append("data" + rand.nextInt(10)+" ");
                }
                data.append("\n");
                bw.write(data.toString());
                bw.flush();
                Thread.sleep(10);
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上面程序很简单,创建一个Socket服务器,侦听端口是9999,然后等待客户端连接。接收到客户端连接后,在一个for循环中,每隔10毫秒向客户端发送一个字符串,字符串的内容是由6以空格分隔的字符串组成。

上面代码编译后是一个可执行的Java程序,然后我们在控制台上启动该java程序,如: java SocketExample

下面我们启动spark shell,在命令行中编写一个structured Streaming程序。该程序的功能是连接到socket服务器,获取socket服务器发送的字符串信息,统计所有的不同单词出现的次数。代码如下:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("newdemo").getOrCreate()

val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

val words = lines.as[String].flatMap(_.split(" "))

words.createOrReplaceTempView("info")

val wordCounts = spark.sql("select value,count(*) from info group by value")

wordCounts.writeStream.outputMode("complete").format("console").start()

下面我们来解释上面代码的含义。

val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
上面语句的作用是设置连接数据源的信息,返回一个DataFrame对象,这样相当于把socket数据源当作一个DataFrame来处理,这样我们就可以利用DataFrame提供的各种方法来进行数据的操作了。

val words = lines.as[String].flatMap(_.split(" "))
上面语句的含义是将上面获取的lines对象(DataFrame类型)中的每个每个字符串(是以空格分隔的多个字符串组成)中的单词取出了,生成一个新的Dataset对象。然后就可以对Dataset对象进行处理了。

words.createOrReplaceTempView("info")
上面语句的含义是将words对象(DataSet类型)注册为一个SQL临时视图,这样既可以执行sql语句了。因为words中的数据类型是字符窜,所有该视图只有一个字段,字段名为默认的value。

val wordCounts = spark.sql("select value,count() from info group by value");
上面语句
的意思就是执行一个sql统计操作,即统计相当单词出现的次数。*

wordCounts.writeStream.outputMode("complete").format("console").start()
上面语句的作用是真的执行任务,包括连接socket服务器,获取数据进行处理。也就是说前面的代码都不会执行真正的动作,相当于是设置信息和要执行的动作,这也是函数式编程中惰性特性的一个典型体现。

上面这个语句中 outputMode("complete") 表示输出的方式为完全模式,format("console")表示输出到控制台。这时我们观察控制台,会发现不断打印类似的信息:

Batch: 9

-------------------------------------------

+-----+--------+

|value|count(1)|

+-----+--------+

|data9|    1378|

|data6|    1354|

|data4|    1378|

|data7|    1497|

|data1|    1353|

|data5|    1305|

|data3|    1307|

|data8|    1393|

|data0|    1340|

|data2|    1357|

+-----+--------+

上面信息代表了StructuredStreaming每次batch处理的结果,我们会看到不断打印这样的信息片段。并且会看到这些单词的统计数量的值不断增大。

上面我们用的是完全输出模式,也就是说是统计所有获取数据中单词的重复次数。如果我们希望只是简单输出从socket中获取的单词,则后面两句代码改为如下代码:

val wordCounts = spark.sql("select value from info");

wordCounts.writeStream.outputMode("append").format("console").start()

可以看出除sql语句变化外,outputMode函数的参数值由"complete"变为"append"。这时我们观察控制台的输出,可以看到如下的片段重复出现:

Batch: 32

-------------------------------------------

+-----+

|value|

+-----+

|data4|

|data7|

|data1|

|data1|

|data0|

|data5|

|data0|

|data9|

|data8|

|data7|

|data7|

|data2|

|data4|

四、小结

本文对Structured Streaming的基本概念做了简单的介绍,并通过一个具体的例子来真实感受下Structured Streaming的使用,可以看出编写Structured Streaming程序还是很方便的,可以利用Spark SQL和数据集的各种API进行很方便的处理,就和批处理的方式一样。

需要说明的是,Structured Streaming的功能远不止本文介绍的这点,本文只是介绍最基础的一部分。

相关文章

  • spark学习笔记3-StructuredStreaming

    本文是对Spark的核心外围组件之一的Structured Streaming的一个学习总结,本文共包含如下几部...

  • spark 学习笔记

    Spark学习笔记 Data Source->Kafka->Spark Streaming->Parquet->S...

  • spark

    *Spark Spark 函数Spark (Python版) 零基础学习笔记(一)—— 快速入门 1.map与fl...

  • 2020-03-17

    spark学习笔记centos安装Oracle VirtualBox: Centos安装Vagrant

  • spark核心编程

    Spark 学习笔记 Spark 架构及组件 client:客户端进程,负责提交job到master Driver...

  • Spark Core 学习笔记

    Spark Core 学习笔记 1、Spark 简介 ​ Spark 是一种用于大规模数据处理的统一计算引擎...

  • Spark Architecture

    OReilly.Learning.Spark 学习笔记 Spark里所有操作都是对RDD来的。分为两种 1. Tr...

  • 《架构师训练营》之大数据应用

    极客时间《架构师训练营》第十三周学习笔记 Spark 架构 Spark 则是 UC Berkeley AMP la...

  • 【Spark学习笔记】初识spark

    1.Spark简介 快速且通用的集群计算平台 1.1.快速性: Spark扩充了流行的mapreduce计算模型 ...

  • Spark学习笔记

    Scala语法 至于scala语法而言,大致上和Java的语法类似,增加了一些函数式编程,具体语法可以参考Scal...

网友评论

      本文标题:spark学习笔记3-StructuredStreaming

      本文链接:https://www.haomeiwen.com/subject/ffkquqtx.html