(11) Spark Core: Computing the Traffic per Domain

Author: 白面葫芦娃92 | Published 2018-09-18 16:02

Contents of the raw file baidu.log (tab-separated):

    hello   world   spark   hadoop  hive    223.104.18.110  v1.go2yd.com    17168
    world   spark   hello   mysql   sqoop   113.101.75.194  v2.go2yd.com    17222
    spark   hello   mysql   hive    world   27.17.127.135   v2.go2yd.com    1556
    ......
    ......
    

Requirement 1: compute the total traffic for each domain
Steps:
1) Extract the domain and traffic fields from each log record
2) Group by domain and sum the traffic
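
Before running the full job, the two steps can be sketched on a single record in plain Scala (a walkthrough of my own; `parseRecord` is a hypothetical helper, not part of the original job):

```scala
// Each log line is tab-separated; with 0-based indexing the domain is
// column 6 and the traffic is column 7.
def parseRecord(line: String): (String, Long) = {
  val fields = line.split("\t")
  (fields(6), fields(7).toLong) // traffic must be numeric for summing
}

val sample = "hello\tworld\tspark\thadoop\thive\t223.104.18.110\tv1.go2yd.com\t17168"
println(parseRecord(sample)) // (v1.go2yd.com,17168)
```

Step 2 is then just grouping these (domain, traffic) pairs by key and summing, which is what `reduceByKey` does below.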

    import org.apache.spark.{SparkConf, SparkContext}

    object test {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)
        val lines = sc.textFile("file:///E:/BigDataSoftware/data/baidu.log")

        val splits = lines.map(x => {
          val tmp = x.split("\t")
          // take the domain and traffic columns; convert traffic to Long,
          // otherwise "+" would concatenate strings instead of summing
          (tmp(6), tmp(7).toLong)
        })
        val results = splits.reduceByKey(_ + _)
        results.take(10).foreach(println)

        sc.stop()
      }
    }
    

If the traffic column contains dirty data such as strings, toLong throws and the whole job fails with an error like the following:

    java.lang.NumberFormatException: For input string: "10701str"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:589)
        at java.lang.Long.parseLong(Long.java:631)
        at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:276)
        at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
        at www.ruozedata.test$$anonfun$2.apply(test.scala:20)
    .......
    

Fix: guard the conversion with a try/catch

    import org.apache.spark.{SparkConf, SparkContext}

    object test {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)
        val lines = sc.textFile("file:///E:/BigDataSoftware/data/baidu.log")

        val splits = lines.map(x => {
          val tmp = x.split("\t")
          var traffic = 0L
          try {
            traffic = tmp(7).toLong
          } catch {
            // unparseable values fall back to 0 instead of failing the job
            case e: Exception => traffic = 0L
          }
          (tmp(6), traffic)
        })
        val results = splits.reduceByKey(_ + _)
        results.take(10).foreach(println)

        sc.stop()
      }
    }
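
As a side note, a more compact alternative to the explicit try/catch is `scala.util.Try` (a sketch of my own, not from the original post; `parseTraffic` is a hypothetical helper name):

```scala
import scala.util.Try

// Hypothetical helper: parse a traffic value, falling back to 0L for
// dirty data -- equivalent to the var + try/catch above.
def parseTraffic(raw: String): Long =
  Try(raw.toLong).getOrElse(0L)

println(parseTraffic("17168"))    // 17168
println(parseTraffic("10701str")) // 0
```

Inside the mapper this would read `(tmp(6), parseTraffic(tmp(7)))`, avoiding the mutable `var`.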
    
Output:
    ------------------------------------------------
    (v2.go2yd.com,38034)
    (v4.go2yd.com,31969)
    (v3.go2yd.com,44598)
    (v1.go2yd.com,77995)
    

However, one record whose traffic value is padded with whitespace is still dropped (it falls into the catch branch and counts as 0), so we lose that record. Adding a trim fixes it:

    import org.apache.spark.{SparkConf, SparkContext}

    object test {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setAppName("test").setMaster("local[2]")
        val sc = new SparkContext(sparkConf)
        val lines = sc.textFile("file:///E:/BigDataSoftware/data/baidu.log")

        val splits = lines.map(x => {
          val tmp = x.split("\t")
          var traffic = 0L
          try {
            // trim first so values padded with whitespace still parse
            traffic = tmp(7).trim.toLong
          } catch {
            case e: Exception => traffic = 0L
          }
          (tmp(6), traffic)
        })
        val results = splits.reduceByKey(_ + _)
        results.take(10).foreach(println)

        sc.stop()
      }
    }
    
Output:
    ------------------------------------------------
    (v2.go2yd.com,38034)
    (v4.go2yd.com,31969)
    (v3.go2yd.com,51784)
    (v1.go2yd.com,77995)
    

The traffic from that last record is now included.
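
Why trim matters can be verified outside Spark. This is a standalone sketch with a made-up value (`" 7186"` with a leading space stands in for the padded record; it is not taken from the log):

```scala
import scala.util.Try

// toLong delegates to java.lang.Long.parseLong, which rejects
// surrounding whitespace, so the untrimmed value falls back to 0.
def safeParse(raw: String): Long = Try(raw.toLong).getOrElse(0L)

println(safeParse(" 7186"))      // 0
println(safeParse(" 7186".trim)) // 7186
```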

Original link: https://www.haomeiwen.com/subject/xfiynftx.html