DStream的原语与RDD类似,分文转换(Transformation)和输出(Output)两种,此外还有一些特殊的原语,如:updateStateByKey,transform以及各种窗口(window)相关的原语。
无状态转化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。支持map、flatMap、filter、reduceByKey等RDD相同的操作。
注:如针对键值对的DStream使用reduceByKey需要添加import StreamingContext._
无状态转化应用到DStream内部各个批次的RDD上,但是只会对单个时间周期的数据进行操作,并不会跨越时间周期。
无状态转化操作也能在多个DStream间整合数据,不过也是在各个时间周期内。键值对DStream可以使用和RDD一样的连接相关操作,例如:cogroup、join、leftOutJoin等。
同样,可以使用union将两个DStream内容合并起来,也可以使用StreamingContext.union()合并多个流。
有状态转化操作
1. updateStateByKey
updateStateByKey用于记录历史记录,有事我们需要在DStream中跨批次维护状态。针对这种情况updateStateByKey为我们提供了一个对状态变量的访问,用于键值对的DStream。给定一个键值对类型的DStream,并传递一个按照键来更新值得函数,以此来构建出一个新的DStream,内部数据的形式为(原有键,状态)的一个键值对。
想要维护并更新状态需要做如下两步:
- 定义状态,可以为任意数据类型
- 定义状态更新函数,函数需要实现当前批次输入与当前状态的更新操作
使用updateStateByKey需要设置checkpoint,会通过checkpoint保存状态。
实现有状态的Wordcount如下:
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
object UpdateStateDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[4]").setAppName("UpdateStateDemo")
val streamingContext = new StreamingContext(conf, Seconds(5))
streamingContext.checkpoint("checkpoint")
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("test")
val kafkaDStream = KafkaUtils.createDirectStream[String, String](
streamingContext,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
val flatMapDStream = kafkaDStream.flatMap(_.key().split(" "))
val mapDStream = flatMapDStream.map((_, 1))
// 将原有状态值与新增数据value序列相加,获得新的状态
val updateStateDStream = mapDStream.updateStateByKey {
case (seq, state) => Option(state.getOrElse(0) + seq.sum)
}
updateStateDStream.print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
2. 窗口操作
窗口操作可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态。基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。
注:所有基于窗口的操作均需要两个参数:1)窗口大小,2)滑动步长,两者都必须是StreamingContext中设置的采集周期的正整数倍。
窗口大小控制每次计算最近的多少个批次的数据,计算的批次数量为windowDuration/batchInterval
个。滑动步长用来控制对新的DStream进行计算的间隔,默认与批次间隔相同。
关于窗口的操作有如下原语:
-
window(windowLength, slideInterval)
:对DStream的窗口中的批次进行计算,并返回一个新的DStream利用window对窗口中的批次数据进行Wordcount
import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} object WindowDemo { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local[4]").setAppName("WindowDemo") val streamingContext = new StreamingContext(conf, Seconds(3)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "localhost:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val topics = Array("test") val kafkaDStream = KafkaUtils.createDirectStream[String, String]( streamingContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) // 窗口大小及滑动步长均为采集周期的整数倍 val windowDStream = kafkaDStream.window(Seconds(9), Seconds(3)) val flatMapDStream = windowDStream.flatMap(_.key().split(" ")) val mapDStream = flatMapDStream.map((_, 1)) val reduceByKeyDStream = mapDStream.reduceByKey(_ + _) reduceByKeyDStream.print() streamingContext.start() streamingContext.awaitTermination() } }
-
countByWindow(windowLength, slideInterval)
:返回一个DStream窗口计算的元素数量 -
reduceByWindow(func, windowLength, slideInterval)
:通过自定义函数对DStream窗口进行聚合 -
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
:通过自定义函数对键值对类型的DStream的窗口进行reduceByKey操作。默认任务并行大小根据配置属性spark.default.parallelism
做分组,也可以通过制定numTasks来设置并行度。 -
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
:与上面的函数相比,新增了参数invFunc,这个函数是用来将上一个窗口包含的,本次窗口不包含的批次数据结果从上一个窗口的计算结果去除,这样窗口间相同的批次不需要重复计算,只需要将新增数据和除去无效数据后的结果进行计算,即可得到新的结果,提高了计算的效率。注:使用时必须开启checkpoint -
countByValueAndWindow(windowLength, slideInterval, [numTasks])
:对键值对类型的DStream进行处理,返回(K, Long)类型的DStream,其中键的值为其在窗口中出现的频次。
3. 其他重要操作
transform
join
网友评论