Questions:
How do we get an RDD's partitions?
How does an RDD divide its data across those partitions?
For example:
val rdd = sc.parallelize(1 to 10, 5)
First, let's look at the parallelize method:
/** Distribute a local Scala collection to form an RDD.
 *
 * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
 * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
 * modified collection. Pass a copy of the argument to avoid this.
 * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
 * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
 */
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
1. It distributes a local Scala collection to form an RDD.
2. parallelize acts lazily: if the argument is a mutable collection that is altered after the call to parallelize but before the first action on the RDD, the resulting RDD reflects the modified collection; pass a copy of the argument to avoid this.
The following example demonstrates point 2:
import scala.collection.mutable.ListBuffer
import sqlContext.implicits._

val data = ListBuffer[Int](1, 2, 3)
val rdd = sc.parallelize(data, 5)
data += 4  // mutate the collection before the first action
rdd.toDF("count").show()
The result is:
+-----+
|count|
+-----+
| 1|
| 2|
| 3|
| 4|
+-----+
3. parallelize then constructs a new ParallelCollectionRDD.
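This also answers the second question at the top: ParallelCollectionRDD.slice decides which elements land in which partition using plain index arithmetic. Below is a simplified sketch of that logic (the real implementation additionally special-cases Range and NumericRange; the standalone demo call is ours):

// Partition i covers indexes [i*length/numSlices, (i+1)*length/numSlices)
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
  (0 until numSlices).iterator.map { i =>
    (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
  }

positions(10, 5).foreach(println)  // (0,2), (2,4), (4,6), (6,8), (8,10)

So sc.parallelize(1 to 10, 5) puts two consecutive elements in each partition.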
To get the partition indexes from the RDD, iterate over rdd.partitions:
rdd.partitions.foreach(part => println(part.index + " partition class: " + part.getClass))
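For our five-partition RDD this should print something like:

0 partition class: class org.apache.spark.rdd.ParallelCollectionPartition
1 partition class: class org.apache.spark.rdd.ParallelCollectionPartition
2 partition class: class org.apache.spark.rdd.ParallelCollectionPartition
3 partition class: class org.apache.spark.rdd.ParallelCollectionPartition
4 partition class: class org.apache.spark.rdd.ParallelCollectionPartition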
So each partition is a ParallelCollectionPartition. Next we would like to inspect ParallelCollectionPartition itself, but we hit a wall: because of Scala access modifiers the class is package-private to Spark, so we cannot reference it from our own code; only code inside Spark can. We therefore have to go through the public Spark API instead. (If anyone knows how to read a partition's data directly from a Partition object, please point it out in the comments. Defining your own Partition is also possible, but that means modifying and recompiling the Spark source.)
Searching the API, we find mapPartitionsWithIndex:
/**
 * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
 * of the original partition.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning)
}
Inside this method the partition index is passed to our function f via cleanedF(index, iter), so we can write:
rdd.mapPartitionsWithIndex { (partId, iter) =>
  // Pair this partition's name with a list of its elements
  val partName = "part_" + partId
  Iterator((partName, iter.toList))
}.collect.foreach(println)
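For sc.parallelize(1 to 10, 5) this should print:

(part_0,List(1, 2))
(part_1,List(3, 4))
(part_2,List(5, 6))
(part_3,List(7, 8))
(part_4,List(9, 10))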
mapPartitionsWithIndex is a transformation: the function we pass in receives the partition id and an iterator over that partition's elements, and returns a new iterator. That returned iterator carries exactly what we are after here: the data inside each partition.
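Incidentally, the public glom() transformation reaches the same data without any manual bookkeeping: it turns each partition into an array. A minimal sketch (the part_ labels are just our own formatting):

rdd.glom().collect().zipWithIndex.foreach { case (arr, idx) =>
  println("part_" + idx + ": " + arr.mkString(", "))
}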
Can we get the partition id from TaskContext instead? Yes. Here is a quick and dirty approach:
import org.apache.spark.TaskContext

rdd.foreach(row => {
  println("partitionID: " + TaskContext.getPartitionId() + " data: " + row)
})
Keep in mind that foreach runs on the executors, so in a cluster this println output lands in the executor logs rather than the driver console. The same trick works for a DataFrame.
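For example, Spark SQL ships a built-in spark_partition_id function in org.apache.spark.sql.functions. A minimal sketch, assuming sqlContext.implicits._ is in scope as above:

import org.apache.spark.sql.functions.spark_partition_id

rdd.toDF("count")
  .withColumn("partitionID", spark_partition_id())
  .show()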