美文网首页
Spark Streaming之DStream转换

Spark Streaming之DStream转换

作者: 万事万物 | 来源:发表于2021-08-09 12:44 被阅读0次

DStream上的操作与RDD的类似,分为转换输出两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

无状态转化操作

无状态转化操作:就是把RDD转化操作应用到DStream每个批次上,每个批次相互独立,自己算自己的。

常规无状态转化操作

DStream的部分无状态转化操作列在了下表中,都是DStream自己的API。
注意,针对键值对的DStream转化操作,要添加import StreamingContext._才能在Scala中使用,比如reduceByKey()。

函数名称 目的 Scala示例 函数签名
map() 对DStream中的每个元素应用给定函数,返回由各元素输出的元素组成的DStream。 ds.map(x=>x + 1) f: (T) -> U
flatMap() 对DStream中的每个元素应用给定函数,返回由各元素输出的迭代器组成的DStream。 ds.flatMap(x => x.split(" ")) f: T -> Iterable[U]
filter() 返回由给定DStream中通过筛选的元素组成的DStream ds.filter(x => x != 1) f: T -> Boolean
repartition() 改变DStream的分区数 ds.repartition(10) N / A
reduceByKey() 将每个批次中键相同的记录规约。 ds.reduceByKey( (x, y) => x + y) f: T, T -> T
groupByKey() 将每个批次中的记录根据键分组。 ds.groupByKey() N / A

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD批次组成,且无状态转化操作是分别应用到每个RDD批次上的。

Transform

通过Transform可以将DStream每一批次的数据直接转换为RDD的算子操作。

还是通过一个程序来演示Transform具体的使用吧

需求:
从mysql中读取数据,对数据进行操作

  • 环境准备
 <dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.25</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>

    </dependencies>
  • 自定Receiver
    读取数据库资源
import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet, ResultSetMetaData}

import com.mysql.jdbc.Driver
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

import scala.collection.mutable.ArrayBuffer

class MysqlReceiver(val sql:String) extends Receiver[ArrayBuffer[Any]](StorageLevel.MEMORY_ONLY){
  // 建立连接
  var conn: Connection=null
  //
  var statement: PreparedStatement=null

  // receiver 启动时执行
  override def onStart(): Unit = {
    // 注册驱动
    classOf[Driver]


    // url
    val url="jdbc:mysql://hadoop102:3306/gmall?useUnicode=true&characterEncoding=UTF-8"
    // 账号
    val username="root"
    // m密码
    val password="123321"

    //建立连接
    conn = DriverManager.getConnection(url, username, password)

    // 获取 prepareStatement
    statement = conn.prepareStatement(sql)

    // 只需要执行查询sql
    val rs: ResultSet = statement.executeQuery()

    val metaData: ResultSetMetaData = rs.getMetaData

    while (rs.next()) {

      val array = ArrayBuffer[Any]()
      for(i <- 1 to metaData.getColumnCount){
        val columnName: String = metaData.getColumnName(i)
        val className: String = metaData.getColumnClassName(i)

        val value: Any = className match {
          case "java.lang.Long" => rs.getLong(i)
          case _ => rs.getString(i)

        }
        array.+=(value)
      }
      // 存储
      store(array)
    }
  }

  // receiver 关闭时执行
  override def onStop(): Unit = {

import java.sql.ResultSetMetaData

// 关闭资源
    if(statement!=null){
      statement.close()
    }

    if(conn!=null){
      conn.close()
    }

  }
}
  • 编写main程序
    获取用户的邮件长度
  def main(args: Array[String]): Unit = {

    // 创建 StreamingContext
    val ssc =new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"),Seconds(5))

    // 设置日志级别
    ssc.sparkContext.setLogLevel("warn")

    // 准备sql

    val sql="select * from user_info"

    // 使用自定义Receiver
    val value: ReceiverInputDStream[ArrayBuffer[Any]] = ssc.receiverStream(new MysqlReceiver(sql))

    value.transform(rdd=>{
      // 统计邮箱长度
      rdd.map(e=>{
        // 邮箱
        val email: Any = e(6)
        // 姓名
        val name: Any = e(4)
        (name,email.toString.length)
      })
    }).print()



    // 启动
    ssc.start()

    // 等待
    ssc.awaitTermination()

  }

运行测试

