
How a Spark RDD Is Processed

Author: lsnl8480 | Published 2016-03-20 22:39

    With some time to spare, I decided to look into how a Spark RDD is actually processed.
    Let's walk through a simple example:

    val textRDD = sc.textFile("/home/ubuntu/people.txt")
    val filterRDD = textRDD.filter(_.startsWith("123"))
    val mapRDD = filterRDD.map(line => (line, 1))
    val reduceRDD = mapRDD.reduceByKey(_+_)
    reduceRDD.foreach(println)
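    

    To make the pipeline concrete, here is what it would compute for a hypothetical people.txt; the file contents below are invented purely for illustration.

    // Hypothetical contents of /home/ubuntu/people.txt, invented for illustration
    val lines = Seq("123 alice", "456 bob", "123 alice", "123 carol")

    // The same computation as the RDD pipeline, on a plain local collection
    val result = lines
      .filter(_.startsWith("123"))                                  // drop "456 bob"
      .map(line => (line, 1))                                       // pair each remaining line with 1
      .groupBy(_._1)                                                // group equal lines together
      .map { case (line, ones) => (line, ones.map(_._2).sum) }      // sum the 1s per line

    result.foreach(println)   // prints (123 alice,2) and (123 carol,1)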
    

    The first line, val textRDD = sc.textFile("/home/ubuntu/people.txt"), simply creates an RDD[String] from the text file.
    Now look at the second line, val filterRDD = textRDD.filter(_.startsWith("123")), and the source of filter:

    def filter(f: T => Boolean): RDD[T] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[T, T](
        this,
        (context, pid, iter) => iter.filter(cleanF),
        preservesPartitioning = true)
    }
    

    As you can see, filter does no work itself; it just wraps the parent in a new MapPartitionsRDD that remembers the function to apply.
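
    A quick way to confirm that these transformations are lazy is to build the chain and inspect its lineage before running any action. A minimal sketch, assuming a live SparkContext named sc:

    val textRDD   = sc.textFile("/home/ubuntu/people.txt")   // nothing is read from the file yet
    val filterRDD = textRDD.filter(_.startsWith("123"))      // just wraps textRDD in a new RDD

    println(filterRDD.toDebugString)   // prints the RDD lineage (the same information runJob logs under spark.logLineage)
    filterRDD.count()                  // only this action actually launches a job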

    Next, the third line:
    val mapRDD = filterRDD.map(line => (line, 1))
    Here is the source of map:

    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
      val cleanF = sc.clean(f)
      new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
    }
    

    Just like filter, map only produces a new RDD.
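
    Both of these transformations boil down to applying filter and map to each partition's iterator, and Scala iterators compose lazily, so a partition is processed in a single pass with no intermediate collection in between. A plain-Scala sketch of what happens to one partition, with invented lines:

    // One partition's lines, invented for illustration
    val partitionIter: Iterator[String] = Iterator("123 alice", "456 bob", "123 alice", "123 carol")

    val filtered = partitionIter.filter(_.startsWith("123"))   // what iter.filter(cleanF) does
    val mapped   = filtered.map(line => (line, 1))             // what iter.map(cleanF) does

    mapped.foreach(println)   // (123 alice,1), (123 alice,1), (123 carol,1)
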
    Now the fourth line:
    val reduceRDD = mapRDD.reduceByKey(_+_)
    This time the code lives in PairRDDFunctions (reached through an implicit conversion, since the elements are key-value pairs), and once again a new RDD is produced:

    def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
      reduceByKey(defaultPartitioner(self), func)
    }
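
    Conceptually, reduceByKey merges all values that share a key using the supplied function, first within each partition (a map-side combine) and then again across partitions after the shuffle. A plain-Scala sketch of that two-step merge, with an invented split of mapRDD's pairs over two partitions:

    // mapRDD's (line, 1) pairs, spread over two partitions (split invented for illustration)
    val part0 = Seq(("123 alice", 1))
    val part1 = Seq(("123 alice", 1), ("123 carol", 1))

    // Merge values per key with the reduce function, as reduceByKey(_ + _) would
    def mergeByKey(pairs: Seq[(String, Int)]): Map[String, Int] =
      pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).reduce(_ + _)) }

    // Map-side combine: each partition merges its own records before the shuffle...
    val locallyCombined = Seq(part0, part1).map(mergeByKey)
    // ...then the locally combined records are merged again per key after the shuffle
    val finalResult = mergeByKey(locallyCombined.flatMap(_.toSeq))
    // Map(123 alice -> 2, 123 carol -> 1)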
    

    Finally, the last line:
    reduceRDD.foreach(println)
    foreach is an action, defined in RDD.scala:

    def foreach(f: T => Unit): Unit = withScope {
      val cleanF = sc.clean(f)
      sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
    }
    

    As you can see, foreach ends up calling SparkContext's runJob; this is where the lazily built chain of RDDs finally turns into a job. Here is the runJob code:

    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        resultHandler: (Int, U) => Unit): Unit = {
      if (stopped.get()) {
        throw new IllegalStateException("SparkContext has been shutdown")
      }
      val callSite = getCallSite
      val cleanedFunc = clean(func)
      logInfo("Starting job: " + callSite.shortForm)
      if (conf.getBoolean("spark.logLineage", false)) {
        logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
      }
      dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
      progressBar.foreach(_.finishAll())
      rdd.doCheckpoint()
    }
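
    To connect this back to the example: every action is a thin wrapper that eventually lands in one of these runJob overloads. A minimal sketch, assuming a live SparkContext named sc, that calls runJob directly with a per-partition function, much as foreach does internally:

    val rdd = sc.parallelize(1 to 10, 2)                 // two partitions: 1..5 and 6..10

    // One result per partition, computed on the executors and collected back to the driver
    val partitionSums: Array[Int] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)

    println(partitionSums.toSeq)                         // Seq(15, 40)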
    

    Broadly, then, the job is ultimately handed off to the DAGScheduler for execution.
    Next, DAGScheduler's runJob:

    def runJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): Unit = {
      val start = System.nanoTime
      val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
      // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
      // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
      // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
      // safe to pass in null here. For more detail, see SPARK-13747.
      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
      waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
      waiter.completionFuture.value.get match {
        case scala.util.Success(_) =>
          logInfo("Job %d finished: %s, took %f s".format
            (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        case scala.util.Failure(exception) =>
          logInfo("Job %d failed: %s, took %f s".format
            (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
          // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
          val callerStackTrace = Thread.currentThread().getStackTrace.tail
          exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
          throw exception
      }
    }
    

    As you can see, DAGScheduler.runJob submits the job and then blocks until it completes.
    Now let's see how the job is actually submitted:

    def submitJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): JobWaiter[U] = {
      // Check to make sure we are not launching a task on a partition that does not exist.
      val maxPartitions = rdd.partitions.length
      partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
        throw new IllegalArgumentException(
          "Attempting to access a non-existent partition: " + p + ". " +
            "Total number of partitions: " + maxPartitions)
      }
    
      val jobId = nextJobId.getAndIncrement()
      if (partitions.size == 0) {
        // Return immediately if the job is running 0 tasks
        return new JobWaiter[U](this, jobId, 0, resultHandler)
      }
    
      assert(partitions.size > 0)
      val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
      val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
      eventProcessLoop.post(JobSubmitted(
        jobId, rdd, func2, partitions.toArray, callSite, waiter,
        SerializationUtils.clone(properties)))
      waiter
    }
    

    The rough logic: check that every requested partition actually exists, then post a JobSubmitted event onto the scheduler's event loop and hand back a JobWaiter for the caller to block on.
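
    The submit-and-wait structure here (post an event, hand back a waiter, block on its future) is easy to reproduce in miniature. The sketch below is self-contained and uses invented names (JobSubmittedEvent, work, and so on) purely to illustrate the pattern; it is not Spark's actual implementation:

    import java.util.concurrent.LinkedBlockingQueue
    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration.Duration

    // Stand-in for the DAGScheduler's JobSubmitted message
    case class JobSubmittedEvent(jobId: Int, work: () => Int, promise: Promise[Int])

    val eventQueue = new LinkedBlockingQueue[JobSubmittedEvent]()

    // Stand-in for the event-processing loop: take one event and complete its promise
    val loop = new Thread(new Runnable {
      override def run(): Unit = {
        val ev = eventQueue.take()
        ev.promise.success(ev.work())
      }
    })
    loop.start()

    // "submitJob": post the event and keep the promise's future as the waiter
    val promise = Promise[Int]()
    eventQueue.put(JobSubmittedEvent(jobId = 1, work = () => (1 to 10).sum, promise = promise))

    // "runJob": block until the waiter's future completes
    println(Await.result(promise.future, Duration.Inf))   // 55
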
    That's as far as we'll go for now; the next post will dig deeper.
