ProjectCount3.scala
package day08
import java.net.URL
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
import scala.collection.mutable
/**
 * Demonstrates RDD caching and a custom partitioner:
 * each subject's data is routed to its own partition.
 */
object ProjectCount3 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("ProjectCount3").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(conf)
    // Load the raw access log
    val file: RDD[String] = sc.textFile("D:/teachingprogram/Spark学习视频/day08/access.txt")
    // Extract the URL (field 1 of each tab-separated line) and pair it with a count of 1
    val urlAndOne: RDD[(String, Int)] = file.map(line => {
      val fields = line.split("\t")
      val url = fields(1)
      (url, 1)
    })
    // Sum the counts for identical URLs
    val sumedUrl: RDD[(String, Int)] = urlAndOne.reduceByKey(_ + _)
    // Extract the subject (the URL's host) and keep the (url, count) pair as the value
    val cachedProject: RDD[(String, (String, Int))] = sumedUrl.map(x => {
      val url = x._1
      val project = new URL(url).getHost
      val count = x._2
      (project, (url, count))
    }).cache()
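    // cache() pays off here because cachedProject is used twice below: once to collect
    // the distinct subjects and once to repartition, so its lineage is not recomputed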
    // Spark's built-in HashPartitioner can hash different subjects into the same
    // partition (a hash collision), so a custom partitioner is needed instead
    // val res: RDD[(String, (String, Int))] = cachedProject.partitionBy(new HashPartitioner(3))
    // res.saveAsTextFile("d:/out")
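    // For illustration: with 3 partitions, any two subject hosts whose hashCode values
    // agree modulo 3 would be mixed together in the same output file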
    // Collect all distinct subjects to the driver (the subject list is small)
    val projects: Array[String] = cachedProject.keys.distinct().collect()
    // Build the custom partitioner from the subject list
    val partitioner: ProjectPartitioner = new ProjectPartitioner(projects)
    // Repartition so that each subject's records land in their own partition
    val partitioned: RDD[(String, (String, Int))] = cachedProject.partitionBy(partitioner)
    // Sort each partition by count in descending order and take the top 3
    val res: RDD[(String, (String, Int))] = partitioned.mapPartitions(it => {
      it.toList.sortBy(_._2._2).reverse.take(3).iterator
    })
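    // Note: it.toList materializes the whole partition in memory, which is fine for a
    // per-subject top-3, but a bounded structure would be safer for very large partitions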
    res.saveAsTextFile("d:/out")
    sc.stop()
  }
}
class ProjectPartitioner(projects: Array[String]) extends Partitioner {
  // Maps each subject to its partition number
  private val projectsAndPartNum = new mutable.HashMap[String, Int]
  // Counter used to hand out partition numbers
  private var n = 0
  for (pro <- projects) {
    projectsAndPartNum += (pro -> n)
    n += 1
  }
  // One partition per subject
  override def numPartitions: Int = projects.length
  // Partition number for a key; unknown subjects fall back to partition 0
  override def getPartition(key: Any): Int = {
    projectsAndPartNum.getOrElse(key.toString, 0)
  }
}
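
A quick way to sanity-check the partitioner on its own, without starting Spark, is a small driver like the sketch below. The subject hosts here are made-up examples for illustration, not values from access.txt.

object ProjectPartitionerDemo {
  def main(args: Array[String]): Unit = {
    // Hypothetical subject hosts, for illustration only
    val projects = Array("java.example.cn", "php.example.cn", "net.example.cn")
    val partitioner = new ProjectPartitioner(projects)
    // One partition per subject
    assert(partitioner.numPartitions == 3)
    // Each known subject maps to its own partition number
    projects.foreach(p => println(s"$p -> ${partitioner.getPartition(p)}"))
    // An unseen subject falls back to partition 0
    println(s"unknown.host -> ${partitioner.getPartition("unknown.host")}")
  }
}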