美文网首页
mapWithState 计算单词个数(官方示例)

mapWithState 计算单词个数(官方示例)

作者: 焉知非鱼 | 来源:发表于2018-09-07 13:39 被阅读325次
    package allinone
    
    import org.apache.spark.streaming._
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession
    
    /**
      * Counts words cumulatively in UTF-8 encoded, '\n'-delimited text received from the network in
      * 10-second batches, starting from an initial word-count state.
      * Usage: StatefulNetworkWordCount <hostname> <port>
      *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
      *   data.
      *
      * To run this on your local machine, you need to first run a Netcat server
      *    `$ nc -lk 9999`
      * and then run the example
      *    `spark-submit.sh allinone.StatefulNetworkWordCount localhost 9999`
      */
    
    object StatefulNetworkWordCount {

      /**
        * Connects to a TCP text server, splits each received line on spaces and prints, for every
        * batch, each incoming word together with its cumulative count kept via `mapWithState`.
        *
        * Exits with status 1 (after printing a message to stderr) when fewer than two arguments
        * are supplied or when the port argument is not an integer.
        *
        * @param args `args(0)` is the hostname and `args(1)` the port of the TCP text server
        */
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
          System.exit(1)
        }

        // Fail with a readable message instead of an uncaught NumberFormatException
        // when the port argument is not a valid integer.
        val port: Int = scala.util.Try(args(1).toInt).getOrElse {
          System.err.println(s"Invalid port '${args(1)}'")
          System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
          sys.exit(1)
        }

        // Run in local mode, which is convenient for testing
        Logger.getLogger("org").setLevel(Level.WARN)
        val spark = SparkSession.builder()
          .appName(this.getClass.getName)
          .master("local[2]")
          .getOrCreate()

        spark.sparkContext.setLogLevel("WARN")

        // Create the context with a 10 second batch size
        val ssc = new StreamingContext(spark.sparkContext, Seconds(10))
        // Checkpoint directory for the streaming context (the state kept by mapWithState
        // is persisted across batches through checkpointing)
        ssc.checkpoint("/tmp/word-count-map-with-state")

        // Initial state RDD for mapWithState operation
        val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))

        // Create a ReceiverInputDStream on target ip:port and count the
        // words in input stream of \n delimited text (eg. generated by 'nc')
        val lines = ssc.socketTextStream(args(0), port)
        val words = lines.flatMap(_.split(" "))
        val wordDstream = words.map(x => (x, 1))

        // Update the cumulative count using mapWithState
        // This will give a DStream made of state (which is the cumulative count of the words)
        // Anonymous function taking three parameters: the key, the new value and the stored state
        val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
          // Current value plus the count previously stored in the state for this word
          val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
          val output = (word, sum) // Emit the word and the number of times it has been seen
          state.update(sum) // Store the new cumulative count as the current state
          output // The value of the anonymous function is the emitted record
        }

        // Use mapWithState to update the state, seeded with the initial counts
        val stateDstream = wordDstream.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))
        stateDstream.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    相关文章

      网友评论

          本文标题:mapWithState 计算单词个数(官方示例)

          本文链接:https://www.haomeiwen.com/subject/vrkcgftx.html