import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
object Spark03 {
//二次排序
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[2]").appName("03").getOrCreate()
import spark.implicits._
Logger.getRootLogger.setLevel(Level.WARN)
//sparkcore 计算方式
fun1(spark.sparkContext)
//sparksql 计算方式
fun2(spark)
spark.stop()
}
//使用sparkcore实现
/*
sparkcore实现思路:
第一步: 自定义 key 实现scala.math.Ordered接口和Serializeable接口
第二步:将要进行二次排序的数据加载, 将数据映射为 <key,value> 格式的RDD
第三步:使用sortByKey 基于自定义的key进行二次排序
第四步:去掉排序的key,只保留排序的结果
*/
def fun1( sc:SparkContext ):Unit ={
val data = List("hadoop spark scala spark", "scala mr", "spark scala mr java" )
sc.parallelize(data, 3)
// 正则表达式 \\s 表示空白字符
.flatMap( _.split("\\s+") )
.map( (_,1) )
.reduceByKey( _+_ )
.map( bean =>{
( MySortKey(bean._1, bean._2), bean )
})
.sortByKey(false,1)
.map(_._2)
.foreach( println )
}
//使用sparksql实现
/**
使用sql完成二次排序
select * from tablename order by field1, field2 desc
*/
def fun2( spark:SparkSession ):Unit ={
import spark.implicits._
val data = List("hadoop spark scala spark", "scala mr", "spark scala mr java" )
spark.sparkContext.parallelize(data,3)
.flatMap( _.split("\\s+") )
.map( (_,1) )
.reduceByKey( _+_ )
.toDF("word","count")
.createOrReplaceTempView("temptable")
val sql = s"select word,count from temptable order by count desc,word "
spark.sql(sql).show(false)
}
//Ordered[自定义数据类型] 用来自定义排序规则
case class MySortKey(first:String, second:Int) extends Ordered[MySortKey]{
//重写比较方法
// 首先 按照second字段 数字大小降序, 之后按照 first字段 字典升序
override def compare(that: MySortKey): Int = {
val temp = this.second - that.second
if ( temp != 0 ){
temp
}else{
- this.first.compareTo(that.first)
}
}
}
}
网友评论