美文网首页
RDD or DF获取分区编号和数据

RDD or DF获取分区编号和数据

作者: 灯火gg | 来源:发表于2019-04-18 18:00 被阅读0次

思考:

RDD的分区怎么获取?
RDD分区怎么根据数据划分分区?

例如:

val rdd=sc.parallelize(1 to 10,5)

首先看一下parallelize方法

 /** Distribute a local Scala collection to form an RDD.
   *
   * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
   * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
   * modified collection. Pass a copy of the argument to avoid this.
   * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
   * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
   */
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

1.分发一个本地Scala集合来形成一个RDD
2.parallelize是懒加载,如果是可变集合,则在action操作之前都是rdd都是"可变的",建议传递参数副本来避免修改原来集合
例如以下操作结果

  var data=ListBuffer[Int](1,2,3)
  val rdd=sc.parallelize(data,5)
  data.+=(4)
  import sqlContext.implicits._
  rdd.toDF("count").show()
  结果为:
  +-----+
  |count|
  +-----+
  |    1|
  |    2|
  |    3|
  |    4|
  +-----+

3.接下来会new ParallelCollectionRDD

RDD中获取分区编号为:

rdd.partitions.foreach(part=>println(part.index+"分区类型:"+part.getClass))

通过获取分区index发现分区类型为ParallelCollectionPartition

现在我们看一下ParallelCollectionPartition,然而无果,我们无法在自定义类中使用ParalleCollectionPartition,因为scala的访问修饰符原因。只能在spark中使用。所以要结合spark api 如果有哪位大神可以通过partition获取分区数据可以在评论指出。当然也可以重新自己定义partition这就要修改源码重新编译了。

结合api发现:

 /**
   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
   * of the original partition.
   *
   * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
   * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
   */
  def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = withScope {
    val cleanedF = sc.clean(f)
    new MapPartitionsRDD(
      this,
      (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
      preservesPartitioning)
  }

这个方法中会有index传入具体的f cleanedF(index, iter)

 rdd.mapPartitionsWithIndex{
    (partid,iter)=>{
      var part_map = scala.collection.mutable.Map[String,List[Int]]()
      var part_name = "part_" + partid
      part_map(part_name) = List[Int]()
      while(iter.hasNext){
        part_map(part_name) :+= iter.next()
      }
      part_map.iterator
    }
  }.collect.foreach(println)

这个函数是一个transform,传入分区id和每个分区的迭代器,返回一个迭代器
此时这个迭代器就是我们要实现的业务,查询每个分区中的数据。

是否可以从Taskcontext中获取分区?

下面有一个简单粗暴的方法:

rdd.foreach(row=>{
    println("partitionID:"+TaskContext.getPartitionId()+"data:"+row)
})

DF同理

相关文章

  • RDD or DF获取分区编号和数据

    思考: RDD的分区怎么获取?RDD分区怎么根据数据划分分区? 例如: 首先看一下parallelize方法 1....

  • 宽依赖和窄依赖

    窄依赖是指父RDD的每个分区只被子RDD的一个分区所使用,子RDD分区通常对应常数个父RDD分区(O(1),与数据...

  • Spark_day04

    RDD的 Shuffle 和 分区 RDD的分区操作2.Shuffle 的原理 分区的作用 RDD 使用分区来分布...

  • SPARK[RDD之partitions]

    RDD是容错、并行的数据结构,具备分区的属性,这个分区可以是单机上分区也可以是多机上的分区,对于RDD分区的数量涉...

  • Spark中repartition和coalesce的用法

    在Spark的Rdd中,Rdd是分区的。 有时候需要重新设置Rdd的分区数量,比如Rdd的分区中,Rdd分区比较多...

  • spark task partition 并行度 线程

    partition分区 概念 分区概念spark的分区是RDD里的一个概念,RDD为分布式弹性工作集,因为数据量很...

  • Spark中RDD是什么

    一、RDD是什么? RDD是一个弹性可复原的分布式数据集!RDD是一个逻辑概念,一个RDD中有多个分区,一个分区在...

  • Spark的优化

    1.RDD重新分区 针对大量小分区的RDD,使用RDD重分区函数coalesce将小分区合并成大分区;同样当分区数...

  • Spark Shuffle 机制解析

    1. 什么是 Shuffle 当一个父 RDD 分区的数据分散到了多个子 RDD 的分区中时,这时会产生 Shuf...

  • Spark RDD分区策略

    RDD的数据分区策略由Partitioner数据分区器控制,Spark提供两个类型分片函数,如下: Partiti...

网友评论

      本文标题:RDD or DF获取分区编号和数据

      本文链接:https://www.haomeiwen.com/subject/yrrtgqtx.html