package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Trans {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_Partition")
    // Build the Spark context
    val sc = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("aaa", 1), ("cc", 2), ("b", 4), ("b", 1), ("b", 1)))

    // Group the values of each key into an Iterable
    val rdd2: RDD[(String, Iterable[Int])] = rdd.groupByKey()

    // Sum the grouped values per key, using pattern matching on the pair
    val rdd3: RDD[(String, Int)] = rdd2.map {
      case (key, items) => (key, items.sum)
    }

    rdd3.collect().foreach(println)

    sc.stop()
  }
}

Output:
(cc,2)
(aaa,1)
(b,6)
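As an aside (not part of the original examples), the summing step can also be written with mapValues, which applies a function to each value while leaving the key untouched. A minimal sketch, assuming rdd2 is the grouped RDD produced by groupByKey above:

    // mapValues transforms only the values; keys and partitioning are preserved
    val rdd3: RDD[(String, Int)] = rdd2.mapValues(_.sum)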
Without pattern matching:
package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Trans {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_Partition")
    // Build the Spark context
    val sc = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("aaa", 1), ("cc", 2), ("b", 4), ("b", 1), ("b", 1)))

    val rdd2: RDD[(String, Iterable[Int])] = rdd.groupByKey()

    // Sum the grouped values per key, accessing the tuple fields directly
    val rdd3: RDD[(String, Int)] = rdd2.map {
      t => (t._1, t._2.sum)
    }

    rdd3.collect().foreach(println)

    sc.stop()
  }
}

Output:
(cc,2)
(aaa,1)
(b,6)
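For a per-key sum like this, reduceByKey is usually preferred over groupByKey followed by map: it merges values with the given function and pre-aggregates within each partition before the shuffle, so less data crosses the network. A minimal self-contained sketch (the object name TransReduce is made up for illustration); it should print the same three pairs as above:

package com.atguigu

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object TransReduce {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark01_Partition")
    val sc = new SparkContext(conf)

    val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("aaa", 1), ("cc", 2), ("b", 4), ("b", 1), ("b", 1)))

    // reduceByKey combines the values of each key with _ + _,
    // doing map-side pre-aggregation before the shuffle
    val summed: RDD[(String, Int)] = rdd.reduceByKey(_ + _)

    summed.collect().foreach(println)

    sc.stop()
  }
}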