美文网首页
案例实现-求用户访问学科的子网页top3-cache

案例实现-求用户访问学科的子网页top3-cache

作者: lehuai | 来源:发表于2018-01-05 15:52 被阅读0次
    ProjectCount3.scala
    package day08
    
    
    import java.net.URL
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
    
    import scala.collection.mutable
    
    /**
      * Finds, for every subject, the top 3 most visited sub-pages.
      *
      * Pipeline:
      *  1. Count visits per URL, where the URL is the second tab-separated field of each log line.
      *  2. Key every URL by its subject (the URL's host) and cache the result. It is consumed by two
      *     jobs: collecting the distinct subjects, and the partitioned top-N job.
      *  3. Repartition with [[ProjectPartitioner]] so each subject gets its own partition. A
      *     `HashPartitioner` can hash different subjects into the same partition. Then sort each
      *     partition by count and keep its top entries.
      *
      * Lines without a second field, and URLs that `java.net.URL` cannot parse, are skipped instead
      * of failing the whole job.
      *
      * Usage: `ProjectCount3 [inputPath] [outputPath]`. Both arguments are optional and default to
      * the paths this example originally hard-coded.
      */
    object ProjectCount3 {
      import scala.util.Try

      /** Access log read when no input path argument is given. */
      private val DefaultInputPath = "D:/teachingprogram/Spark学习视频/day08/access.txt"
      /** Output directory written when no output path argument is given. */
      private val DefaultOutputPath = "d://out"
      /** Number of sub-pages kept per subject. */
      private val TopN = 3

      def main(args: Array[String]): Unit = {
        val inputPath = args.lift(0).getOrElse(DefaultInputPath)
        val outputPath = args.lift(1).getOrElse(DefaultOutputPath)

        val conf: SparkConf = new SparkConf().setAppName("ProjectCount3").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)

        // Stop the context even if the job fails, so local resources are released.
        try {
          val file: RDD[String] = sc.textFile(inputPath)

          // (url, 1) for every line that has a URL field.
          val urlAndOne: RDD[(String, Int)] =
            file.flatMap(line => extractUrl(line).map(url => (url, 1)))

          // Total visits per URL.
          val sumedUrl: RDD[(String, Int)] = urlAndOne.reduceByKey(_ + _)

          // (subject, (url, count)). Cached because it feeds two separate jobs below.
          val cachedProject: RDD[(String, (String, Int))] = sumedUrl.flatMap { case (url, count) =>
            hostOf(url).map(project => (project, (url, count)))
          }.cache()

          // The built-in HashPartitioner may put several subjects into one partition (hash
          // collisions), which would break the per-partition top-N below:
          //    val res: RDD[(String, (String, Int))] = cachedProject.partitionBy(new HashPartitioner(3))

          // All distinct subjects, used to give each one a dedicated partition.
          val projects: Array[String] = cachedProject.keys.distinct().collect()
          val partitioner: ProjectPartitioner = new ProjectPartitioner(projects)
          val partitioned: RDD[(String, (String, Int))] = cachedProject.partitionBy(partitioner)

          // Each partition now holds a single subject: sort by count, descending, and keep the top N.
          val res: RDD[(String, (String, Int))] = partitioned.mapPartitions { it =>
            it.toList.sortBy(_._2._2)(Ordering[Int].reverse).take(TopN).iterator
          }

          res.saveAsTextFile(outputPath)
          cachedProject.unpersist()
        } finally {
          sc.stop()
        }
      }

      /** Returns the second tab-separated field of `line` (the URL), or `None` if it is missing. */
      private def extractUrl(line: String): Option[String] = {
        val fields = line.split("\t")
        if (fields.length > 1) Some(fields(1)) else None
      }

      /** Returns the host (subject) of `url`, or `None` if `java.net.URL` cannot parse it. */
      private def hostOf(url: String): Option[String] =
        Try(new URL(url).getHost).toOption
    }
    /**
      * Spark partitioner that gives every subject (project) its own partition.
      *
      * Partition numbers are assigned in the order subjects appear in `projects` (0, 1, 2, ...).
      * Keys that are not in `projects`, including `null`, go to partition 0.
      *
      * @param projects the subject keys; `numPartitions` equals its length, so it should contain
      *                 distinct values (if it does not, the last occurrence's index wins)
      */
    class ProjectPartitioner(projects: Array[String]) extends Partitioner {
      // Subject -> partition number. zipWithIndex replaces the original mutable counter.
      private val projectsAndPartNum: Map[String, Int] = projects.zipWithIndex.toMap

      /** Formerly the mutable counter used while building the mapping; kept only for compatibility. */
      @deprecated("Internal construction counter; use numPartitions instead", "day08")
      var n: Int = projects.length

      /** One partition per entry in `projects`. */
      override def numPartitions: Int = projects.length

      /** Partition of `key`'s subject; unknown or `null` keys map to partition 0. */
      override def getPartition(key: Any): Int = key match {
        case null => 0
        case k    => projectsAndPartNum.getOrElse(k.toString, 0)
      }

      // equals/hashCode let Spark recognise RDDs that share this partitioning and skip a re-shuffle.
      override def equals(other: Any): Boolean = other match {
        case that: ProjectPartitioner =>
          that.numPartitions == numPartitions && that.projectsAndPartNum == projectsAndPartNum
        case _ => false
      }

      override def hashCode(): Int = projectsAndPartNum.hashCode()
    }
    
    

    相关文章

      网友评论

          本文标题:案例实现-求用户访问学科的子网页top3-cache

          本文链接:https://www.haomeiwen.com/subject/zjxsnxtx.html