Spark-Core Source Code Deep Dive (6): SparkContext and SparkEnv

Author: sun4lower | Published 2017-03-06 17:18

    In this article we take a detailed look at the source code of SparkContext.

    Let's start with SparkConf:

    SparkConf

    A SparkConf has to be passed in when a SparkContext is instantiated. SparkConf describes the configuration of the whole Spark application, and anything set on it overrides the corresponding system default. In unit tests we often instantiate it as new SparkConf(false) (the parameter defaults to true), which skips loading the default spark.* system properties (the settings normally picked up from conf/), so the configuration is identical no matter what environment the test runs in. Also note that SparkConf cannot be modified while the application is running: its entries are held in a ConcurrentHashMap, and the running configuration can only be read back through a clone. Finally, SparkConf calls can be chained, for example:

    new SparkConf().setMaster("local").setAppName("TestApp")
    

    This works because each of these setters returns the SparkConf itself after recording the value. Here is part of the SparkConf source:

    // Stores the key-value configuration entries
    private val settings = new ConcurrentHashMap[String, String]()
    // By default, load any system properties whose keys start with "spark."
    if (loadDefaults) {
      // Load any spark.* system properties
      for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
        set(key, value)
      }
    }
    /** Set a configuration variable. */
    def set(key: String, value: String): SparkConf = {
      if (key == null) {
        throw new NullPointerException("null key")
      }
      if (value == null) {
        throw new NullPointerException("null value for " + key)
      }
      logDeprecationWarning(key)
      settings.put(key, value)
      // Note that every set returns the SparkConf itself, which is what makes chained calls possible
      this
    }
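
    To tie this together, here is a small usage sketch (not from the Spark source) of the behaviour described above: a SparkConf(false) that ignores spark.* system properties, chained setters, and reading a value back with a default. The key names are standard Spark configuration keys; the values are made up for illustration.

    import org.apache.spark.SparkConf

    // loadDefaults = false: do not pick up spark.* system properties (stable config for tests)
    val conf = new SparkConf(false)
      .setMaster("local[2]")                  // chaining works because each setter returns `this`
      .setAppName("TestApp")
      .set("spark.executor.memory", "512m")

    // Read a value back, supplying a default in case the key is missing
    val executorMemory = conf.get("spark.executor.memory", "1g")
    println(s"executor memory = $executorMemory")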
    

    SparkContext

    SparkContext is the entry point to all of Spark's functionality and represents the application's connection to the cluster; through it you create RDDs, accumulators and broadcast variables.
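
    As a concrete (if trivial) illustration of that role, the sketch below is a minimal driver program, not part of the Spark source, that goes through SparkContext to create an RDD, a broadcast variable and an accumulator; the app name and numbers are arbitrary.

    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))

    val rdd = sc.parallelize(1 to 100)           // an RDD
    val factor = sc.broadcast(3)                 // a broadcast variable
    val counter = sc.accumulator(0, "processed") // an accumulator (Spark 1.x API)

    val sum = rdd.map { x => counter += 1; x * factor.value }.reduce(_ + _)
    println(s"sum = $sum, processed = ${counter.value}")

    sc.stop()                                    // stopping the SparkContext ends the application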

    A Spark application is submitted to the Spark cluster through its SparkContext, and the whole run is scheduled and coordinated around it: once the SparkContext crashes or is stopped, the application is over. That is how central SparkContext is. Let's go through its source in detail (only the important parts are shown):

    // Handy for developers to see the call information
    // The call site where this SparkContext was constructed.
    private val creationSite: CallSite = Utils.getCallSite()
    // Whether multiple SparkContexts may be active at once; defaults to false
    // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
    private val allowMultipleContexts: Boolean =
      config.getBoolean("spark.driver.allowMultipleContexts", false)
    // In order to prevent multiple SparkContexts from being active at the same time, mark this
    // context as having started construction.
    // NOTE: this must be placed at the beginning of the SparkContext constructor.
    SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
    val startTime = System.currentTimeMillis()
    // Tracks whether this context has been stopped
    private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false)
    
    ...
    
    // Message/event dispatch; covered separately in a later article
    // An asynchronous listener bus for Spark events
    private[spark] val listenerBus = new LiveListenerBus
    
    ...
    
    // Keeps track of all persisted RDDs
    private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
    
    ...
    
    // Environment variables to pass to our executors.
    private[spark] val executorEnvs = HashMap[String, String]()
    // Set SPARK_USER for user who is running SparkContext.
    val sparkUser = Utils.getCurrentUserName()
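
    The executorEnvs map above is filled inside the try block further down (SPARK_EXECUTOR_MEMORY, SPARK_USER, and anything the user put under spark.executorEnv.*). Below is a hedged sketch of the user-facing side, using only the standard SparkConf API; the variable name is hypothetical.

    import org.apache.spark.SparkConf

    // SparkConf.setExecutorEnv stores the value under "spark.executorEnv.<name>";
    // SparkContext later copies these entries into executorEnvs via _conf.getExecutorEnv.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("env-demo")
      .setExecutorEnv("MY_SERVICE_URL", "http://example.com")  // hypothetical variable

    // What SparkContext will see when building executorEnvs:
    conf.getAll.filter(_._1.startsWith("spark.executorEnv."))
      .foreach { case (k, v) => println(s"$k = $v") }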
    

    There are also several overloaded constructors, which we won't go through one by one. Next comes the most important part of SparkContext, the body of the try block, where most of the initialization happens; it is long, so feel free to jump to the pieces you care about:

    try {
      // Clone the SparkConf (the application's configuration) and check it for invalid settings
      _conf = config.clone()
      _conf.validateSettings()
      // A master URL must be configured, otherwise throw an exception
      if (!_conf.contains("spark.master")) {
        throw new SparkException("A master URL must be set in your configuration")
      }
      // An application name must be configured, otherwise throw an exception
      if (!_conf.contains("spark.app.name")) {
        throw new SparkException("An application name must be set in your configuration")
      }
      // Check for YARN cluster mode
      // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
      // yarn-standalone is deprecated, but still supported
      if ((master == "yarn-cluster" || master == "yarn-standalone") &&
          !_conf.contains("spark.yarn.app.id")) {
        throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
          "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
      }
      if (_conf.getBoolean("spark.logConf", false)) {
        logInfo("Spark configuration:\n" + _conf.toDebugString)
      }
      
      // Set Spark driver host and port system properties
      _conf.setIfMissing("spark.driver.host", Utils.localHostName())
      _conf.setIfMissing("spark.driver.port", "0")
      _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
      _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
      _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
        .toSeq.flatten
      _eventLogDir =
        if (isEventLogEnabled) {
          // This should be set explicitly; otherwise the default path is "/tmp/spark-events", which the system may clean up automatically
          val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
            .stripSuffix("/")
          Some(Utils.resolveURI(unresolvedDir))
        } else {
          None
        }
      _eventLogCodec = {
        val compress = _conf.getBoolean("spark.eventLog.compress", false)
        if (compress && isEventLogEnabled) {
          Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
        } else {
          None
        }
      }
      _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName)
      if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
      // "_jobProgressListener" should be set up before creating SparkEnv because when creating
      // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
      _jobProgressListener = new JobProgressListener(_conf)
      // The ListenerBus is analyzed in a separate article
      listenerBus.addListener(jobProgressListener)
      // Create the Spark execution environment, i.e. the SparkEnv (cache, map output tracker, etc)
      _env = createSparkEnv(_conf, isLocal, listenerBus)
      SparkEnv.set(_env)
      // Metadata cleaner
      _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
      // Monitors the execution of jobs and stages
      _statusTracker = new SparkStatusTracker(this)
      // Console progress bar: periodically pulls the progress of active stages from statusTracker. A stage is shown only after it has run for at least 500ms; if several stages run at the same time their status is merged onto a single line, refreshed every 200ms
      _progressBar =
        if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
          Some(new ConsoleProgressBar(this))
        } else {
          None
        }
      // Spark web UI
      _ui =
        if (conf.getBoolean("spark.ui.enabled", true)) {
          Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
            _env.securityManager, appName, startTime = startTime))
        } else {
          // For tests, do not enable the UI
          None
        }
      // Bind the UI before starting the task scheduler to communicate
      // the bound port to the cluster manager properly
      _ui.foreach(_.bind())
      _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
      // Add each JAR given through the constructor
      if (jars != null) {
        jars.foreach(addJar)
      }
      if (files != null) {
        files.foreach(addFile)
      }
      
      // Executor memory setting; defaults to 1 GB (1024 MB) if nothing is configured
      _executorMemory = _conf.getOption("spark.executor.memory")
        .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
        .orElse(Option(System.getenv("SPARK_MEM"))
        .map(warnSparkMem))
        .map(Utils.memoryStringToMb)
        .getOrElse(1024)
      // Convert java options to env vars as a work around
      // since we can't set env vars directly in sbt.
      for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
        value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
        executorEnvs(envKey) = value
      }
      Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
        executorEnvs("SPARK_PREPEND_CLASSES") = v
      }
      // Mesos-related configuration
      // The Mesos scheduler backend relies on this environment variable to set executor memory.
      // TODO: Set this only in the Mesos scheduler.
      executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
      executorEnvs ++= _conf.getExecutorEnv
      executorEnvs("SPARK_USER") = sparkUser
      // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
      // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
      _heartbeatReceiver = env.rpcEnv.setupEndpoint(
        HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
      // The most important part of SparkContext: create and start the schedulers
      // Create and start the scheduler
      val (sched, ts) = SparkContext.createTaskScheduler(this, master)
      _schedulerBackend = sched
      _taskScheduler = ts
      _dagScheduler = new DAGScheduler(this)
      _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
      // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
      // constructor
      _taskScheduler.start()
      // Initialize the application ID
      _applicationId = _taskScheduler.applicationId()
      _applicationAttemptId = taskScheduler.applicationAttemptId()
      _conf.set("spark.app.id", _applicationId)
      _ui.foreach(_.setAppId(_applicationId))
      _env.blockManager.initialize(_applicationId)
      // Metrics system
      // The metrics system for Driver need to be set spark.app.id to app ID.
      // So it should start after we get app ID from the task scheduler and set spark.app.id.
      metricsSystem.start()
      // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
      metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
      _eventLogger =
        if (isEventLogEnabled) {
          val logger =
            new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
              _conf, _hadoopConfiguration)
          logger.start()
          listenerBus.addListener(logger)
          Some(logger)
        } else {
          None
        }
      // Optionally scale number of executors dynamically based on workload. Exposed for testing.
      val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
      if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
        logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.")
      }
      _executorAllocationManager =
        if (dynamicAllocationEnabled) {
          Some(new ExecutorAllocationManager(this, listenerBus, _conf))
        } else {
          None
        }
      _executorAllocationManager.foreach(_.start())
      _cleaner =
        if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
          Some(new ContextCleaner(this))
        } else {
          None
        }
      _cleaner.foreach(_.start())
      setupAndStartListenerBus()
      postEnvironmentUpdate()
      postApplicationStart()
      // Post init
      _taskScheduler.postStartHook()
      _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
      _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
      _executorAllocationManager.foreach { e =>
        _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
      }
      // Make sure the context is stopped if the user forgets about it. This avoids leaving
      // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
      // is killed, though.
      _shutdownHookRef = ShutdownHookManager.addShutdownHook(
        ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
        logInfo("Invoking stop() from shutdown hook")
        stop()
      }
    } catch {
      case NonFatal(e) =>
        logError("Error initializing SparkContext.", e)
        try {
          stop()
        } catch {
          case NonFatal(inner) =>
            logError("Error stopping SparkContext after init error.", inner)
        } finally {
          throw e
        }
    }
    

    The three core objects inside SparkContext are DAGScheduler, TaskScheduler and SchedulerBackend; we will analyze them in detail in the following articles.
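
    As a rough illustration of how the master URL chooses among these, createTaskScheduler pattern-matches on the master string. The sketch below only mimics the shape of that dispatch and is not the real implementation (which also handles local[N, maxFailures], Mesos, YARN, simr, and so on):

    // Simplified, illustrative version of the master-URL dispatch in SparkContext.createTaskScheduler
    def describeScheduler(master: String): String = {
      val LOCAL_N   = """local\[([0-9]+|\*)\]""".r
      val SPARK_URL = """spark://(.*)""".r
      master match {
        case "local"          => "LocalBackend (1 thread) + TaskSchedulerImpl"
        case LOCAL_N(threads) => s"LocalBackend ($threads threads) + TaskSchedulerImpl"
        case SPARK_URL(url)   => s"SparkDeploySchedulerBackend against $url + TaskSchedulerImpl"
        case other            => s"delegated to an external cluster manager: $other"
      }
    }

    println(describeScheduler("local[4]"))          // LocalBackend (4 threads) + TaskSchedulerImpl
    println(describeScheduler("spark://host:7077")) // SparkDeploySchedulerBackend against host:7077 + ...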

    SparkEnv

    Next let's look at SparkEnv. SparkEnv holds the runtime environment of a running Spark instance (master or worker), including the serializer, the Akka actor system (1.6.x defaults to Netty, but some legacy Akka code remains; starting with Spark 2.x Akka is no longer a dependency), the BlockManager, the MapOutputTracker (crucial during shuffle), and so on. As with SparkContext, each of these components will get its own article.
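
    Other components reach the current environment through the SparkEnv companion object. A tiny driver-side sketch, assuming a SparkContext has already been created in this JVM:

    // SparkEnv.get returns the environment of the running instance (here, the driver)
    val env = org.apache.spark.SparkEnv.get
    println(env.serializer.getClass.getName)    // org.apache.spark.serializer.JavaSerializer by default
    println(env.blockManager.blockManagerId)    // this driver's BlockManagerId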

    Let's skim through the SparkEnv source, starting with createDriverEnv and createExecutorEnv; as the names suggest, they build the runtime environment for the driver and for an executor respectively.

    /**
     * Create a SparkEnv for the driver.
     */
    private[spark] def createDriverEnv(
        conf: SparkConf,
        isLocal: Boolean,
        listenerBus: LiveListenerBus,
        numCores: Int,
        mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
      assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
      assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
      val hostname = conf.get("spark.driver.host")
      val port = conf.get("spark.driver.port").toInt
      create(
        conf,
        SparkContext.DRIVER_IDENTIFIER,
        hostname,
        port,
        isDriver = true,
        isLocal = isLocal,
        numUsableCores = numCores,
        listenerBus = listenerBus,
        mockOutputCommitCoordinator = mockOutputCommitCoordinator
      )
    }
    /**
     * Create a SparkEnv for an executor.
     * In coarse-grained mode, the executor provides an actor system that is already instantiated.
     */
    private[spark] def createExecutorEnv(
        conf: SparkConf,
        executorId: String,
        hostname: String,
        port: Int,
        numCores: Int,
        isLocal: Boolean): SparkEnv = {
      val env = create(
        conf,
        executorId,
        hostname,
        port,
        isDriver = false,
        isLocal = isLocal,
        numUsableCores = numCores
      )
      SparkEnv.set(env)
      env
    }
    

    Both of these ultimately call the create() method:

    private def create(
        conf: SparkConf,
        executorId: String,
        hostname: String,
        port: Int,
        isDriver: Boolean,
        isLocal: Boolean,
        numUsableCores: Int,
        listenerBus: LiveListenerBus = null,
        mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
      // The listener bus is only used on the driver, hence this check
      if (isDriver) {
        assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
      }
      // Instantiate the SecurityManager
      val securityManager = new SecurityManager(conf)
      // Legacy Akka code kept in Spark 1.6.x; completely removed in Spark 2.x
      // Create the ActorSystem for Akka and get the port it binds to.
      val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
      // Create the RpcEnv
      val rpcEnv = RpcEnv.create(actorSystemName, hostname, port, conf, securityManager,
        clientMode = !isDriver)
      val actorSystem: ActorSystem =
        if (rpcEnv.isInstanceOf[AkkaRpcEnv]) {
          rpcEnv.asInstanceOf[AkkaRpcEnv].actorSystem
        } else {
          val actorSystemPort =
            if (port == 0 || rpcEnv.address == null) {
              port
            } else {
              rpcEnv.address.port + 1
            }
          // Create a ActorSystem for legacy codes
          AkkaUtils.createActorSystem(
            actorSystemName + "ActorSystem",
            hostname,
            actorSystemPort,
            conf,
            securityManager
          )._1
        }
      // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
      // In the non-driver case, the RPC env's address may be null since it may not be listening
      // for incoming connections.
      // Record the port the driver or executor actually bound to
      if (isDriver) {
        conf.set("spark.driver.port", rpcEnv.address.port.toString)
      } else if (rpcEnv.address != null) {
        conf.set("spark.executor.port", rpcEnv.address.port.toString)
      }
      
      // Create an instance of the class with the given name, possibly initializing it with our conf
      def instantiateClass[T](className: String): T = {
        val cls = Utils.classForName(className)
        // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
        // SparkConf, then one taking no arguments
        try {
          cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
            .newInstance(conf, new java.lang.Boolean(isDriver))
            .asInstanceOf[T]
        } catch {
          case _: NoSuchMethodException =>
            try {
              cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
            } catch {
              case _: NoSuchMethodException =>
                cls.getConstructor().newInstance().asInstanceOf[T]
            }
        }
      }
      // Create an instance of the class named by the given SparkConf property, or defaultClassName
      // if the property is not set, possibly initializing it with our conf
      def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
        instantiateClass[T](conf.get(propertyName, defaultClassName))
      }
      
      // The serializer; note that the default is Java serialization (JavaSerializer)
      val serializer = instantiateClassFromConf[Serializer](
        "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
      logDebug(s"Using serializer: ${serializer.getClass}")
      val closureSerializer = instantiateClassFromConf[Serializer](
        "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")
      def registerOrLookupEndpoint(
          name: String, endpointCreator: => RpcEndpoint):
        RpcEndpointRef = {
        if (isDriver) {
          logInfo("Registering " + name)
          rpcEnv.setupEndpoint(name, endpointCreator)
        } else {
          RpcUtils.makeDriverRef(name, conf, rpcEnv)
        }
      }
      
      // The driver instantiates a MapOutputTrackerMaster, an executor a MapOutputTrackerWorker (details in the shuffle articles),
      // which shows that MapOutputTracker also has a master/slaves structure
      val mapOutputTracker = if (isDriver) {
        new MapOutputTrackerMaster(conf)
      } else {
        new MapOutputTrackerWorker(conf)
      }
      
      // Instantiate the MapOutputTrackerMasterEndpoint and hand its reference to the MapOutputTracker
      // Have to assign trackerActor after initialization as MapOutputTrackerActor
      // requires the MapOutputTracker itself
      mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
        new MapOutputTrackerMasterEndpoint(
          rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
      // Shuffle configuration; the default is SortShuffleManager
      // Let the user specify short names for shuffle managers
      val shortShuffleMgrNames = Map(
        "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
        "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
        "tungsten-sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
      val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
      val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
      // Instantiate the ShuffleManager (SortShuffleManager by default)
      val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
      // Whether to use the legacy memory manager, i.e. StaticMemoryManager
      val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
      // The default is the UnifiedMemoryManager
      val memoryManager: MemoryManager =
        if (useLegacyMemoryManager) {
          new StaticMemoryManager(conf, numUsableCores)
        } else {
          UnifiedMemoryManager(conf, numUsableCores)
        }
      // Instantiate the BlockTransferService; the default implementation is Netty-based (NettyBlockTransferService)
      val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
      // BlockManager initialization; covered in a separate article
      val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
        BlockManagerMaster.DRIVER_ENDPOINT_NAME,
        new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
        conf, isDriver)
      // NB: blockManager is not valid until initialize() is called later.
      val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
        serializer, conf, memoryManager, mapOutputTracker, shuffleManager,
        blockTransferService, securityManager, numUsableCores)
      // Instantiate the BroadcastManager
      val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
      // Instantiate the CacheManager
      val cacheManager = new CacheManager(blockManager)
      // Metrics system
      val metricsSystem = if (isDriver) {
        // Don't start metrics system right now for Driver.
        // We need to wait for the task scheduler to give us an app ID.
        // Then we can start the metrics system.
        MetricsSystem.createMetricsSystem("driver", conf, securityManager)
      } else {
        // We need to set the executor ID before the MetricsSystem is created because sources and
        // sinks specified in the metrics configuration file will want to incorporate this executor's
        // ID into the metrics they report.
        conf.set("spark.executor.id", executorId)
        val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
        ms.start()
        ms
      }
      
      // Set the sparkFiles directory, used when downloading dependencies.  In local mode,
      // this is a temporary directory; in distributed mode, this is the executor's current working
      // directory.
      val sparkFilesDir: String = if (isDriver) {
        Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
      } else {
        "."
      }
      
      // OutputCommitCoordinator initialization and endpoint registration
      val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
        new OutputCommitCoordinator(conf, isDriver)
      }
      val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
        new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
      outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
      val envInstance = new SparkEnv(
        executorId,
        rpcEnv,
        actorSystem,
        serializer,
        closureSerializer,
        cacheManager,
        mapOutputTracker,
        shuffleManager,
        broadcastManager,
        blockTransferService,
        blockManager,
        securityManager,
        sparkFilesDir,
        metricsSystem,
        memoryManager,
        outputCommitCoordinator,
        conf)
      // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
      // called, and we only need to do it for driver. Because driver may run as a service, and if we
      // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
      if (isDriver) {
        envInstance.driverTmpDirToDelete = Some(sparkFilesDir)
      }
      envInstance
    }
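
    Finally, note how many of the components built in create() are pluggable through configuration keys that appear in the code above: the serializer, the closure serializer, the shuffle manager (via its short name) and the memory-manager mode. Here is a hedged sketch of the corresponding user-side settings; KryoSerializer is a real alternative serializer, and the rest of the values are just examples.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("env-config-demo")
      // picked up by instantiateClassFromConf("spark.serializer", ...)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // resolved through shortShuffleMgrNames; "sort" is already the default
      .set("spark.shuffle.manager", "sort")
      // use the pre-1.6 StaticMemoryManager instead of UnifiedMemoryManager
      .set("spark.memory.useLegacyMode", "true")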
    

    This article has only skimmed the source of SparkContext and SparkEnv; the individual modules will be analyzed in their own articles.

    This is an original article. Reposting is welcome; please credit the source and the author. Thanks!
