Spark Runtime Architecture

Author: yonggang_sun | Published: 2016-05-18 17:36

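    Fixing garbled characters in the raw data

    1. Opening the file in Atom shows that its encoding is GB2312.
    2. iconv can convert it directly; bytes that fail to convert can simply be skipped.
    3. Upload the converted file to HDFS. The filesystem check then reports: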

    The filesystem under path '/hdfs/user/data/SogouQ3_utf8.txt' is HEALTHY
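
    The conversion step can also be done on the JVM. The following is a minimal sketch, not the command the author ran: it assumes the JDK ships the GB2312 charset (it is part of the extended charsets), and the object and method names are made up for illustration. Invalid byte sequences are dropped, similar in spirit to skipping conversion errors with iconv.

```scala
import java.nio.ByteBuffer
import java.nio.charset.{Charset, CodingErrorAction, StandardCharsets}

// Hypothetical helper, not part of the original post.
object Gb2312ToUtf8 {
  // Decode bytes in the given charset, silently dropping malformed or
  // unmappable byte sequences instead of throwing.
  def decodeLenient(bytes: Array[Byte], charsetName: String = "GB2312"): String = {
    val decoder = Charset.forName(charsetName) // assumption: charset is installed
      .newDecoder()
      .onMalformedInput(CodingErrorAction.IGNORE)
      .onUnmappableCharacter(CodingErrorAction.IGNORE)
    decoder.decode(ByteBuffer.wrap(bytes)).toString
  }

  // Re-encode the leniently decoded text as UTF-8.
  def toUtf8(bytes: Array[Byte]): Array[Byte] =
    decodeLenient(bytes).getBytes(StandardCharsets.UTF_8)
}
```

    For a large log file this would be applied chunk by chunk (for example line by line) rather than to the whole file in memory.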

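    Top 10 most popular search terms

    1. Take the third field and run a word count on it, then swap (key, value) -> (value, key), sort by the new key, swap (k, v) back once sorting is done, and finally take the first 10.
    2. The code: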
    package io.github.week3
    
    import org.apache.spark.{SparkContext, SparkConf}
    
    /**
      * Created by sunyonggang on 16/4/22.
      */
    class TermsTop {
    
    }
    object TermsTop {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("TermsTop10").setMaster("spark://gg01:7077")
        val sc = new SparkContext(conf)
    
        val logRDD = sc.textFile("hdfs://gg01:9000/hdfs/user/data/SogouQ3_utf8.txt")
    
        // Count the search terms (third field) on well-formed six-field lines, then sort descending and take the top 10
        val wc = logRDD.map(line => line.split("\t")).filter(_.length == 6).map(fields => (fields(2), 1)).reduceByKey(_ + _)
        val termsTop = wc.map(word => (word._2, word._1)).sortByKey(false).map(word => (word._2, word._1)).take(10)
    
        for (term <- termsTop) {
          println(term)
        }
      }
    }
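
    To make the count, swap, sort, swap back, take flow easier to follow without a cluster, here is a sketch of the same steps on a local Scala collection. It is not the Spark job itself: `TopNSketch`, `topN` and the sample lines are hypothetical, and `groupBy` plus `sum` stands in for `reduceByKey(_ + _)`.

```scala
// Hypothetical local illustration of the pipeline, not the Spark job.
object TopNSketch {
  // Count the values in one tab-separated column and return the n most frequent.
  def topN(lines: Seq[String], column: Int, n: Int, expectedFields: Int = 6): Seq[(String, Int)] =
    lines
      .map(_.split("\t"))
      .filter(_.length == expectedFields)       // drop malformed lines
      .map(fields => (fields(column), 1))       // (key, 1)
      .groupBy(_._1)
      .map { case (key, ones) => (key, ones.map(_._2).sum) } // local stand-in for reduceByKey
      .toSeq
      .map(_.swap)                              // (key, count) -> (count, key)
      .sortBy(pair => -pair._1)                 // descending by count, like sortByKey(false)
      .map(_.swap)                              // back to (key, count)
      .take(n)
}

// Made-up sample lines with six tab-separated fields each.
val sample = Seq(
  "t1\tu1\tbaidu\t1\t1\thttp://www.baidu.com/",
  "t2\tu2\tbaidu\t1\t1\thttp://www.baidu.com/",
  "t3\tu1\tyouku\t1\t1\thttp://www.youku.com/",
  "broken line"
)
TopNSketch.topN(sample, column = 2, n = 10).foreach(println)
```

    Passing a different `column` gives the other rankings in this post, since they differ only in which field is counted.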
    

    3. Result:

    16/04/23 10:52:38 INFO DAGScheduler: ResultStage 4 (take at TermsTop.scala:20) finished in 0.462 s
    16/04/23 10:52:38 INFO DAGScheduler: Job 1 finished: take at TermsTop.scala:20, took 4.698771 s
    (百度,77627)
    (baidu,36564)
    (人体艺术,29598)
    (4399小游戏,23306)
    (优酷,20847)
    (qq空间,20677)
    (新亮剑,19205)
    (馆陶县县长闫宁的父亲,17842)
    (公安卖萌,16612)
    (百度一下 你就知道,15212)
    16/04/23 10:52:38 INFO SparkContext: Invoking stop() from shutdown hook
    16/04/23 10:52:38 INFO SparkUI: Stopped Spark web UI at http://192.168.199.150:4040
    

    Top 10 users by number of queries

    1. Same approach as the first problem, except that the second field is used.
    2. The code:
    package io.github.week3
    
    import org.apache.spark.{SparkContext, SparkConf}
    
    /**
      * Created by sunyonggang on 16/4/22.
      */
    
    class UsersTop {
    
    }
    object UsersTop {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("UsersTop10").setMaster("spark://gg01:7077")
        val sc = new SparkContext(conf)
    
        val logRDD = sc.textFile("hdfs://gg01:9000/hdfs/user/data/SogouQ3_utf8.txt")
    
        // Count queries per user id (second field) on well-formed six-field lines, then sort descending and take the top 10
        val wc = logRDD.map(line => line.split("\t")).filter(_.length == 6).map(fields => (fields(1), 1)).reduceByKey(_ + _)
        val usersTop = wc.map(user => (user._2, user._1)).sortByKey(false).map(user => (user._2, user._1)).take(10)
    
        for (user <- usersTop) {
          println(user)
        }
      }
    }
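
    Since the jobs in this post differ only in which column index they read, one way to avoid bare indices is to parse each line into a record first. The sketch below is an assumption-heavy illustration: the post only confirms that field 2 is the user id, field 3 the search term and field 6 the URL, so the other field names (`time`, `rank`, `clickOrder`) and the type `QueryLogRecord` are hypothetical.

```scala
// Hypothetical record type; only userId, query and url are confirmed by the post.
final case class QueryLogRecord(
  time: String,
  userId: String,
  query: String,
  rank: String,
  clickOrder: String,
  url: String
)

object QueryLogRecord {
  // Returns None for any line that does not have exactly six tab-separated fields.
  def parse(line: String): Option[QueryLogRecord] =
    line.split("\t") match {
      case Array(time, userId, query, rank, clickOrder, url) =>
        Some(QueryLogRecord(time, userId, query, rank, clickOrder, url))
      case _ => None
    }
}
```

    With such a parser, a job could write something like `logRDD.flatMap(QueryLogRecord.parse).map(r => (r.userId, 1))` in place of the `split`, `filter` and index steps.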
    

    3. Result:

    16/04/23 10:56:31 INFO DAGScheduler: ResultStage 4 (take at UsersTop.scala:21) finished in 1.511 s
    16/04/23 10:56:31 INFO DAGScheduler: Job 1 finished: take at UsersTop.scala:21, took 5.284386 s
    (ac65768b987c20b3b25cd35612f61892,20385)
    (9faa09e57c277063e6eb70d178df8529,11653)
    (02a8557754445a9b1b22a37b40d6db38,11528)
    (cc7063efc64510c20bcdd604e12a3b26,2571)
    (b64b0ec03efd0ca9cef7642c4921658b,2355)
    (7a28a70fe4aaff6c35f8517613fb5c67,1292)
    (b1e371de5729cdda9270b7ad09484c4f,1277)
    (f656e28e7c3e10c2b733e6b68385d5a2,1241)
    (7eab8caf9708d68e6964220e2f89e80d,1181)
    (c72ce1164bcd263ba1f69292abdfdf7c,1120)
    16/04/23 10:56:31 INFO SparkContext: Invoking stop() from shutdown hook
    16/04/23 10:56:31 INFO SparkUI: Stopped Spark web UI at http://192.168.199.150:4040
    

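    Top 50 websites by visits (no need to distinguish second-level domains)

    1. Take the sixth field and cut out the leading scheme-and-host part of the URL (what the author calls the first-level domain), then apply the same approach as in the previous two problems.
    2. The code: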
    package io.github.week3
    
    import org.apache.spark.{SparkContext, SparkConf}
    
    /**
      * Created by sunyonggang on 16/4/22.
      */
    class WebsiteTop50 {
    
    }
    object WebsiteTop50 {
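      // Captures the leading "scheme://host/" part and the rest of the URL
      private val SitePattern = """(\w+://.*?/)(.*)""".r

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WebsiteTop50").setMaster("spark://gg01:7077")
        val sc = new SparkContext(conf)
    
        val logRDD = sc.textFile("hdfs://gg01:9000/hdfs/user/data/SogouQ3_utf8.txt")
    
        // Count visits per website (sixth field), then sort descending and take the top 50.
        // Lines without six fields, or whose URL does not match the pattern, are skipped.
        val wc = logRDD.map(line => line.split("\t")).filter(_.length == 6).flatMap(fields => getFirstDomainFromWebsite(fields(5))).reduceByKey(_ + _)
        val sitesTop = wc.map(site => (site._2, site._1)).sortByKey(false).map(site => (site._2, site._1)).take(50)
    
        for (site <- sitesTop) {
          println(site)
        }
      }

      // Returns (site, 1), or None instead of throwing a MatchError when the URL does not match
      def getFirstDomainFromWebsite(url: String): Option[(String, Int)] = url match {
        case SitePattern(domain, _) => Some((domain, 1))
        case _ => None
      }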
    }
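
    The regular expression in `getFirstDomainFromWebsite` needs a `/` after the host, so a URL such as `http://www.baidu.com` does not match it. A possible alternative, sketched here with the standard `java.net.URI` class, is to let the JDK parse the URL. `SiteKey` and `siteOf` are hypothetical names, and this is a swapped-in technique, not what the original job used.

```scala
import java.net.URI
import scala.util.Try

// Hypothetical helper, not part of the original post.
object SiteKey {
  // Returns "scheme://host/" for a parseable absolute URL, None otherwise.
  def siteOf(url: String): Option[String] =
    for {
      uri    <- Try(new URI(url.trim)).toOption // None on URISyntaxException
      scheme <- Option(uri.getScheme)           // None for relative references
      host   <- Option(uri.getHost)             // None when no host can be parsed
    } yield s"$scheme://$host/"
}
```

    `java.net.URI` is stricter than the regex: URLs containing characters such as spaces fail to parse and would be dropped, so the counts could differ slightly from the results in this post.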
    

    3. Result (note: the regular expression above may not be robust enough):

    16/04/23 11:00:46 INFO DAGScheduler: Job 1 finished: take at WebsiteTop50.scala:20, took 2.012784 s
    16/04/23 11:00:46 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 27) in 688 ms on 192.168.199.146 (1/1)
    16/04/23 11:00:46 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
    (http://zhidao.baidu.com/,584230)
    (http://tv.sogou.com/,290795)
    (http://wenwen.soso.com/,287967)
    (http://baike.baidu.com/,229051)
    (http://www.youku.com/,158918)
    (http://www.baidu.com/,148608)
    (http://wenku.baidu.com/,127176)
    (http://www.tudou.com/,125944)
    (http://tieba.baidu.com/,124058)
    (http://v.youku.com/,122190)
    (http://www.docin.com/,74127)
    (http://www.4399.com/,70430)
    (http://blog.sina.com.cn/,67190)
    (http://www.ranwen.com/,55428)
    (http://tv.sohu.com/,51935)
    (http://v.ku6.com/,43433)
    (http://www.tvmao.com/,41431)
    (http://hi.baidu.com/,41347)
    (http://www.360doc.com/,41164)
    (http://iask.sina.com.cn/,40803)
    (http://www.56.com/,37227)
    (http://www.hao123.com/,32477)
    (http://www.qiyi.com/,31019)
    (http://ishare.iask.sina.com.cn/,30783)
    (http://www.tianya.cn/,30571)
    (http://www.7k7k.com/,29045)
    (http://www.douban.com/,27587)
    (http://www.booksky.org/,26782)
    (http://detail.zol.com.cn/,26401)
    (http://qzone.qq.com/,26036)
    (http://www.qire123.com/,25440)
    (http://www.12306.cn/,25383)
    (http://cf.qq.com/,24262)
    (http://www.qidian.com/,24115)
    (http://blog.163.com/,23254)
    (http://www.xixiwg.com/,22503)
    (http://www.a67.com/,21706)
    (http://weibo.com/,20811)
    (http://dzh.mop.com/,20431)
    (http://detail.china.alibaba.com/,20351)
    (http://kankan.xunlei.com/,18656)
    (http://dl.pconline.com.cn/,18310)
    (http://bbs1.people.com.cn/,17534)
    (http://news.ifeng.com/,17442)
    (http://news.shangdu.com/,16957)
    (http://news.xinhuanet.com/,16621)
    (http://dnf.qq.com/,16427)
    (http://item.taobao.com/,15764)
    (http://roll.sohu.com/,15588)
    (http://www.qzone.cc/,15236)
    16/04/23 11:00:46 INFO SparkContext: Invoking stop() from shutdown hook
    16/04/23 11:00:46 INFO SparkUI: Stopped Spark web UI at http://192.168.199.150:4040
    
