练习
package day07
import org.apache.spark.{SparkConf, SparkContext}
object SparkRDDTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkRDDTest").setMaster("local")
val sc = new SparkContext(conf)
// 通过并行化生成rdd
val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
// 对rdd1里的每一个元素乘2然后排序
// val res1 = rdd1.map(_ * 2).sortBy(x => x,true)
// println(res1.collect().toBuffer)
// 过滤出大于等于10的元素
// val res2 = res1.filter(_ >= 10)
// 将元素以数组的方式打印出来
// println(res2.collect().toBuffer)
val rdd2 = sc.parallelize(Array("a b c","d e f","h i j"))
// 将rdd2里面的每一个元素先切分再压平
// val res = rdd2.flatMap(_.split(' '))
// println(res.collect.toBuffer)
// 来个复杂的,
val rdd3 = sc.parallelize(List(List("a b c","a b b"),List("e f g","a f g"),List("h i j","a a b")))
// 将rdd3里面的每一个元素先切分再压平
// val res = rdd3.flatMap(_.flatMap(_.split(" ")))
// println(res.collect().toBuffer)
val rdd4 = sc.parallelize(List(5,6,4,3))
val rdd5 = sc.parallelize(List(1,2,3,4))
// 求并集
val unionres = rdd4 union rdd5
// println(res.collect().toBuffer)
// 求交集
// println(rdd4.intersection(rdd5).collect().toBuffer)
// 去重
// println(unionres.distinct().collect().toBuffer)
val rdd6 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
val rdd7 = sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))
// 求join
// println((rdd6 join rdd7).collect().toBuffer)
// 求左连接和右连接
// val res1 = rdd6.leftOuterJoin(rdd7)
// val res2 = rdd6.rightOuterJoin(rdd7)
// println(res1.collect().toBuffer)
// 求并集
// val res = rdd6 union(rdd7)
// 按key进行分组
// println(res.groupByKey().collect().toBuffer)
// 分别用groupByKey和reduceByKey实现单词计数,注意groupByKey与reduceByKey的区别
// groupByKey
// println(res.groupByKey().mapValues(_.sum).collect().toBuffer)
// reduceByKey
// println(res.reduceByKey(_ + _).collect.toBuffer)
val rdd8 = sc.parallelize(List(("tom",1),("tom",2),("jerry",3),("kitty",2)))
val rdd9 = sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))
// cogroup 注意cogroup与groupByKey的区别
// println(rdd8.cogroup(rdd9).collect().toBuffer)
val rdd10 = sc.parallelize(List(1,2,3,4,5))
// reduce聚合
// println(rdd10.reduce(_+_))
val rdd11 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2),("shuke",1)))
val rdd12 = sc.parallelize(List(("jerry",2),("tom",3),("shuke",2),("kitty",5)))
val rdd13 = rdd11.union(rdd12)
// 按key进行聚合
// reduceByKey
// 按value的降序排序
val res = rdd13.reduceByKey(_+_).map(t => (t._2,t._1)).sortByKey(false).map(t => (t._2,t._1))
println(res.collect.toBuffer)
// 笛卡尔积
// println(rdd11.cartesian(rdd12).collect.toBuffer)
// 其他:count、top、take、first、takeOrdered
}
}
网友评论