-------------------------------------------
Time: 1625961725000 ms
-------------------------------------------
(孟河,16)
(令狐岚艺,20)
(邬羽,16)
(公孙昭,16)
(许超浩,19)
(茅馥,13)
(汤卿聪,18)
(钱固,19)
(汪云莲,18)
(司徒艺,18)

有状态转化操作

说有状态之前,修改一下上面的程序,统计男女人数。
设计到部分代码改动,大部分都是一样。
目前数据量比较少,只有一百条,为了不让程序马上跑完,读取一行数据时休眠2秒
Thread.sleep(2000)

    while (rs.next()) {

      val array = ArrayBuffer[Any]()
      for(i <- 1 to metaData.getColumnCount){
        val columnName: String = metaData.getColumnName(i)
        val className: String = metaData.getColumnClassName(i)

        val value: Any = className match {
          case "java.lang.Long" => rs.getLong(i)
          case _ => rs.getString(i)

        }
        array.+=(value)
      }
      // 存储
      store(array)
      // 休眠2秒
      Thread.sleep(2000)
    }

统计当前RDD 男女人数

    value.transform(rdd=>{
      // 统计邮箱长度
      val value1: RDD[String] = rdd.map(e => {
        // 性别
        val gender: Any = e(10)
        gender.toString
      })

      // 统计当前RDD男女人数
      value1.groupBy(e => e).map(e=>{
        (e._1,e._2.toList.size)
      })
    }).print()

运行结果

-------------------------------------------
Time: 1625963620000 ms
-------------------------------------------
(F,1)
(M,1)

-------------------------------------------
Time: 1625963625000 ms
-------------------------------------------
(F,2)

-------------------------------------------
Time: 1625963630000 ms
-------------------------------------------
(F,2)
(M,1)

-------------------------------------------
Time: 1625963635000 ms
-------------------------------------------
(F,1)
(M,1)

列出一部分,有没有发现无状态转化每次只能统计当前批次的数据。假设数据量比较大肯定需要一个批次一个批次的统计,然后对每个批次的数据进行汇总。
如何获取上一个批次数据呢?这里就需要使用有状态转化

  • UpdateStateByKey

updateStateByKey()用于键值对形式的DStream,可以记录历史批次状态。
updateStateByKey()参数中需要传递一个函数,在函数内部可以根据需求对新数据和历史状态进行整合处理,返回一个新的DStream。

我们知道RDD是不存储数据的,既然要获取上一个批次数据,那么肯定需要将数据进行持久化,如何进行持久化?这里就需要配置checkpoint,记录上一次的结果。
如果不配置,将会抛出如下异常错误

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint().

更改一点点之前的程序

  def main(args: Array[String]): Unit = {

    // 创建 StreamingContext
    val ssc =new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"),Seconds(5))

    // 设置日志级别
    ssc.sparkContext.setLogLevel("warn")

    // 持久化,保存上一个批次数据
    ssc.checkpoint("output//updateStateByKey")

    // 准备sql
    val sql="select * from user_info"

    // 使用自定义Receiver
    val value: ReceiverInputDStream[ArrayBuffer[Any]] = ssc.receiverStream(new MysqlReceiver(sql))

    // 获取性别
    val gender: DStream[(String, Int)] = value.map(e => {
      // 性别
      (e(10).toString,1)
    })

    // currentValue 当前批次 value 值
    // nextValues 上一个批次 value 值
    val fun=(currentValue:Seq[Int],nextValues:Option[Int])=>{
      Some(currentValue.sum+nextValues.getOrElse(0))
    }

    // 聚合
    val result: DStream[(String, Int)] = gender.updateStateByKey(fun)

    // 输出结果
    result.print()

    // 启动
    ssc.start()

    // 等待
    ssc.awaitTermination()
  }

输出结果

-------------------------------------------
Time: 1625965715000 ms
-------------------------------------------
(M,1)

-------------------------------------------
Time: 1625965720000 ms
-------------------------------------------
(M,1)
(F,2)

-------------------------------------------
Time: 1625965725000 ms
-------------------------------------------
(M,2)
(F,4)

21/07/11 09:08:46 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
-------------------------------------------
Time: 1625965730000 ms
-------------------------------------------
(M,3)
(F,5)

-------------------------------------------
Time: 1625965735000 ms
-------------------------------------------
(M,3)
(F,8)

