美文网首页
Spark runJob方法别用

Spark runJob方法别用

作者: 5b9eff0de6a4 | 来源:发表于2020-03-16 10:57 被阅读0次

    学习博主的文章《Spark driver端得到executor返回值的方法》之后有感

    文章中通过阅读count方法的源码,count方法将task的返回值返回到driver端,然后进行聚合,源码如下

    defcount(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

    sparkcontext的runJob方法,Utils.getIteratorSize _这个方法主要是计算每个iterator的元素个数,也即是每个分区的元素个数,返回值就是元素个数:

    /**

    * Counts the number of elements of an iterator using a while loop rather than calling

    * [[scala.collection.Iterator#size]] because it uses a for loop, which is slightly slower

    * in the current version of Scala.

    */

    def getIteratorSize(iterator:Iterator[_]):Long = {

    var count =0L

      while (iterator.hasNext) {

    count +=1L

        iterator.next()

    }

    count

    }

    返回结果为各个分区的元素个数,使用sum方法进行统计。

    博主的文章中使用实际案例代码说明了使用方法,学习后有如下启发,在driver端不止可以获取每个task中的数据量,是否还可以获取具体的变量值呢?做如下测试:


    val rdd = sc.parallelize(1 to 10 ,3)

    import org.apache.spark.TaskContext

    import scala.collection.mutable._

    val funccc = (itr : Iterator[Int]) => {

      val lst = new ListBuffer[Int]

          itr.foreach(each=>{

            lst.append(each)

          })

          (TaskContext.getPartitionId(),lst)

        }

    val res = sc.runJob(rdd,funccc)


    在driver端查看获取的结果

    res(1)._2.foreach(println)

    4

    5

    6

    相关文章

      网友评论

          本文标题:Spark runJob方法别用

          本文链接:https://www.haomeiwen.com/subject/ogwwehtx.html