
Spark Source Code: Initializing SparkContext

Author: Jorvi | Published 2019-12-17 15:38

    Source Code Index


    After the Application is submitted to Spark and the Driver is created and started, the user's application code begins to run inside the Driver.

    1 Program Entry Point

        import org.apache.spark.{SparkConf, SparkContext}

        val conf: SparkConf = new SparkConf().setAppName("SparkJob_Demo").setMaster("local[*]")
        val sparkContext: SparkContext = new SparkContext(conf)

        sparkContext.parallelize(List("aaa", "bbb", "ccc", "ddd"), 2)
          .repartition(4)
          .collect()
    
    

    2 Into the Source Code

    • Enter org.apache.spark.SparkContext.scala
    class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {
    
      // ... (parts omitted) ...
    
      private var _conf: SparkConf = _
      private var _eventLogDir: Option[URI] = None
      private var _eventLogCodec: Option[String] = None
      private var _env: SparkEnv = _
      private var _jobProgressListener: JobProgressListener = _
      private var _statusTracker: SparkStatusTracker = _
      private var _progressBar: Option[ConsoleProgressBar] = None
      private var _ui: Option[SparkUI] = None
      private var _hadoopConfiguration: Configuration = _
      private var _executorMemory: Int = _
      private var _schedulerBackend: SchedulerBackend = _
      private var _taskScheduler: TaskScheduler = _
      private var _heartbeatReceiver: RpcEndpointRef = _
      @volatile private var _dagScheduler: DAGScheduler = _
      private var _applicationId: String = _
      private var _applicationAttemptId: Option[String] = None
      private var _eventLogger: Option[EventLoggingListener] = None
      private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
      private var _cleaner: Option[ContextCleaner] = None
      private var _listenerBusStarted: Boolean = false
      private var _jars: Seq[String] = _
      private var _files: Seq[String] = _
      private var _shutdownHookRef: AnyRef = _
    
    
      try {
        _conf = config.clone()
        _conf.validateSettings()
    
        // Create the Spark execution environment (cache, map output tracker, etc)
        _env = createSparkEnv(_conf, isLocal, listenerBus)
        SparkEnv.set(_env)
    
        _statusTracker = new SparkStatusTracker(this)
    
        _progressBar =
          if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
            Some(new ConsoleProgressBar(this))
          } else {
            None
          }
    
        _ui =
          if (conf.getBoolean("spark.ui.enabled", true)) {
            Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
              _env.securityManager, appName, startTime = startTime))
          } else {
            // For tests, do not enable the UI
            None
          }
        // Bind the UI before starting the task scheduler to communicate
        // the bound port to the cluster manager properly
        _ui.foreach(_.bind())
    
        _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
    
        _executorMemory = _conf.getOption("spark.executor.memory")
          .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
          .orElse(Option(System.getenv("SPARK_MEM"))
          .map(warnSparkMem))
          .map(Utils.memoryStringToMb)
          .getOrElse(1024)
    
    
        // The Mesos scheduler backend relies on this environment variable to set executor memory.
        // TODO: Set this only in the Mesos scheduler.
        executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
        executorEnvs ++= _conf.getExecutorEnv
        executorEnvs("SPARK_USER") = sparkUser
    
        // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
        // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
        _heartbeatReceiver = env.rpcEnv.setupEndpoint(
          HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
    
        // Create and start the scheduler
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    
        // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
        // constructor
        _taskScheduler.start()
    
        _applicationId = _taskScheduler.applicationId()
        _applicationAttemptId = taskScheduler.applicationAttemptId()
        _conf.set("spark.app.id", _applicationId)
        _ui.foreach(_.setAppId(_applicationId))
        _env.blockManager.initialize(_applicationId)
    
        // Optionally scale number of executors dynamically based on workload. Exposed for testing.
        val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
        _executorAllocationManager =
          if (dynamicAllocationEnabled) {
            Some(new ExecutorAllocationManager(this, listenerBus, _conf))
          } else {
            None
          }
        _executorAllocationManager.foreach(_.start())
    
        setupAndStartListenerBus()
        postEnvironmentUpdate()
        postApplicationStart()
    
        // Make sure the context is stopped if the user forgets about it. This avoids leaving
        // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
        // is killed, though.
        _shutdownHookRef = ShutdownHookManager.addShutdownHook(
          ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
          logInfo("Invoking stop() from shutdown hook")
          stop()
        }
      } catch {
        case NonFatal(e) =>
          logError("Error initializing SparkContext.", e)
          try {
            stop()
          } catch {
            case NonFatal(inner) =>
              logError("Error stopping SparkContext after init error.", inner)
          } finally {
            throw e
          }
      }
    }
    

    The constructor initializes the various member fields and sets their default values (only part of the code is shown here).
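
    For illustration, below is a minimal, self-contained sketch of the Option-chaining pattern used above to resolve _executorMemory (SparkConf first, then environment variables, then a 1024 MB fallback). The names ExecutorMemoryDemo, resolveExecutorMemoryMb and toMb are illustrative only, and the size parser is deliberately crude; Spark's real parsing lives in Utils.memoryStringToMb. It assumes spark-core is on the classpath.

        import org.apache.spark.SparkConf

        object ExecutorMemoryDemo {
          // Crude size parser for this demo only; Spark uses Utils.memoryStringToMb.
          private def toMb(s: String): Int = {
            val v = s.trim.toLowerCase
            if (v.endsWith("g")) v.dropRight(1).toInt * 1024
            else if (v.endsWith("m")) v.dropRight(1).toInt
            else v.toInt
          }

          // Same lookup order as SparkContext: conf -> SPARK_EXECUTOR_MEMORY -> SPARK_MEM -> 1024 MB.
          def resolveExecutorMemoryMb(conf: SparkConf): Int = {
            conf.getOption("spark.executor.memory")
              .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
              .orElse(Option(System.getenv("SPARK_MEM")))
              .map(toMb)
              .getOrElse(1024)
          }

          def main(args: Array[String]): Unit = {
            val demoConf = new SparkConf(loadDefaults = false).set("spark.executor.memory", "2g")
            println(resolveExecutorMemoryMb(demoConf)) // 2048
          }
        }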

    It also creates and starts the TaskScheduler, creates the DAGScheduler, and so on; those steps are analyzed in later articles in this series.
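
    As a quick preview of that step: SparkContext.createTaskScheduler pattern-matches the master URL to decide which scheduler backend to build, so the local[*] master from the entry example ends up on a local backend with one thread per available core. Below is a very simplified, Spark-free sketch of that dispatch; MasterDispatchDemo, describeMaster and LocalN are illustrative names, not Spark code, and most real branches are omitted.

        object MasterDispatchDemo {
          private val LocalN = """local\[([0-9]+|\*)\]""".r

          // Simplified dispatch on the master URL; the real createTaskScheduler builds a
          // TaskSchedulerImpl plus a matching SchedulerBackend for each case.
          def describeMaster(master: String): String = master match {
            case "local"                       => "local backend, 1 thread"
            case LocalN(threads)               => s"local backend, $threads threads"
            case m if m.startsWith("spark://") => "standalone cluster backend"
            case other                         => s"delegated to an external cluster manager: $other"
          }

          def main(args: Array[String]): Unit = {
            println(describeMaster("local[*]")) // local backend, * threads
          }
        }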

    Here we look at how the Spark execution environment is created, i.e. createSparkEnv(_conf, isLocal, listenerBus).

    2.1 Creating SparkEnv

    • Enter org.apache.spark.SparkContext.scala
      private[spark] def createSparkEnv(
          conf: SparkConf,
          isLocal: Boolean,
          listenerBus: LiveListenerBus): SparkEnv = {
        SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
      }
    
    • Enter org.apache.spark.SparkEnv.scala
      /**
       * Create a SparkEnv for the driver.
       */
      private[spark] def createDriverEnv(
          conf: SparkConf,
          isLocal: Boolean,
          listenerBus: LiveListenerBus,
          numCores: Int,
          mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
        assert(conf.contains(DRIVER_HOST_ADDRESS),
          s"${DRIVER_HOST_ADDRESS.key} is not set on the driver!")
        assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
        val bindAddress = conf.get(DRIVER_BIND_ADDRESS)
        val advertiseAddress = conf.get(DRIVER_HOST_ADDRESS)
        val port = conf.get("spark.driver.port").toInt
        val ioEncryptionKey = if (conf.get(IO_ENCRYPTION_ENABLED)) {
          Some(CryptoStreamUtils.createKey(conf))
        } else {
          None
        }
        create(
          conf,
          SparkContext.DRIVER_IDENTIFIER,
          bindAddress,
          advertiseAddress,
          Option(port),
          isLocal,
          numCores,
          ioEncryptionKey,
          listenerBus = listenerBus,
          mockOutputCommitCoordinator = mockOutputCommitCoordinator
        )
      }
    

    This method calls create to build the SparkEnv on the Driver side.
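
    Note the two assertions at the top of createDriverEnv: they only pass because SparkContext configures the driver host and port (defaulting the port to 0, i.e. "pick any free port") before calling createSparkEnv, in a part of the try block omitted from the listing above. A rough, hedged reconstruction of that setup for Spark 2.x (not the exact lines):

        // Reconstruction for illustration only; the real lines live in SparkContext's try block.
        _conf.setIfMissing("spark.driver.host", Utils.localHostName())
        _conf.setIfMissing("spark.driver.port", "0") // 0 = let the RpcEnv bind to any free port

    The port actually bound is then written back into spark.driver.port inside create, as shown below.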

    • Enter org.apache.spark.SparkEnv.scala
      /**
       * Helper method to create a SparkEnv for a driver or an executor.
       */
      private def create(
          conf: SparkConf,
          executorId: String,
          bindAddress: String,
          advertiseAddress: String,
          port: Option[Int],
          isLocal: Boolean,
          numUsableCores: Int,
          ioEncryptionKey: Option[Array[Byte]],
          listenerBus: LiveListenerBus = null,
          mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
    
        val isDriver = executorId == SparkContext.DRIVER_IDENTIFIER
    
        // Listener bus is only used on the driver
        if (isDriver) {
          assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
        }
    
        val securityManager = new SecurityManager(conf, ioEncryptionKey)
        if (isDriver) {
          securityManager.initializeAuth()
        }
    
        ioEncryptionKey.foreach { _ =>
          if (!securityManager.isEncryptionEnabled()) {
            logWarning("I/O encryption enabled without RPC encryption: keys will be visible on the " +
              "wire.")
          }
        }
    
        val systemName = if (isDriver) driverSystemName else executorSystemName
        val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
          securityManager, numUsableCores, !isDriver)
    
        // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied.
        if (isDriver) {
          conf.set("spark.driver.port", rpcEnv.address.port.toString)
        }
    
        // Create an instance of the class with the given name, possibly initializing it with our conf
        def instantiateClass[T](className: String): T = {
          val cls = Utils.classForName(className)
          // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just
          // SparkConf, then one taking no arguments
          try {
            cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE)
              .newInstance(conf, new java.lang.Boolean(isDriver))
              .asInstanceOf[T]
          } catch {
            case _: NoSuchMethodException =>
              try {
                cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T]
              } catch {
                case _: NoSuchMethodException =>
                  cls.getConstructor().newInstance().asInstanceOf[T]
              }
          }
        }
    
        // Create an instance of the class named by the given SparkConf property, or defaultClassName
        // if the property is not set, possibly initializing it with our conf
        def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = {
          instantiateClass[T](conf.get(propertyName, defaultClassName))
        }
    
        val serializer = instantiateClassFromConf[Serializer](
          "spark.serializer", "org.apache.spark.serializer.JavaSerializer")
        logDebug(s"Using serializer: ${serializer.getClass}")
    
        val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
    
        val closureSerializer = new JavaSerializer(conf)
    
        def registerOrLookupEndpoint(
            name: String, endpointCreator: => RpcEndpoint):
          RpcEndpointRef = {
          if (isDriver) {
            logInfo("Registering " + name)
            rpcEnv.setupEndpoint(name, endpointCreator)
          } else {
            RpcUtils.makeDriverRef(name, conf, rpcEnv)
          }
        }
    
        val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
    
        val mapOutputTracker = if (isDriver) {
          new MapOutputTrackerMaster(conf, broadcastManager, isLocal)
        } else {
          new MapOutputTrackerWorker(conf)
        }
    
        // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint
        // requires the MapOutputTracker itself
        mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME,
          new MapOutputTrackerMasterEndpoint(
            rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
    
        // Let the user specify short names for shuffle managers
        val shortShuffleMgrNames = Map(
          "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
          "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
        val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
        val shuffleMgrClass =
          shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
        val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
    
        val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
        val memoryManager: MemoryManager =
          if (useLegacyMemoryManager) {
            new StaticMemoryManager(conf, numUsableCores)
          } else {
            UnifiedMemoryManager(conf, numUsableCores)
          }
    
        val blockManagerPort = if (isDriver) {
          conf.get(DRIVER_BLOCK_MANAGER_PORT)
        } else {
          conf.get(BLOCK_MANAGER_PORT)
        }
    
        val blockTransferService =
          new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress,
            blockManagerPort, numUsableCores)
    
        val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
          BlockManagerMaster.DRIVER_ENDPOINT_NAME,
          new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),
          conf, isDriver)
    
        // NB: blockManager is not valid until initialize() is called later.
        val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
          serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager,
          blockTransferService, securityManager, numUsableCores)
    
        val metricsSystem = if (isDriver) {
          // Don't start metrics system right now for Driver.
          // We need to wait for the task scheduler to give us an app ID.
          // Then we can start the metrics system.
          MetricsSystem.createMetricsSystem("driver", conf, securityManager)
        } else {
          // We need to set the executor ID before the MetricsSystem is created because sources and
          // sinks specified in the metrics configuration file will want to incorporate this executor's
          // ID into the metrics they report.
          conf.set("spark.executor.id", executorId)
          val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager)
          ms.start()
          ms
        }
    
        val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse {
          new OutputCommitCoordinator(conf, isDriver)
        }
        val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator",
          new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
        outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
    
        val envInstance = new SparkEnv(
          executorId,
          rpcEnv,
          serializer,
          closureSerializer,
          serializerManager,
          mapOutputTracker,
          shuffleManager,
          broadcastManager,
          blockManager,
          securityManager,
          metricsSystem,
          memoryManager,
          outputCommitCoordinator,
          conf)
    
        // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
        // called, and we only need to do it for driver. Because driver may run as a service, and if we
        // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs.
        if (isDriver) {
          val sparkFilesDir = Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
          envInstance.driverTmpDir = Some(sparkFilesDir)
        }
    
        envInstance
      }
    

    create is a generic helper used to build the SparkEnv for both the Driver side and the Executor side.

    It uses executorId to tell whether it is running on the Driver or on an Executor, then constructs the RpcEnv, Serializer, ShuffleManager, BroadcastManager and other components, and passes them in to build the SparkEnv.
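
    A recurring pattern in create is registerOrLookupEndpoint: on the Driver the endpoint is created and registered with the RpcEnv, while on an Executor only a reference to the Driver's endpoint is obtained; this is how the MapOutputTracker, BlockManagerMaster and OutputCommitCoordinator endpoints above are wired. Below is a minimal, Spark-free sketch of the same idea in a single JVM; Endpoint, Registry and registerOrLookup are illustrative names, not Spark's RPC classes.

        import scala.collection.concurrent.TrieMap

        // Stand-in for an RPC endpoint; Spark's real types are RpcEndpoint / RpcEndpointRef.
        trait Endpoint { def receive(msg: Any): Unit }

        object Registry {
          private val endpoints = TrieMap.empty[String, Endpoint]

          // Driver side: create and register the endpoint; executor side: only look it up.
          // The by-name `create` argument is never evaluated on the executor path.
          def registerOrLookup(name: String, isDriver: Boolean)(create: => Endpoint): Endpoint =
            if (isDriver) endpoints.getOrElseUpdate(name, create)
            else endpoints.getOrElse(name, sys.error(s"no endpoint registered under '$name'"))
        }

        object RegisterOrLookupDemo {
          def main(args: Array[String]): Unit = {
            // The "driver" registers the endpoint once.
            Registry.registerOrLookup("MapOutputTracker", isDriver = true) {
              new Endpoint { def receive(msg: Any): Unit = println(s"driver got: $msg") }
            }
            // An "executor" resolves the same name instead of creating a second endpoint
            // (in real Spark this resolution goes over the network via RpcUtils.makeDriverRef).
            val ref = Registry.registerOrLookup("MapOutputTracker", isDriver = false)(
              sys.error("executors never create this endpoint"))
            ref.receive("GetMapOutputStatuses(shuffleId = 0)")
          }
        }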