这样也许看不太出来每次的变化,我们把当前批次的数据结果也打印出来。
改这部分代码即可

    // 获取性别
    val gender: DStream[(String, Int)] = value.map(e => {
      
      println("当前批次数据结果:",(e(10).toString,1))
      // 性别
      (e(10).toString,1)
    })

运行测试

-------------------------------------------
Time: 1625966020000 ms
-------------------------------------------

(当前批次数据结果:,(F,1))
(当前批次数据结果:,(F,1))
(当前批次数据结果:,(M,1))
-------------------------------------------
Time: 1625966025000 ms
-------------------------------------------
(M,1)
(F,2)

(当前批次数据结果:,(F,1))
(当前批次数据结果:,(M,1))
-------------------------------------------
Time: 1625966030000 ms
-------------------------------------------
(M,2)
(F,3)

21/07/11 09:13:53 WARN ProcfsMetricsGetter: Exception when trying to compute pagesize, as a result reporting of ProcessTree metrics is stopped
(当前批次数据结果:,(F,1))
(当前批次数据结果:,(M,1))
(当前批次数据结果:,(F,1))
-------------------------------------------
Time: 1625966035000 ms
-------------------------------------------
(M,3)
(F,5)

(当前批次数据结果:,(F,1))
(当前批次数据结果:,(F,1))
-------------------------------------------
Time: 1625966040000 ms
-------------------------------------------
(M,3)
(F,7)

这样就可以很方便看出每个批次的数据累计情况。

功能也看到了,那么就该具体的了。

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)] = ssc.withScope {
     updateStateByKey(updateFunc, defaultPartitioner())
}

updateStateByKey有很多重载方法,其他的还不怎么熟悉,就说说这一个吧。
updateFunc:需要我们指定函数,并且有两个参数,分别类型为(Seq[V],Option[S])返回的类型也需要为Option[S]
第一个参数,Seq[V] :表示当前批次的所有key对应的value数据,
第二个参数,Option[S]:表示上一个批次的数据,为什么是Option呢?那是因为上一个批次的数据可能还没有。

    // currentValue 当前批次 value 值
    // nextValues 上一个批次 value 值
    val fun=(currentValue:Seq[Int],nextValues:Option[Int])=>{
      Some(currentValue.sum+nextValues.getOrElse(0))
    }

    // 聚合
    val result: DStream[(String, Int)] = gender.updateStateByKey(fun)
  • Window
    updateStateByKey 用于统计全局数据,我们还是拿上一个案例(统计男女人数)为例,假设我的数据量越来越大,不停的有人向我网站注册会员,我需要统计10分钟,一个小时,男女注册人数,只需要统计这一个时间段的数据。
    需求明白了之后,如何实现了?此时就需要使用到了WindowOperations 函数。
    理解WindowOperations函数,就需要先理解 scala中的sliding(滑窗函数)
val list=List(1,2,3,4,5,6,7,8,9,10)

滑窗

list,sliding(窗口大小,步长) 

例如

list,sliding(2,1) 

结果

List(1,2)
List(2,3)
List(3,4)
List(4,5)
List(5,6)
List(7,8)
List(8,9)
List(9,10)

sliding 滑动的是数据,WindowOperations滑动的就是时间(Streaming 是按照时间拉取一个批次数据)。
感觉有点抽象?用案例说话吧。

def main(args: Array[String]): Unit = {

    // 创建 StreamingContext
    val ssc =new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"),Seconds(5))

    // 设置日志级别
    ssc.sparkContext.setLogLevel("warn")

    // 持久化,保存上一个批次数据
    ssc.checkpoint("output//updateStateByKey")

    // 准备sql
    val sql="select * from user_info"

    // 使用自定义Receiver
    val value: ReceiverInputDStream[ArrayBuffer[Any]] = ssc.receiverStream(new MysqlReceiver(sql))

    // 获取性别
    val gender: DStream[(String, Int)] = value.map(e => {
      println("统计当前批次的数据:",(e(10).toString,1))
      // 性别
      (e(10).toString,1)
    })


    // 窗口
    val windowValue: DStream[(String, Int)] = gender.window(Seconds(10), Seconds(5))

    // 输出结果
    windowValue.print()

    // 聚合当前窗口的数据
    val result: DStream[(String, Int)] = windowValue.reduceByKey(_ + _)

    result.print()

    println("*"*50)

    // 启动
    ssc.start()

    // 等待
    ssc.awaitTermination()


  }

