RDD or DF: Getting Partition IDs and Data


Author: 灯火gg | Published 2019-04-18 18:00

    Questions:

    How do you get an RDD's partitions?
    How does an RDD decide which partition each piece of data goes to?

    For example:

    val rdd = sc.parallelize(1 to 10, 5)
    

    First, let's look at the parallelize method:

     /** Distribute a local Scala collection to form an RDD.
       *
       * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
       * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
       * modified collection. Pass a copy of the argument to avoid this.
       * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
       * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
       */
      def parallelize[T: ClassTag](
          seq: Seq[T],
          numSlices: Int = defaultParallelism): RDD[T] = withScope {
        assertNotStopped()
        new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
      }
    

    1. It distributes a local Scala collection to form an RDD.
    2. parallelize acts lazily: if the argument is a mutable collection, the RDD stays "mutable" until the first action, so changes made before then show up in the result. The doc comment suggests passing a copy of the argument to avoid modifying the original collection's view.
    For example, observe the result of the following:

      import scala.collection.mutable.ListBuffer

      val data = ListBuffer[Int](1, 2, 3)
      val rdd = sc.parallelize(data, 5)
      data += 4  // mutated after parallelize but before any action
      import sqlContext.implicits._
      rdd.toDF("count").show()
      Result:
      +-----+
      |count|
      +-----+
      |    1|
      |    2|
      |    3|
      |    4|
      +-----+
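
    As the doc comment advises, passing a copy avoids this. A minimal sketch, reusing the same sc and sqlContext as above:

      import scala.collection.mutable.ListBuffer
      import sqlContext.implicits._

      val data = ListBuffer[Int](1, 2, 3)
      // data.toList takes an immutable snapshot, so the later += is not seen
      val rddCopy = sc.parallelize(data.toList, 5)
      data += 4
      rddCopy.toDF("count").show()  // shows only 1, 2, 3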
    

    3. It then constructs a new ParallelCollectionRDD.
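
    This also answers how the data gets divided among partitions. Here is a simplified sketch, paraphrased from the slicing logic inside ParallelCollectionRDD (not the exact Spark code): each of the numSlices partitions receives one contiguous range of the input.

      // Partition i covers input indices [i*length/numSlices, (i+1)*length/numSlices)
      def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
        (0 until numSlices).iterator.map { i =>
          val start = ((i * length) / numSlices).toInt
          val end   = (((i + 1) * length) / numSlices).toInt
          (start, end)
        }

      positions(10, 5).toList
      // List((0,2), (2,4), (4,6), (6,8), (8,10))
      // so for 1 to 10 with 5 slices: partition 0 holds 1,2; partition 1 holds 3,4; ...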

    To see an RDD's partition indices:

    rdd.partitions.foreach(part => println(part.index + " partition type: " + part.getClass))
    

    Printing the partition indices reveals that each partition's type is ParallelCollectionPartition.

    Now let's look at ParallelCollectionPartition itself. Unfortunately this leads nowhere: because of Scala's access modifiers, we cannot reference ParallelCollectionPartition from our own classes; it is only usable inside Spark. So we have to work through the Spark API instead. If anyone knows how to read a partition's data directly from a Partition object, please point it out in the comments. You could also define your own Partition, but that means modifying the Spark source and recompiling.

    Searching the API, we find:

     /**
       * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
       * of the original partition.
       *
       * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
       * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
       */
      def mapPartitionsWithIndex[U: ClassTag](
          f: (Int, Iterator[T]) => Iterator[U],
          preservesPartitioning: Boolean = false): RDD[U] = withScope {
        val cleanedF = sc.clean(f)
        new MapPartitionsRDD(
          this,
          (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
          preservesPartitioning)
      }
    
    

    Inside this method, the partition index is passed into our function f via cleanedF(index, iter).

     rdd.mapPartitionsWithIndex { (partId, iter) =>
        // Emit one (partition name, partition contents) pair per partition
        val partName = "part_" + partId
        Iterator((partName, iter.toList))
      }.collect.foreach(println)
    

    mapPartitionsWithIndex is a transformation: our function receives the partition id plus an iterator over that partition's data, and must return a new iterator.
    That returned iterator is where we implement our logic, in this case collecting each partition's contents.
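
    As an aside, the built-in glom() gives a similar view with less code: it gathers each partition into an array, and collect() returns the partitions in order, so the array index is the partition id.

      rdd.glom().collect().zipWithIndex.foreach { case (data, partId) =>
        println("part_" + partId + ": " + data.mkString(","))
      }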

    Can we get the partition id from TaskContext?

    Here is a quick and dirty way:

    import org.apache.spark.TaskContext

    rdd.foreach(row => {
      // note: runs on executors; in cluster mode output goes to executor logs
      println("partitionID:" + TaskContext.getPartitionId() + " data:" + row)
    })
    

    The same works for a DataFrame.
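
    A minimal sketch (assuming spark is an active SparkSession on Spark 2.x; spark.range(10) is just a stand-in DataFrame for illustration):

      import org.apache.spark.TaskContext

      val df = spark.range(10).toDF("count")
      df.foreach { row =>
        println("partitionID:" + TaskContext.getPartitionId() + " data:" + row)
      }

    You can also drop down with df.rdd and reuse the mapPartitionsWithIndex approach from above.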
