# ./spark-shell --master spark://192.168.56.21:7077 --executor-memory 1g --total-executor-cores 2
scala> sc.textFile("hdfs://hadoop21:9000/wc/").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:25
scala> sc.textFile("hdfs://hadoop21:9000/wc/").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res1: Array[(String, Int)] = Array((tom,11), (hello,22), (jerry,6), (kitty,1), (hanmeimei,2), (lilei,2))
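
The same word count can be packaged as a standalone application instead of being typed into the shell. A minimal sketch, reusing the HDFS path from the transcript above; the object name WordCount is illustrative, and the master URL is left to be supplied by spark-submit:

package day10

import org.apache.spark.{SparkConf, SparkContext}

// Standalone version of the shell word count above (sketch).
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    sc.textFile("hdfs://hadoop21:9000/wc/")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .collect()
      .foreach(println)
    sc.stop()
  }
}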
CustomSort.scala
package day10

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// First approach: supply an implicit Ordering[Girl]; bring it into scope
// with `import MySort.girlOrdering` at the call site.
object MySort {
  implicit val girlOrdering: Ordering[Girl] = new Ordering[Girl] {
    override def compare(x: Girl, y: Girl): Int = {
      if (x.faceValue != y.faceValue) {
        x.faceValue - y.faceValue // ascending by faceValue
      } else {
        y.age - x.age // on a tie, descending by age
      }
    }
  }
}
object CustomSort {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CustomSort").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // (name, faceValue, age)
    val girlInfo = sc.parallelize(Array(("tingting", 80, 25), ("ningning", 90, 26), ("mimi", 90, 27)))

    // Plain sort on a single field:
    // val res = girlInfo.sortBy(_._2, false)

    // First sorting approach: implicit Ordering[Girl]
    // import MySort.girlOrdering
    // val res: RDD[(String, Int, Int)] = girlInfo.sortBy(x => Girl(x._2, x._3), false)

    // Second sorting approach: Girl extends Ordered[Girl]
    val res: RDD[(String, Int, Int)] = girlInfo.sortBy(x => Girl(x._2, x._3), false)

    println(res.collect.toBuffer)
    sc.stop()
  }
}
// First sorting approach: plain case class; ordering comes from the implicit in MySort
// case class Girl(faceValue: Int, age: Int)

// Second sorting approach: the case class itself defines its natural ordering
case class Girl(faceValue: Int, age: Int) extends Ordered[Girl] {
  override def compare(that: Girl): Int = {
    if (this.faceValue != that.faceValue) {
      this.faceValue - that.faceValue // ascending by faceValue
    } else {
      that.age - this.age // on a tie, descending by age
    }
  }
}
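
Because sortBy is called with ascending = false, the natural ordering above is reversed: faceValue descending and, on ties, age ascending. Running the program as written should therefore print something like:

ArrayBuffer((ningning,90,26), (mimi,90,27), (tingting,80,25))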
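
As an alternative not shown in the original, the helper class can be skipped entirely by sorting on a tuple key: Scala orders tuples component-wise, left to right, so negating faceValue gives a descending sort on it. A minimal sketch, assuming the same girlInfo RDD inside main:

    // Alternative (illustrative): tuple key instead of Girl.
    // (-faceValue, age) sorts faceValue descending, then age ascending on ties.
    val res2: RDD[(String, Int, Int)] = girlInfo.sortBy(x => (-x._2, x._3))
    println(res2.collect.toBuffer)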