transformations
Basic operations
def map[U: ClassTag](f: T => U): RDD[U]
- map() iterates over every element in the RDD, applies the function f to each one in turn, and produces a new RDD
val rdd1: RDD[Int] = sc.parallelize(1 to 10)
// rdd1中的每个元素调用f函数,生成新的rdd集合
val rdd2: RDD[Int] = rdd1.map(_ * 2)
println(rdd2.collect().toList)
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
- Iterates partition by partition: each partition is handed to f as a single Iterator over its elements, and f returns an Iterator for the new partition
- preservesPartitioning: whether to keep the RDD's partitioner information; defaults to false
import scala.collection.mutable.ArrayBuffer

val rdd1: RDD[Int] = sc.parallelize(1 to 10, 2)
val rdd2: RDD[Int] = rdd1.mapPartitions(iter => {
  val arr = ArrayBuffer[Int]()
  while (iter.hasNext) {
    val item: Int = iter.next()
    arr += item * 2
  }
  arr.iterator
})
println(rdd2.collect().toList)
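Since f both receives and returns an Iterator, the same doubling can be written lazily with iter.map, which avoids buffering the whole partition in an ArrayBuffer (a sketch, assuming the same sc as above):

```scala
// Lazy variant: iter.map builds the output Iterator directly,
// so the partition is never materialized in memory
val rdd1: RDD[Int] = sc.parallelize(1 to 10, 2)
val rdd2: RDD[Int] = rdd1.mapPartitions(iter => iter.map(_ * 2))
println(rdd2.collect().toList) // List(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)
```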
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]
- Unlike mapPartitions, f takes an extra Int parameter: the index of the partition being processed
val rdd1: RDD[Int] = sc.parallelize(1 to 10, 2)
val rdd2: RDD[String] = rdd1.mapPartitionsWithIndex((index, iter) => {
  val arr = ArrayBuffer[String]()
  while (iter.hasNext) {
    val item: Int = iter.next()
    arr += item + "_" + index
  }
  arr.iterator
})
})
println(rdd2.collect().toList)
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
- f returns a TraversableOnce collection; flatMap flattens each returned collection, so every individual element ends up in the new RDD
val rdd1: RDD[String] = sc.parallelize(Array("a,b,c","A,B,C","D,e,F"))
val rdd2: RDD[String] = rdd1.flatMap(_.split(","))
println(rdd2.collect().toList)
def filter(f: T => Boolean): RDD[T]
- Applies f to every element of the RDD; elements for which f returns true are kept in the new RDD
val rdd1: RDD[Int] = sc.parallelize(1 to 10)
val rdd2: RDD[Int] = rdd1.filter(n => n % 2 == 0)
println(rdd2.collect().toList)
def distinct(): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
- Removes duplicate elements and returns them in a new RDD
- By default, distinct() produces the same number of partitions as the original RDD
- It is a thin wrapper around map + reduceByKey:
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
val rdd1: RDD[Int] = sc.parallelize(Array(1,2,3,4,2,2,2,2,3))
val rdd2: RDD[Int] = rdd1.distinct()
println(rdd2.collect().toList)
def union(other: RDD[T]): RDD[T]
- Union: all elements from both RDDs
- Duplicates are not removed
val rdd1: RDD[Int] = sc.parallelize(1 to 10)
val rdd2:RDD[Int] = sc.parallelize(1 to 20)
// List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
println(rdd1.union(rdd2).collect().toList)
def intersection(other: RDD[T]): RDD[T]
- Intersection: only elements present in both RDDs
- Duplicates are removed
val rdd1: RDD[Int] = sc.parallelize(1 to 10)
val rdd2:RDD[Int] = sc.parallelize(1 to 20)
// List(8, 1, 9, 10, 2, 3, 4, 5, 6, 7) -- order not guaranteed
println(rdd1.intersection(rdd2).collect().toList)
def subtract(other: RDD[T]): RDD[T]
- Difference: elements present in the source RDD but absent from the other RDD
- Duplicates are not removed
val rdd1: RDD[Int] = sc.parallelize(1 to 10)
val rdd2:RDD[Int] = sc.parallelize(1 to 20)
// List(16, 17, 18, 19, 11, 20, 12, 13, 14, 15) -- order not guaranteed
println(rdd2.subtract(rdd1).collect().toList)
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null): RDD[T]
- Reduces an RDD with many partitions to an RDD with fewer partitions
- With shuffle: Boolean = false (the default), asking for more partitions than the original has no effect
- Merging into fewer partitions requires no shuffle; expanding into more partitions requires one (shuffle = true)
val rdd1: RDD[Int] = sc.parallelize(1 to 10, 5)
val rdd2: RDD[Int] = rdd1.coalesce(2)
val rdd3: RDD[Int] = rdd1.coalesce(10) // no effect: shuffle defaults to false
println(rdd1.partitions.size) // 5
println(rdd2.partitions.size) // 2
println(rdd3.partitions.size) // still 5
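To actually grow the partition count with coalesce, pass shuffle = true explicitly (a sketch, assuming the same sc; repartition below is the usual shorthand for this):

```scala
val rdd1: RDD[Int] = sc.parallelize(1 to 10, 5)
// With shuffle = true, coalesce can increase the partition count
val rdd2: RDD[Int] = rdd1.coalesce(10, shuffle = true)
println(rdd2.partitions.size) // 10
```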
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
- Implemented internally as coalesce(numPartitions, shuffle = true), so it always performs a shuffle
val rdd1: RDD[Int] = sc.parallelize(1 to 10,5)
val rdd2: RDD[Int] = rdd1.repartition(10)
println(rdd1.partitions.size)
println(rdd2.partitions.size)
def randomSplit(weights: Array[Double],seed: Long = Utils.random.nextLong): Array[RDD[T]]
- Splits an RDD into several RDDs according to weights; for example Array(1, 4, 5) yields 3 RDDs whose element counts are roughly in a 1:4:5 ratio. The weights are normalized internally, so they do not need to sum to 1
val rdd1: RDD[Int] = sc.parallelize(1 to 10)
val rdd2: Array[RDD[Int]] = rdd1.randomSplit(Array(1,8,1))
println(rdd2(0).collect().toList)
println(rdd2(1).collect().toList)
println(rdd2(2).collect().toList)
def glom(): RDD[Array[T]]
- Turns each partition of the RDD into a single array, yielding a new RDD of arrays
val rdd1: RDD[Int] = sc.parallelize(1 to 10,5)
val rdd2: RDD[Array[Int]] = rdd1.glom()
// map each Array to a List so the contents print readably
println(rdd2.collect().map(_.toList).toList)
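A common use of glom is computing a per-partition aggregate in one pass, e.g. the maximum of each partition (a sketch, assuming the same sc as above):

```scala
val rdd1: RDD[Int] = sc.parallelize(1 to 10, 5)
// Each partition becomes one Array; take the max of each
val maxes: RDD[Int] = rdd1.glom().map(arr => arr.max)
println(maxes.collect().toList) // List(2, 4, 6, 8, 10)
```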
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
- Zips the two RDDs element-wise into key-value pairs: keys come from this RDD and values from the other (rdd1's i-th element is paired with rdd2's i-th element)
- The two RDDs must have exactly the same number of elements and the same number of partitions
val rdd1: RDD[Int] = sc.parallelize(1 to 5,2)
val rdd2: RDD[String] = sc.parallelize(Array("A","B","C","D","E"),2)
val rdd3: RDD[(Int, String)] = rdd1.zip(rdd2)
println(rdd3.collect().toList)
def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B]) (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V]
- f: (Iterator[T], Iterator[B]) => Iterator[V] defines how the elements of each pair of partitions are zipped
- The partition counts must match; element counts and types may differ, and the iterator logic is responsible for handling any mismatch
val rdd1: RDD[Int] = sc.parallelize(1 to 5,2)
val rdd2: RDD[String] = sc.parallelize(Array("A","B","C","D","E"),2)
val rdd3: RDD[(Int, String)] = rdd1.zipPartitions(rdd2)((r1, r2) => {
  var list = List[(Int, String)]()
  while (r1.hasNext && r2.hasNext) {
    list ::= (r1.next(), r2.next()) // ::= prepends, so each partition's pairs come out reversed
  }
  list.iterator
})
println(rdd3.collect().toList)
def zipWithIndex(): RDD[(T, Long)]
- Zips each element of the RDD with its index (its position in the RDD, starting from 0)
def zipWithUniqueId(): RDD[(T, Long)]
- Zips each element of the RDD with a unique Long id
- The id is generated as x * n + k, where n = total number of partitions, k = the index of the element's partition, and x = the element's position within that partition, counting up from 0
val rdd1: RDD[String] = sc.parallelize(Array("A","B","C","D"),2)
val rdd2: RDD[(String, Long)] = rdd1.zipWithIndex()
val rdd3: RDD[(String, Long)] = rdd1.zipWithUniqueId()
println(rdd2.collect().toList) // List((A,0), (B,1), (C,2), (D,3))
println(rdd3.collect().toList) // List((A,0), (B,2), (C,1), (D,3))
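The x * n + k rule can be checked against the example above without Spark: with n = 2 partitions, partition 0 holds A and B and partition 1 holds C and D (a plain-Scala sketch of the id formula):

```scala
// id = x * n + k: x = position within the partition,
// n = number of partitions, k = partition index
val n = 2
val partitions = List(List("A", "B"), List("C", "D"))
val ids = for {
  (part, k) <- partitions.zipWithIndex
  (elem, x) <- part.zipWithIndex
} yield (elem, x.toLong * n + k)
println(ids) // List((A,0), (B,2), (C,1), (D,3))
```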