美文网首页
Spark-DStream数据转换

Spark-DStream数据转换

作者: 布莱安托 | 来源:发表于2020-07-07 14:47 被阅读0次

    DStream的原语与RDD类似,分文转换(Transformation)和输出(Output)两种,此外还有一些特殊的原语,如:updateStateByKey,transform以及各种窗口(window)相关的原语。

    无状态转化操作

    无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。支持map、flatMap、filter、reduceByKey等RDD相同的操作。

    注:如针对键值对的DStream使用reduceByKey需要添加import StreamingContext._

    无状态转化应用到DStream内部各个批次的RDD上,但是只会对单个时间周期的数据进行操作,并不会跨越时间周期。

    无状态转化操作也能在多个DStream间整合数据,不过也是在各个时间周期内。键值对DStream可以使用和RDD一样的连接相关操作,例如:cogroup、join、leftOutJoin等。

    同样,可以使用union将两个DStream内容合并起来,也可以使用StreamingContext.union()合并多个流。

    有状态转化操作

    1. updateStateByKey

    updateStateByKey用于记录历史记录,有事我们需要在DStream中跨批次维护状态。针对这种情况updateStateByKey为我们提供了一个对状态变量的访问,用于键值对的DStream。给定一个键值对类型的DStream,并传递一个按照键来更新值得函数,以此来构建出一个新的DStream,内部数据的形式为(原有键,状态)的一个键值对。

    想要维护并更新状态需要做如下两步:

    1. 定义状态,可以为任意数据类型
    2. 定义状态更新函数,函数需要实现当前批次输入与当前状态的更新操作

    使用updateStateByKey需要设置checkpoint,会通过checkpoint保存状态。

    实现有状态的Wordcount如下:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    
    object UpdateStateDemo {
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setMaster("local[4]").setAppName("UpdateStateDemo")
        val streamingContext = new StreamingContext(conf, Seconds(5))
          
        streamingContext.checkpoint("checkpoint")
    
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "use_a_separate_group_id_for_each_stream",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
        val topics = Array("test")
    
        val kafkaDStream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )
    
        val flatMapDStream = kafkaDStream.flatMap(_.key().split(" "))
    
        val mapDStream = flatMapDStream.map((_, 1))
    
        // 将原有状态值与新增数据value序列相加,获得新的状态
        val updateStateDStream = mapDStream.updateStateByKey {
          case (seq, state) => Option(state.getOrElse(0) + seq.sum)
        }
    
        updateStateDStream.print()
    
        streamingContext.start()
        streamingContext.awaitTermination()
    
      }
    }
    

    2. 窗口操作

    窗口操作可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态。基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

    注:所有基于窗口的操作均需要两个参数:1)窗口大小,2)滑动步长,两者都必须是StreamingContext中设置的采集周期的正整数倍。

    窗口大小控制每次计算最近的多少个批次的数据,计算的批次数量为windowDuration/batchInterval个。滑动步长用来控制对新的DStream进行计算的间隔,默认与批次间隔相同。

    关于窗口的操作有如下原语:

    1. window(windowLength, slideInterval):对DStream的窗口中的批次进行计算,并返回一个新的DStream

      利用window对窗口中的批次数据进行Wordcount

      import org.apache.kafka.common.serialization.StringDeserializer
      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
      import org.apache.spark.streaming.{Seconds, StreamingContext}
      
      object WindowDemo {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setMaster("local[4]").setAppName("WindowDemo")
          val streamingContext = new StreamingContext(conf, Seconds(3))
      
          val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> "localhost:9092",
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "group.id" -> "use_a_separate_group_id_for_each_stream",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
          )
      
          val topics = Array("test")
      
          val kafkaDStream = KafkaUtils.createDirectStream[String, String](
            streamingContext,
            LocationStrategies.PreferConsistent,
            ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
          )
      
          // 窗口大小及滑动步长均为采集周期的整数倍
          val windowDStream = kafkaDStream.window(Seconds(9), Seconds(3))
      
          val flatMapDStream = windowDStream.flatMap(_.key().split(" "))
      
          val mapDStream = flatMapDStream.map((_, 1))
      
          val reduceByKeyDStream = mapDStream.reduceByKey(_ + _)
      
          reduceByKeyDStream.print()
      
          streamingContext.start()
          streamingContext.awaitTermination()
      
        }
      }
      
    1. countByWindow(windowLength, slideInterval):返回一个DStream窗口计算的元素数量

    2. reduceByWindow(func, windowLength, slideInterval):通过自定义函数对DStream窗口进行聚合

    3. reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):通过自定义函数对键值对类型的DStream的窗口进行reduceByKey操作。默认任务并行大小根据配置属性spark.default.parallelism做分组,也可以通过制定numTasks来设置并行度。

    4. reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):与上面的函数相比,新增了参数invFunc,这个函数是用来将上一个窗口包含的,本次窗口不包含的批次数据结果从上一个窗口的计算结果去除,这样窗口间相同的批次不需要重复计算,只需要将新增数据和除去无效数据后的结果进行计算,即可得到新的结果,提高了计算的效率。注:使用时必须开启checkpoint

    5. countByValueAndWindow(windowLength, slideInterval, [numTasks]):对键值对类型的DStream进行处理,返回(K, Long)类型的DStream,其中键的值为其在窗口中出现的频次。

    3. 其他重要操作

    1. transform
    2. join

    相关文章

      网友评论

          本文标题:Spark-DStream数据转换

          本文链接:https://www.haomeiwen.com/subject/fdigqktx.html