简介
上一篇文章对Spark Streaming做了基本的介绍,包括Spark Streaming的概念、特点、应用场景、基础组件等等,具体可见Spark从入门到放弃—Spark Streaming介绍。本文的目的是具体的代码实践,通过若干个小例子来演示Spark Streaming API的使用。
WordCount
还是以经典的WordCount来举例,但是这里不再是统计本地文件的word数量,而是通过网络接收实时数据流,并且实现word的实时统计更新。具体代码如下:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object SparkStreaming_WordCount {
def main(args: Array[String]) : Unit = {
val sparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// 通过监听端口创建一个DStream对象,包含一行行数据
val linesStream: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
// 对数据进行切分,形成一个个单词
val wordStream = linesStream.flatMap(_.split(" "))
// 将单词映射成元祖
val wordToOne = wordStream.map(word => (word, 1))
//将相同的单词进行聚合操作
val wordcount = wordToOne.reduceByKey(_+_)
//print,action操作
wordcount.print()
//启动ssc
ssc.start()
ssc.awaitTermination()
}
}
代码里面设置的间隔时间为2秒,即每两秒Spark会从Socket数据量中取一次数据,然后执行随后的Transformation等操作。


可以看到间隔时间确实是2秒,并且每次输出的结果刚好是统计了这个时间段内的单词的数量。
自定义数据源
Spark Streaming可以从包括内置数据源在内的任意数据源获取数据(其他数据源包括flume,kafka,kinesis,文件,套接字等等)。如果开发者需要自定义数据源,那么这需要开发者去实现一个定制receiver
从具体的数据源接收数据。
要想实现自定义的receiver
,必须继承Receiver
这个抽象类,并且实现它的两个方法,分别是onStart
和onStop
,这两个方法分别代表开始接受数据以及停止接收数据。
需要注意到是,onStart()启动线程负责数据的接收,onStop()确保这个接收过程停止。一旦接收到了数据,这些数据可以通过调用store()
方法存到Spark中,store()
是[Receiver]类中的方法。
下面实现一个简单的例子,它继承了Receiver
类,并且实现了onStart
和onStop
方法。为了简单起见,在onStart
方法中,我们开启了一个线程,每隔500ms就产生一个随机数,并且通过调用父类的store
方法,将数据保存到Spark的内存中,然后被StreamingContxt对象获取。在主线程中,我们只是简单地将接收到的数据流打印出来。
代码如下:
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
messageDS.print()
ssc.start()
ssc.awaitTermination()
}
/*
自定义数据采集器
1. 继承Receiver,定义泛型, 传递参数
2. 重写方法
*/
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
private var flag = true
override def onStart(): Unit = {
new Thread(new Runnable {
override def run(): Unit = {
while ( flag ) {
val message = "采集的数据为:" + new Random().nextInt(10).toString
store(message)
Thread.sleep(500)
}
}
}).start()
}
override def onStop(): Unit = {
flag = false;
}
}
}
结果如下:

Transform操作
Transform操作允许在DStream上执行任意的RDD-to-RDD函数,并且返回一个新的DStream。 使用transform操作,我们就可以轻松地使用RDD操作,即便这些函数没有在DStream的API中暴露出来。举个例子,对一个数据流中的每批次数据与另外一个dataset进行join操作的功能,DStream中并没有直接包含可以执行这些操作的API,但是我们可以通过transform
函数轻松实现。值得注意的时候,传递给transform
函数的参数,每经过一个批次就会被调用一次。
仍然以上述的WordCount例子来举例说明transform
函数的使用,代码如下:
object WordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val lines = ssc.socketTextStream("localhost", 9999)
val wordAndCountDStream: DStream[(String, Int)] = lines.transform(
rdd => {
val words: RDD[String] = rdd.flatMap(_.split(" "))
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)
value
}
)
//打印 wordAndCountDStream.print
wordAndCountDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
结果如下:


