Spark RDD
一、基本RDD运算
val intRDD=sc.parallelize(List(3,1,2))
val sr=sc.parallelize(List("a","b"))
val sr2=sc.parallelize(List("a c","b"))
1.collect RDD类型的数据转化为数组
2.map
intRDD.map(x =>x+1).collect()
intRDD.map(_+1).collect()
sr.map((_,1))
3.flatMap
sr2.flatMap(_.split(" ")).collect()
4.filter
intRDD.filter(x =>x<3).collect()
intRDD.filter(_<3).collect()
sr.filter(x =>x.contains("a")).collect()
4.randomSplit
val intsR=intRDD.randomSplit(Array(0.5,0.5))
insR(0).collect()
5.group
val gRDD=intRDD.groupBy(x => if(x>2) "d" else "x" ).collect()
gRDD(0) gRDD(1)
二、多个RDD运算
2.1 union
val intRDD1=sc.parallelize(List(3,1,2))
val intRDD2=sc.parallelize(List(4,5))
val intRDD3=sc.parallelize(List(7,8))
val intRDD4=sc.parallelize(List(2,8))
(intRDD1 ++ intRDD2 ++ intRDD3).collect()
intRDD1.union(intRDD2).collect()
2.2 intersection 交集
intRDD1.intersection(intRDD4).collect()
2.3 subtract 差集
intRDD1.subtract(intRDD4).collect()
2.4 cartesian 卡迪乘积
intRDD1.cartesian(intRDD4).collect()
三、动作运算
intRDD.first
intRDD.take(3)
读取前3条小-->大
intRDD.takeOrdered(3)
读取前3条大-->小
intRDD.takeOrdered(3)(Ordering[Int].reverse)
intRDD.count
intRDD.max
intRDD.mean
intRDD.sum
intRDD.min
四、RDD k-v 基本'转换'运算
val kvRDD1 =sc.parallelize(List((1,2),(1,3),(2,3),(2,2)))
kvRDD1.keys.collect()
kvRDD1.values.collect()
4.1 filter
kvRDD1.filter{ case (k,v) => k<2}.collect()
4.2 mapValues
kvRDD1.mapValues(_ * 2).collect()
kvRDD1.mapValues(x => x * 2).collect()
4.3 sortByKey
kvRDD1.sortByKey().collect()
kvRDD1.sortByKey(true).collect()
kvRDD1.sortByKey(false).collect()
4.4 reduceByKey
kvRDD1.reduceByKey((x,y) =>{x+y } ).collect()
kvRDD1.reduceByKey(_+_).collect()
五、多个RDD k-v'转换'运算
val kvRDD2 =sc.parallelize(List( (1,6),(2,5),(2,7)))
val kvRDD3 =sc.parallelize(List((1,2),(1,3)))
5.1 join
kvRDD2.join(kvRDD3).collect()
kvRDD2.join(kvRDD3).foreach(println)
5.2 leftOuterJoin
kvRDD2.leftOuterJoin(kvRDD3).collect()
5.3 rightOuterJoin
kvRDD2.rightOuterJoin(kvRDD3).collect()
5.4 subtract kvRDD1删除存在kvRDD3的key
kvRDD1.subtract(kvRDD3).collect()
六、k-v 动作 运算
kvRDD1.first
kvRDD1.first._1
kvRDD1.first._2
kvRDD1.countByKey()
kvRDD1.take
kvRDD1.max
kvRDD1.min
var kv =kvRDD1.collectAsMap
kv(1)
根据key查值
kvRDD1.lookup(2)
七、广播变量
7.1 使用广播变量 Broadcast
val kvf =sc.parallelize(List((1,"apple"),(2,"orange")))
val kvmap =kvf.collectAsMap()
val fids = sc.parallelize(List(2,1))
val fnames =fids.map( x => kvmap(x)).collect()
==>
val bcmap = sc.broadcast(kvmap) //广播变量
val bcfids = sc.parallelize(List(2,1))
val bcnames = fids.map( x => bcmap.value(x) ).collect() //
7.2 累加器 accumulator
val total =sc.accumulator(0.0)
intRDD.foreach(i => { total +=i })
八、单词统计
sc.textFile("file:/usr/local/src/draw.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect()
网友评论