美文网首页
Spark streaming之updateStateByKey

Spark streaming之updateStateByKey

作者: weare_b646 | 来源:发表于2019-04-19 09:23 被阅读0次
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
      * Streaming word count that keeps a running, all-time total for every word.
      *
      * Reads lines of text from a TCP socket server (for example `nc -lk <port>`),
      * splits each 10-second batch into words, and uses `updateStateByKey` to add
      * that batch's counts to the totals accumulated in earlier batches. The
      * running totals are printed after every batch.
      *
      * Usage: `WordCount [host] [port] [checkpointDir]`. Any omitted argument falls
      * back to the defaults defined in this object.
      *
      * Originally created by Administrator on 2018/7/24.
      */
    object WordCount {

      /** Socket server host used when no host argument is supplied. */
      private val DefaultHost = "192.168.47.141"

      /** Socket server port used when no port argument is supplied. */
      private val DefaultPort = 9999

      /** Checkpoint directory used when no checkpoint argument is supplied. */
      // NOTE(review): checkpointing into a directory named `input` may clash with input data — confirm.
      private val DefaultCheckpointDir = "hdfs://192.168.47.244:8020/input"

      /**
        * Entry point.
        *
        * @param args optional `host`, `port`, `checkpointDir`, in that order
        * @throws NumberFormatException if a supplied port argument is not an integer
        */
      def main(args: Array[String]): Unit = {
        val host          = argOrElse(args, 0, DefaultHost)
        val port          = argOrElse(args, 1, DefaultPort.toString).toInt
        val checkpointDir = argOrElse(args, 2, DefaultCheckpointDir)

        // local[2]: the socket receiver occupies one thread, so at least one more
        // is needed to process the received batches.
        val conf = new SparkConf()
          .setAppName("updateStateByKeyPro")
          .setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))

        // Stateful transformations such as updateStateByKey require checkpointing.
        ssc.checkpoint(checkpointDir)

        // DStream of text lines received from the socket (netcat) server.
        val lines = ssc.socketTextStream(host, port)

        // Split on runs of whitespace and drop empty tokens, so repeated spaces,
        // tabs, leading whitespace or blank lines are not counted as an empty "word".
        val pairs = lines
          .flatMap(_.split("\\s+"))
          .filter(_.nonEmpty)
          .map(word => (word, 1))

        // Fold each batch's counts into the running total kept for every word.
        val totals = pairs.updateStateByKey(updateCount _)

        // Print the running totals after every batch.
        totals.print()

        ssc.start()            // start receiving and processing data
        ssc.awaitTermination() // block until the streaming context is stopped
      }

      /**
        * Merges one batch's counts for a word into its running total.
        *
        * @param batchCounts  the 1s emitted for this word in the current batch
        *                     (empty if the word did not occur in this batch)
        * @param runningTotal the total from previous batches, or None for a new word
        * @return the updated total; always `Some`, so words are never dropped from the state
        */
      private def updateCount(batchCounts: Seq[Int], runningTotal: Option[Int]): Option[Int] =
        Some(runningTotal.getOrElse(0) + batchCounts.sum)

      /** Returns `args(index)` when that argument was supplied, otherwise `default`. */
      private def argOrElse(args: Array[String], index: Int, default: String): String =
        if (args.length > index) args(index) else default
    }
    
    

    相关文章

      网友评论

          本文标题:Spark streaming之updateStateByKey

          本文链接:https://www.haomeiwen.com/subject/ismjwqtx.html