Join操作
使用join
函数,我们可以很轻松地对两条流数据进行聚合操作。前提是,两条流的批次大小一致,这样才可以在每个批间隔内对每条流各自的RDD进行聚合操作。同样,我们也可以使用leftOuterJoin, rightOuterJoin, fullOuterJoin
等函数。
代码如下:
object WordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val data9999 = ssc.socketTextStream("localhost", 9999)
val data8888 = ssc.socketTextStream("localhost", 8888)
val map9999: DStream[(String, Int)] = data9999.map((_,9))
val map8888: DStream[(String, Int)] = data8888.map((_,8))
// 所谓的DStream的Join操作,其实就是两个RDD的join
val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)
joinDS.print()
ssc.start()
ssc.awaitTermination()
}
}
有状态转化操作
updateStateByKey
操作允许用户保持任意的状态信息,并且可以根据新信息持续更新状态。UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量 的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指 定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数 据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对 应的(键,状态)对组成的。updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:
- 定义状态,状态可以是一个任意的数据类型。
- 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态。
在每个批次中,Spark会对所有存在的key调用状态更新函数,而不管这些key在当前批次中是否包含新的数据。
之前实现的WordCount例子,只是对每个批次中出现的word进行计数,而并没有保存状态。使用updateStateByKey
函数可以对以往所有出现过的word进行计数,代码如下:
object WordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("cp")
// 无状态数据操作,只对当前的采集周期内的数据进行处理
// 在某些场合下,需要保留数据统计结果(状态),实现数据的汇总
// 使用有状态操作时,需要设定检查点路径
val datas = ssc.socketTextStream("localhost", 9999)
val wordToOne = datas.map((_,1))
// updateStateByKey:根据key对数据的状态进行更新
// 传递的参数中含有两个值
// 第一个值表示相同的key的value数据
// 第二个值表示缓存区相同key的value数据
val state = wordToOne.updateStateByKey(
( seq:Seq[Int], buff:Option[Int] ) => {
val newCount = buff.getOrElse(0) + seq.sum
Option(newCount)
}
)
state.print()
ssc.start()
ssc.awaitTermination()
}
}
结果如下:


updateStateByKey
操作,我们可以保存状态信息,这在某些场景下非常有用,比如人流量统计。
Window操作
Windows操作可以设置窗口的大小和滑动窗口的间隔来动态或者当前Streaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
- 窗口时长: 计算内容的时间范围
-
滑动步长: 隔多久触发一次计算
window操作示意图
这两者都必须为采集周期大小的整数倍!
以WordCount为例,设置间隔时间为3秒,窗口大小为12秒,滑动步长为6秒,代码如下:
object WordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
val lines = ssc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_,1))
// 窗口的范围应该是采集周期的整数倍
// 窗口可以滑动的,但是默认情况下,一个采集周期进行滑动
// 这样的话,可能会出现重复数据的计算,为了避免这种情况,可以改变滑动的滑动(步长)
val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(12), Seconds(6))
val wordToCount = windowDS.reduceByKey(_+_)
wordToCount.print()
ssc.start()
ssc.awaitTermination()
}
}
输入的数据流为:


为了提高效率,Spark提供了另外一个函数
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
,当窗口进行滑动的时候,第一个参数func用来增加新加入到窗口中的数据,第二个参数invFunc用来删除掉不再窗口中的数据,即每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。仍然以上述的WordCount为例,代码如下:
object WordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val ssc = new StreamingContext(sparkConf, Seconds(3))
ssc.checkpoint("cp")
val lines = ssc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_,1))
// reduceByKeyAndWindow : 当窗口范围比较大,但是滑动幅度比较小,那么可以采用增加数据和删除数据的方式
// 无需重复计算,提升性能。
val windowDS: DStream[(String, Int)] =
wordToOne.reduceByKeyAndWindow(
(x:Int, y:Int) => { x + y },
(x:Int, y:Int) => { x - y },
Seconds(12),
Seconds(6))
windowDS.print()
ssc.start()
ssc.awaitTermination()
}
}

从上图中可以明显看到数据的变化过程,这个过程反应的就是窗口的滑动过程,当数据进入到窗口区间内的时候,对应key的value累加,当数据滑出窗口的时候,对应key的value减少。
网友评论