
SparkContext: Principles and Source Code Analysis

Author: SunnyMore | Published 2018-05-31 00:01

    1. SparkContext Principles

    [Figure: SparkContext architecture overview (SparkContext原理.png)]

    2. SparkContext Source Code Analysis

    SparkContext is created on the Driver side. Besides communicating with the Master to request resources and to assign and monitor tasks, it also initializes the core components while it is being constructed, including the DAGScheduler, TaskScheduler, SparkEnv, and SparkUI. (A minimal driver-side usage sketch follows the quoted class header below.)

    /**
     * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
     * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
     *
     * Only one SparkContext may be active per JVM.  You must `stop()` the active SparkContext before
     * creating a new one.  This limitation may eventually be removed; see SPARK-2243 for more details.
     * Currently only one SparkContext can exist per JVM; this restriction may be lifted in the future, see the discussion at https://issues.apache.org/jira/browse/SPARK-2243
     * @param config a Spark Config object describing the application configuration. Any settings in
     *   this config overrides the default configs as well as system properties.
     */
    class SparkContext(config: SparkConf) extends Logging {
    
      // The call site where this SparkContext was constructed.
      // Capture the call stack at the point where this SparkContext is constructed: the Spark/Scala core
      // class closest to the bottom of the stack is pushed onto the top of callStack and its method is
      // recorded as lastSparkMethod; the user class closest to the top of the stack is added to callStack,
      // with its line number stored in firstUserLine and its file in firstUserFile. The returned CallSite
      // case class holds the resulting short form plus a long form capped at 20 frames by default.
      private val creationSite: CallSite = Utils.getCallSite()
    
      // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
      private val allowMultipleContexts: Boolean =
        config.getBoolean("spark.driver.allowMultipleContexts", false)
    
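    Before walking through the internals, here is a minimal sketch of a driver program that exercises this entry point. The master URL, app name, and data are illustrative; in practice they usually come from spark-submit.

    import org.apache.spark.{SparkConf, SparkContext}

    object SparkContextDemo {
      def main(args: Array[String]): Unit = {
        // Illustrative settings; normally supplied via spark-submit.
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("SparkContextDemo")

        // Constructing the SparkContext triggers all of the initialization analysed below.
        val sc = new SparkContext(conf)
        try {
          val counts = sc.parallelize(Seq("a", "b", "a"))
            .map(word => (word, 1))
            .reduceByKey(_ + _)
            .collect()
          counts.foreach(println)
        } finally {
          // Only one active SparkContext is allowed per JVM, so always stop it.
          sc.stop()
        }
      }
    }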
    

    Next come the member-variable definitions and the getters/setters for configuration.

     /* ------------------------------------------------------------------------------------- *
       | Private variables. These variables keep the internal state of the context, and are    |
       | not accessible by the outside world. They're mutable since we want to initialize all  |
       | of them to some neutral value ahead of time, so that calling "stop()" while the       |
       | constructor is still running is safe.                                                 |
       * ------------------------------------------------------------------------------------- */
    
      private var _conf: SparkConf = _
      private var _eventLogDir: Option[URI] = None
      private var _eventLogCodec: Option[String] = None
      private var _listenerBus: LiveListenerBus = _
      private var _env: SparkEnv = _
      private var _statusTracker: SparkStatusTracker = _
      private var _progressBar: Option[ConsoleProgressBar] = None
      private var _ui: Option[SparkUI] = None
      private var _hadoopConfiguration: Configuration = _
      private var _executorMemory: Int = _
      private var _schedulerBackend: SchedulerBackend = _
      private var _taskScheduler: TaskScheduler = _
      private var _heartbeatReceiver: RpcEndpointRef = _
      @volatile private var _dagScheduler: DAGScheduler = _
      private var _applicationId: String = _
      private var _applicationAttemptId: Option[String] = None
      private var _eventLogger: Option[EventLoggingListener] = None
      private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
      private var _cleaner: Option[ContextCleaner] = None
      private var _listenerBusStarted: Boolean = false
      private var _jars: Seq[String] = _
      private var _files: Seq[String] = _
      private var _shutdownHookRef: AnyRef = _
      private var _statusStore: AppStatusStore = _
    
      /* ------------------------------------------------------------------------------------- *
       | Accessors and public fields. These provide access to the internal state of the        |
       | context.                                                                              |
       * ------------------------------------------------------------------------------------- */
    
      private[spark] def conf: SparkConf = _conf
    
      /**
       * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
       * changed at runtime.
       * Note: the configuration cannot be modified while the application is running.
       */
      def getConf: SparkConf = conf.clone()
    
      def jars: Seq[String] = _jars
      def files: Seq[String] = _files
      def master: String = _conf.get("spark.master")
      def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
      def appName: String = _conf.get("spark.app.name")
    
      private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
      private[spark] def eventLogDir: Option[URI] = _eventLogDir
      private[spark] def eventLogCodec: Option[String] = _eventLogCodec
    
      // Whether the application is running in local mode
      def isLocal: Boolean = Utils.isLocalMaster(_conf)
    

    The event-listening machinery comes next and is fairly important.

     /**
       * @return true if context is stopped or in the midst of stopping.
       */
      def isStopped: Boolean = stopped.get()
    
      private[spark] def statusStore: AppStatusStore = _statusStore
    
      // An asynchronous listener bus for Spark events
      // Many listeners have already been registered on the listenerBus. The bus normally runs a thread
      // that asynchronously invokes those listeners to consume each event (in effect it fires the
      // pre-registered callbacks, which e.g. persist the event information).
      private[spark] def listenerBus: LiveListenerBus = _listenerBus
    
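    As a hedged illustration of how this bus is consumed, the sketch below registers a custom listener through the public addSparkListener API; the listener class and its log messages are made up.

    import org.apache.spark.SparkContext
    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

    // A hypothetical listener that logs job boundaries; events are delivered
    // asynchronously by the LiveListenerBus described above.
    class JobLoggingListener extends SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit =
        println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stage(s)")

      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
        println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
    }

    def registerJobLogger(sc: SparkContext): Unit =
      sc.addSparkListener(new JobLoggingListener)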

    Then the SparkEnv is created.

     // This function allows components created by SparkEnv to be mocked in unit tests:
      private[spark] def createSparkEnv(
          conf: SparkConf,
          isLocal: Boolean,
          listenerBus: LiveListenerBus): SparkEnv = {
        SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
      }
    
      private[spark] def env: SparkEnv = _env
    
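    SparkEnv bundles the runtime services (block manager, serializer, memory manager, and so on) for the driver or an executor. A small, hedged sketch of inspecting it from driver code, assuming an already-constructed SparkContext:

    import org.apache.spark.{SparkContext, SparkEnv}

    def describeEnv(sc: SparkContext): Unit = {
      // On the driver this returns the driver-side SparkEnv created above.
      val env = SparkEnv.get
      println(s"executorId    = ${env.executorId}")   // "driver" on the driver side
      println(s"serializer    = ${env.serializer.getClass.getName}")
      println(s"memoryManager = ${env.memoryManager.getClass.getName}")
    }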
    

    Next comes the low-level status-reporting API, which tracks the progress of jobs and stages.

    // Used to store a URL for each static file/jar together with the file's local timestamp
      private[spark] val addedFiles = new ConcurrentHashMap[String, Long]().asScala
      private[spark] val addedJars = new ConcurrentHashMap[String, Long]().asScala
    
      // Keeps track of all persisted RDDs
      private[spark] val persistentRdds = {
        val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
        map.asScala
      }
      def statusTracker: SparkStatusTracker = _statusTracker
    
      private[spark] def progressBar: Option[ConsoleProgressBar] = _progressBar
    
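    A hedged example of using this status API from application code, together with the public counterpart of the internal persistentRdds map (getPersistentRDDs); the printed strings are illustrative.

    import org.apache.spark.SparkContext

    def reportProgress(sc: SparkContext): Unit = {
      // SparkStatusTracker exposes the same information the internal listeners collect.
      val tracker = sc.statusTracker
      tracker.getActiveJobIds().foreach { jobId =>
        tracker.getJobInfo(jobId).foreach { job =>
          println(s"Job $jobId is ${job.status()} with stages ${job.stageIds().mkString(",")}")
        }
      }
      // Public view over the persistentRdds map shown above.
      sc.getPersistentRDDs.foreach { case (id, rdd) =>
        println(s"Cached RDD $id (${rdd.getStorageLevel})")
      }
    }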

    Then come the progress bar, the UI, the Hadoop configuration, executor memory, and related settings.

    private[spark] def ui: Option[SparkUI] = _ui
    
      def uiWebUrl: Option[String] = _ui.map(_.webUrl)
    
      /**
       * A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
       *
       * @note As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
       * plan to set some global configurations for all Hadoop RDDs.
       */
      def hadoopConfiguration: Configuration = _hadoopConfiguration
    
      private[spark] def executorMemory: Int = _executorMemory
    
      // Environment variables to pass to our executors.
      private[spark] val executorEnvs = HashMap[String, String]()
    
      // Set SPARK_USER for user who is running SparkContext.
      val sparkUser = Utils.getCurrentUserName()
    
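    A hedged example of the knobs exposed here: entries set with setExecutorEnv feed executorEnvs, and hadoopConfiguration carries global settings shared by every Hadoop RDD. The S3A keys are standard Hadoop property names; the values and MY_ENV_VAR are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local[*]")                      // illustrative
      .setAppName("HadoopConfDemo")
      .setExecutorEnv("MY_ENV_VAR", "some-value") // ends up in executorEnvs above

    val sc = new SparkContext(conf)

    // Global Hadoop settings reused by all Hadoop RDDs created from this context.
    sc.hadoopConfiguration.set("fs.s3a.access.key", "<access-key>")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "<secret-key>")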

    Then come the most important pieces: the TaskScheduler and the DAGScheduler.

      private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend
    
      private[spark] def taskScheduler: TaskScheduler = _taskScheduler
      private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
        _taskScheduler = ts
      }
    
      private[spark] def dagScheduler: DAGScheduler = _dagScheduler
      private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
        _dagScheduler = ds
      }
    

    The most important work SparkContext does is creating the TaskScheduler, the DAGScheduler, and the SparkUI (port 4040). Here we focus on how the TaskScheduler is initialized.

     // Create and start the scheduler
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    
        // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
        // constructor
        _taskScheduler.start()
    

    As the snippet shows, SparkContext first creates the TaskScheduler, then creates the DAGScheduler, and finally starts everything by calling the TaskScheduler's start method.

    TaskScheduler initialization

      1. First, the createTaskScheduler method is called. It runs a different initialization path depending on how the application was submitted (we analyze standalone mode here): it creates a TaskSchedulerImpl (the TaskScheduler we have been talking about, which does its low-level work through a scheduler backend) and a StandaloneSchedulerBackend (named SparkDeploySchedulerBackend in older Spark versions; it is driven by TaskSchedulerImpl and is the component that actually registers with the Master, receives the Executors' reverse registration, sends tasks to Executors, and so on). The master-URL patterns the method distinguishes are illustrated right after the listing below.
    /**
       * Create a task scheduler based on a given master URL.
       * Return a 2-tuple of the scheduler backend and the task scheduler.
       */
      private def createTaskScheduler(
          sc: SparkContext,
          master: String,
          deployMode: String): (SchedulerBackend, TaskScheduler) = {
        import SparkMasterRegex._
    
        // When running locally, don't try to re-execute tasks on failure.
        val MAX_LOCAL_TASK_FAILURES = 1
    
        master match {
          case "local" =>
            val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
            val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
            scheduler.initialize(backend)
            (backend, scheduler)
    
          case LOCAL_N_REGEX(threads) =>
            def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
            // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
            val threadCount = if (threads == "*") localCpuCount else threads.toInt
            if (threadCount <= 0) {
              throw new SparkException(s"Asked to run locally with $threadCount threads")
            }
            val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
            val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
            scheduler.initialize(backend)
            (backend, scheduler)
    
          case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
            def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
            // local[*, M] means the number of cores on the computer with M failures
            // local[N, M] means exactly N threads with M failures
            val threadCount = if (threads == "*") localCpuCount else threads.toInt
            val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
            val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
            scheduler.initialize(backend)
            (backend, scheduler)
    
          case SPARK_REGEX(sparkUrl) =>
            val scheduler = new TaskSchedulerImpl(sc)
            val masterUrls = sparkUrl.split(",").map("spark://" + _)
            val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            (backend, scheduler)
    
          case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
            // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
            val memoryPerSlaveInt = memoryPerSlave.toInt
            if (sc.executorMemory > memoryPerSlaveInt) {
              throw new SparkException(
                "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
                  memoryPerSlaveInt, sc.executorMemory))
            }
    
            val scheduler = new TaskSchedulerImpl(sc)
            val localCluster = new LocalSparkCluster(
              numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
            val masterUrls = localCluster.start()
            val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
              localCluster.stop()
            }
            (backend, scheduler)
    
          case masterUrl =>
            val cm = getClusterManager(masterUrl) match {
              case Some(clusterMgr) => clusterMgr
              case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
            }
            try {
              val scheduler = cm.createTaskScheduler(sc, masterUrl)
              val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
              cm.initialize(scheduler, backend)
              (backend, scheduler)
            } catch {
              case se: SparkException => throw se
              case NonFatal(e) =>
                throw new SparkException("External scheduler cannot be instantiated", e)
            }
        }
      }
    
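    Each branch of the match above is selected by the master URL. The examples below (host names and sizes are illustrative) show which pattern a given URL hits:

    import org.apache.spark.SparkConf

    new SparkConf().setMaster("local")                         // "local": one thread, maxTaskFailures = 1
    new SparkConf().setMaster("local[4]")                      // LOCAL_N_REGEX: 4 threads
    new SparkConf().setMaster("local[*]")                      // LOCAL_N_REGEX: one thread per core
    new SparkConf().setMaster("local[4,2]")                    // LOCAL_N_FAILURES_REGEX: 4 threads, 2 failures
    new SparkConf().setMaster("spark://host1:7077,host2:7077") // SPARK_REGEX: standalone master(s)
    new SparkConf().setMaster("local-cluster[2,1,1024]")       // LOCAL_CLUSTER_REGEX: 2 workers x 1 core x 1024 MB
    new SparkConf().setMaster("yarn")                          // default branch: external cluster manager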

    The official doc comment for TaskSchedulerImpl:

    /**
     * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
     * It can also work with a local setup by using a `LocalSchedulerBackend` and setting
     * isLocal to true. It handles common logic, like determining a scheduling order across jobs, waking
     * up to launch speculative tasks, etc.
     *
     * Clients should first call initialize() and start(), then submit task sets through the
     * runTasks method.
     *
     * THREADING: [[SchedulerBackend]]s and task-submitting clients can call this class from multiple
     * threads, so it needs locks in public API methods to maintain its state. In addition, some
     * [[SchedulerBackend]]s synchronize on themselves when they want to send events here, and then
     * acquire a lock on us, so we need to make sure that we don't try to lock the backend while
     * we are holding a lock on ourselves.
     */
    private[spark] class TaskSchedulerImpl(
        val sc: SparkContext,
        val maxTaskFailures: Int,
        isLocal: Boolean = false)
      extends TaskScheduler with Logging {
    
      2. Next, TaskSchedulerImpl's initialize method runs. It builds the scheduling pool (rootPool) via a SchedulableBuilder, which supports different scheduling policies such as FIFO (the default) and FAIR; a configuration sketch for FAIR mode follows the listing below.
    def initialize(backend: SchedulerBackend) {
        this.backend = backend
        schedulableBuilder = {
          schedulingMode match {
            case SchedulingMode.FIFO =>
              new FIFOSchedulableBuilder(rootPool)
            case SchedulingMode.FAIR =>
              new FairSchedulableBuilder(rootPool, conf)
            case _ =>
              throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
              s"$schedulingMode")
          }
        }
        schedulableBuilder.buildPools()
      }
    
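    As referenced above, a hedged sketch of switching the SchedulableBuilder from the default FIFO to FAIR; the pool name and allocation-file path are placeholders.

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("FairSchedulingDemo")
      .setMaster("local[4]")               // illustrative
      .set("spark.scheduler.mode", "FAIR")
      // Optional pool definitions (placeholder path, must exist if set):
      // .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

    val sc = new SparkContext(conf)

    // Jobs submitted from this thread are assigned to the named pool.
    sc.setLocalProperty("spark.scheduler.pool", "etl-pool")
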
      3. The flow then returns to TaskSchedulerImpl and calls its start method, which in turn calls StandaloneSchedulerBackend's start method (the speculative-execution settings that start checks are sketched after the listing below).
    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
        // constructor
        _taskScheduler.start()
    
      override def start() {
        backend.start()
    
        if (!isLocal && conf.getBoolean("spark.speculation", false)) {
          logInfo("Starting speculative execution thread")
          speculationScheduler.scheduleWithFixedDelay(new Runnable {
            override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
              checkSpeculatableTasks()
            }
          }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
        }
      }
    
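    The speculative-execution thread scheduled above only runs when speculation is enabled. A hedged configuration sketch (values are illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.speculation", "true")           // turn on checkSpeculatableTasks()
      .set("spark.speculation.interval", "1s")    // how often the check runs
      .set("spark.speculation.multiplier", "2")   // how much slower than the median counts as slow
      .set("spark.speculation.quantile", "0.9")   // fraction of tasks that must finish first
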
      4. In StandaloneSchedulerBackend's start method, a StandaloneAppClient is created. That client's start method launches a ClientEndpoint, which calls registerWithMaster() -> tryRegisterAllMasters(); tryRegisterAllMasters() finally sends a RegisterApplication message (a case class wrapping the Application's information) to every Master to register the Application.
        StandaloneSchedulerBackend.scala
     override def start() {
        super.start()
    
        // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
        // mode. In cluster mode, the code that submits the application to the Master needs to connect
        // to the launcher instead.
        if (sc.deployMode == "client") {
          launcherBackend.connect()
        }
    
        // The endpoint for executors to talk to us
        val driverUrl = RpcEndpointAddress(
          sc.conf.get("spark.driver.host"),
          sc.conf.get("spark.driver.port").toInt,
          CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
        val args = Seq(
          "--driver-url", driverUrl,
          "--executor-id", "{{EXECUTOR_ID}}",
          "--hostname", "{{HOSTNAME}}",
          "--cores", "{{CORES}}",
          "--app-id", "{{APP_ID}}",
          "--worker-url", "{{WORKER_URL}}")
        val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
          .map(Utils.splitCommandString).getOrElse(Seq.empty)
        val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
          .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
        val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
          .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    
        // When testing, expose the parent class path to the child. This is processed by
        // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
        // when the assembly is built with the "*-provided" profiles enabled.
        val testingClassPath =
          if (sys.props.contains("spark.testing")) {
            sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
          } else {
            Nil
          }
    
        // Start executors with a few necessary configs for registering with the scheduler
        val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
        val javaOpts = sparkJavaOpts ++ extraJavaOpts
        val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
          args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
        val webUrl = sc.ui.map(_.webUrl).getOrElse("")
        val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
        // If we're using dynamic allocation, set our initial executor limit to 0 for now.
        // ExecutorAllocationManager will send the real initial limit to the Master later.
        val initialExecutorLimit =
          if (Utils.isDynamicAllocationEnabled(conf)) {
            Some(0)
          } else {
            None
          }
        val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
          webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
        client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
        client.start()
        launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
        waitForRegistration()
        launcherBackend.setState(SparkAppHandle.State.RUNNING)
      }
    
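    The Command assembled above is parameterized by a handful of user-facing settings; a hedged example of supplying them (flags and paths are placeholders):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.executor.cores", "2")                            // coresPerExecutor above
      .set("spark.executor.memory", "2g")                          // becomes sc.executorMemory
      .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")      // extraJavaOpts above
      .set("spark.executor.extraClassPath", "/opt/libs/extra.jar") // classPathEntries above
      .set("spark.executor.extraLibraryPath", "/opt/native")       // libraryPathEntries above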

    StandaloneAppClient.scala

     def start() {
        // Just launch an rpcEndpoint; it will call back into the listener.
        endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
      }
    
     /**
         *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
         */
        private def tryRegisterAllMasters(): Array[JFuture[_]] = {
          for (masterAddress <- masterRpcAddresses) yield {
            registerMasterThreadPool.submit(new Runnable {
              override def run(): Unit = try {
                if (registered.get) {
                  return
                }
                logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
                val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                masterRef.send(RegisterApplication(appDescription, self))
              } catch {
                case ie: InterruptedException => // Cancelled
                case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
              }
            })
          }
        }
    
        /**
         * Register with all masters asynchronously. It will call `registerWithMaster` every
         * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
         * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
         *
         * nthRetry means this is the nth attempt to register with master.
         */
        private def registerWithMaster(nthRetry: Int) {
          registerMasterFutures.set(tryRegisterAllMasters())
          registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
            override def run(): Unit = {
              if (registered.get) {
                registerMasterFutures.get.foreach(_.cancel(true))
                registerMasterThreadPool.shutdownNow()
              } else if (nthRetry >= REGISTRATION_RETRIES) {
                markDead("All masters are unresponsive! Giving up.")
              } else {
                registerMasterFutures.get.foreach(_.cancel(true))
                registerWithMaster(nthRetry + 1)
              }
            }
          }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
        }
    
    
      5. Once the Spark Master receives the Application's registration, it allocates resources for it and then tells the relevant Workers to launch the corresponding Executors for that Application.
      6. After all Executors have started, they register back with the StandaloneSchedulerBackend (so the TaskScheduler on the Driver knows which Executors are running the Application on its behalf).
