美文网首页
Spark streaming之updateStateByKey

Spark streaming之updateStateByKey

作者: 张明洋_4b13 | 来源:发表于2019-03-09 15:42 被阅读0次

    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}

    /**

    • Created by Administrator on 2018/7/24.
      */
      object WordCount {
      def main(args:Array[String]){
      val conf=new SparkConf().setAppName("updateStateByKeyPro")
      .setMaster("local[2]")
      val ssc=new StreamingContext(conf,Seconds(10))

      //开启checkpoint
      ssc.checkpoint("hdfs://192.168.47.244:8020/input")

      //连接nc(netcat)服务,接收数据源,产生Dtream 对象
      val lines=ssc.socketTextStream("192.168.47.141",9999)

      //分隔单词,并将分隔后的每个单词出现次数记录为1
      val pairs=lines.flatMap(_.split(" "))
      .map(word=>(word,1))
      //调用updateStateByKey算子,统计单词在全局中出现的次数
      val result=pairs.updateStateByKey((values:Seq[Int],state:Option[Int])=>{
      //创建一个变量,用于记录单词出现次数
      var newValue=state.getOrElse(0) //getOrElse相当于if....else.....
      for(value <- values){
      newValue +=value //将单词出现次数累计相加
      }
      Option(newValue)
      })
      //直接输出结果
      result.print()

    ssc.start() //开启实时计算
    ssc.awaitTermination() //等待应用停止
    

    }

    }

    相关文章

      网友评论

          本文标题:Spark streaming之updateStateByKey

          本文链接:https://www.haomeiwen.com/subject/xsospqtx.html