设置滑窗

 val windowValue: DStream[(String, Int)] = gender.window(Seconds(10), Seconds(5))

Seconds(10):窗口时长:计算内容的时间范围;
Seconds(5):滑动步长:隔多久触发一次计算。

之前设置的,五秒拉取一次。

val windowValue: DStream[(String, Int)] = gender.window(Seconds(10), Seconds(5))

我想了半天,确实不知道怎么表达
看看这个数据,1~10;可以这么理解,集合中的每一个元素相当于存储5秒一个批次的数据,
1:第一个批次数据,2:表示第二个批次数据,以此类推。

val list=List(1,2,3,4,5,6,7,8,9,10)

设置的创建为Seconds(10)也就是10秒,是不是相当窗口大小为2,也就是两个批次的数据。
设置步长为Seconds(5),也就是每次滑动一个元素的数据,也就是滑动一个一批次的数据。
最终的结果是不是应该也是这样,只是这里应该为数字,而是一个批次的数据。

List(1,2)
List(2,3)
List(3,4)
List(4,5)
List(5,6)
List(7,8)
List(8,9)
List(9,10)

我们再来看看数据结果

**************************************************
(统计当前批次的数据:,(M,1))
-------------------------------------------
Time: 1625969490000 ms
-------------------------------------------
(M,1)

-------------------------------------------
Time: 1625969490000 ms
-------------------------------------------
(M,1)

(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625969495000 ms
-------------------------------------------
(M,1)
(F,1)
(F,1)
(F,1)

-------------------------------------------
Time: 1625969495000 ms
-------------------------------------------
(M,1)
(F,3)

(统计当前批次的数据:,(M,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625969500000 ms
-------------------------------------------
(F,1)
(F,1)
(F,1)
(M,1)
(F,1)

-------------------------------------------
Time: 1625969500000 ms
-------------------------------------------
(M,1)
(F,4)

(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(M,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625969505000 ms
-------------------------------------------
(M,1)
(F,1)
(F,1)
(M,1)
(F,1)

-------------------------------------------
Time: 1625969505000 ms
-------------------------------------------
(M,2)
(F,3)

(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625969510000 ms
-------------------------------------------
(F,1)
(M,1)
(F,1)
(F,1)
(F,1)

-------------------------------------------
Time: 1625969510000 ms
-------------------------------------------
(M,1)
(F,4)

(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(M,1))
-------------------------------------------
Time: 1625969515000 ms
-------------------------------------------
(F,1)
(F,1)
(F,1)
(F,1)
(M,1)

-------------------------------------------
Time: 1625969515000 ms
-------------------------------------------
(M,1)
(F,4)

(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625969520000 ms
-------------------------------------------
(F,1)
(F,1)
(M,1)
(F,1)
(F,1)

-------------------------------------------
Time: 1625969520000 ms
-------------------------------------------
(M,1)
(F,4)

(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(M,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625969525000 ms
-------------------------------------------
(F,1)
(F,1)
(F,1)
(M,1)
(F,1)

-------------------------------------------
Time: 1625969525000 ms
-------------------------------------------
(M,1)
(F,4)

因为我不仅打印了每个窗口的数据,还打印了每个窗口统计后的结果

数据

滑窗

微信图片_20210711104114.png

结果

结果

图片有点模糊,我也很无奈呀,就是想说数据的结果就是通过滑窗的方式获取的,

注意:

滑动时间、窗口时间必须为批次时间的整数倍。

  • reduceByKeyAndWindow
    向上面这种操作
 // 窗口
val windowValue: DStream[(String, Int)] = gender.window(Seconds(10), Seconds(5))
// 聚合当前窗口的数据
val result: DStream[(String, Int)] = windowValue.reduceByKey(_ + _)

还可以进行简写,这里就需要使用到reduceByKeyAndWindow,对窗口进行聚合。效果和上面两行代码一样

  def main(args: Array[String]): Unit = {

    // 创建 StreamingContext
    val ssc =new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"),Seconds(5))

    // 设置日志级别
    ssc.sparkContext.setLogLevel("warn")

    // 持久化,保存上一个批次数据
    ssc.checkpoint("output//updateStateByKey")

    // 准备sql
    val sql="select * from user_info"

    // 使用自定义Receiver
    val value: ReceiverInputDStream[ArrayBuffer[Any]] = ssc.receiverStream(new MysqlReceiver(sql))

    // 获取性别
    val gender: DStream[(String, Int)] = value.map(e => {
      println("统计当前批次的数据:",(e(10).toString,1))
      // 性别
      (e(10).toString,1)
    })


    // 窗口
    val result: DStream[(String, Int)] = gender.reduceByKeyAndWindow((agg, curr) => agg + curr, windowDuration=Seconds(10), Seconds(5))

    // 输出结果
    result.print()



    println("*"*50)

    // 启动
    ssc.start()

    // 等待
    ssc.awaitTermination()
  }

运行:

**************************************************
(统计当前批次的数据:,(M,1))
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625973660000 ms
-------------------------------------------
(M,1)
(F,2)

(统计当前批次的数据:,(M,1))
(统计当前批次的数据:,(F,1))
-------------------------------------------
Time: 1625973665000 ms
-------------------------------------------
(M,2)
(F,3)

(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(M,1))
-------------------------------------------
Time: 1625973670000 ms
-------------------------------------------
(M,2)
(F,3)

注意:
windowDuration 不能简写,具体是什么原因不太清除,可能是编译器的问题。

    // 窗口
    val result: DStream[(String, Int)] = gender.reduceByKeyAndWindow((agg, curr) => agg + curr, windowDuration=Seconds(10), Seconds(5))

reduceByKeyAndWindow 还有一个重载方法

  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration = self.slideDuration,
      numPartitions: Int = ssc.sc.defaultParallelism,
      filterFunc: ((K, V)) => Boolean = null
    ): DStream[(K, V)] = ssc.withScope {
    reduceByKeyAndWindow(
      reduceFunc, invReduceFunc, windowDuration,
      slideDuration, defaultPartitioner(numPartitions), filterFunc
    )
  }

需要指定两个函数
invReduceFunc:逆归约函数;

我现在需要统计用户注册人数,假设1秒钟拉取一次,我需要统计一分钟的数据(窗口应该60),每一秒统计一次。


若按原来的做法,每次都需要从头开始(1~60)开始结算,但是2~59的数据始终不会变的,变得永远都第一个和最后一个。
那么。
逆归约函数的思想简单点说,就是丢弃第一个,加上新加入的。中间的永远不变,这样的好处就是减少重复计算。
 def main(args: Array[String]): Unit = {

    // 创建 StreamingContext
    val ssc =new StreamingContext(new SparkConf().setMaster("local[4]").setAppName("test"),Seconds(5))

    // 设置日志级别
    ssc.sparkContext.setLogLevel("warn")

    // 持久化,保存上一个批次数据
    ssc.checkpoint("output//updateStateByKey")

    // 准备sql
    val sql="select * from user_info"

    // 使用自定义Receiver
    val value: ReceiverInputDStream[ArrayBuffer[Any]] = ssc.receiverStream(new MysqlReceiver(sql))

    // 获取性别
    val gender: DStream[(String, Int)] = value.map(e => {
      println("统计当前批次的数据:",(e(10).toString,1))
      // 性别
      (e(10).toString,1)
    })

    // 窗口
    val result: DStream[(String, Int)] = gender.reduceByKeyAndWindow((agg, curr) => {
      println(s"累加滑入批次数据: ${agg}  ${curr}")
      agg + curr
    }, (agg1, curr1) => {
      println(s"减去滑出的批次数据: ${agg1}  ${curr1}")
      agg1 - curr1
    }, Seconds(15), Seconds(5))

    // 输出结果
    result.print()

    println("*"*50)

    // 启动
    ssc.start()

    // 等待
    ssc.awaitTermination()
  }

结果

**************************************************
(统计当前批次的数据:,(M,1))
-------------------------------------------
Time: 1625975440000 ms
-------------------------------------------
(M,1)

(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
累加滑入批次数据: 1  1
-------------------------------------------
Time: 1625975445000 ms
-------------------------------------------
(M,1)
(F,2)

(统计当前批次的数据:,(M,1))
(统计当前批次的数据:,(F,1))
(统计当前批次的数据:,(F,1))
累加滑入批次数据: 1  1
累加滑入批次数据: 1  1
累加滑入批次数据: 2  2
-------------------------------------------
Time: 1625975450000 ms
-------------------------------------------
(M,2)
(F,4)

相关文章

网友评论

      本文标题:Spark Streaming之DStream转换

      本文链接:https://www.haomeiwen.com/subject/lurhpltx.html