Spark Study Notes (3): SparkContext Source Code

Author: 灯火gg | Published 2019-01-31 17:01

    Overview

    SparkContext is the main entry point for Spark functionality. A SparkContext represents a connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster.
    Only one SparkContext may be active per JVM. This limitation may eventually be removed; see SPARK-2243 for more details.

    Source Code Analysis

    A) SparkContext overview

     // The call site where this SparkContext was constructed.
      private val creationSite: CallSite = Utils.getCallSite()
    /**
       * When called inside a class in the spark package, returns the name of the user code class
       * (outside the spark package) that called into Spark, as well as which Spark method they called.
       * This is used, for example, to tell users where in their code each RDD got created.
       *
       * @param skipClass Function that is used to exclude non-user-code classes.
       */
      def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = {
        // Keep crawling up the stack trace until we find the first function not inside of the spark
        // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
        // transformation, a SparkContext function (such as parallelize), or anything else that leads
        // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
        var lastSparkMethod = "<unknown>"
        var firstUserFile = "<unknown>"
        var firstUserLine = 0
        var insideSpark = true
        val callStack = new ArrayBuffer[String]() :+ "<unknown>"
    
        Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement =>
          // When running under some profilers, the current stack trace might contain some bogus
          // frames. This is intended to ensure that we don't crash in these situations by
          // ignoring any frames that we can't examine.
          if (ste != null && ste.getMethodName != null
            && !ste.getMethodName.contains("getStackTrace")) {
            if (insideSpark) {
              if (skipClass(ste.getClassName)) {
                lastSparkMethod = if (ste.getMethodName == "<init>") {
                  // Spark method is a constructor; get its class name
                  ste.getClassName.substring(ste.getClassName.lastIndexOf('.') + 1)
                } else {
                  ste.getMethodName
                }
                callStack(0) = ste.toString // Put last Spark method on top of the stack trace.
              } else {
                if (ste.getFileName != null) {
                  firstUserFile = ste.getFileName
                  if (ste.getLineNumber >= 0) {
                    firstUserLine = ste.getLineNumber
                  }
                }
                callStack += ste.toString
                insideSpark = false
              }
            } else {
              callStack += ste.toString
            }
          }
        }

        // Build the short and long call-site descriptions and return them (remainder of the method).
        val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
        val shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine"
        val longForm = callStack.take(callStackDepth).mkString("\n")

        CallSite(shortForm, longForm)
      }

    What it does: it walks the call stack at the point where the SparkContext (or an RDD) is being constructed. The Spark or Scala-core method closest to the bottom of the stack, i.e. the Spark API the user actually called, is placed at the top of callStack and its name is stored in lastSparkMethod. The first user-code frame (the one closest to the top of the stack) is appended to callStack, its line number is stored in firstUserLine and its file name in firstUserFile. Finally it returns a CallSite case class holding the short form and a long form whose depth defaults to 20 frames.
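    The short form built here ("<method> at <file>:<line>") is what later shows up in the Spark UI and in RDD.toDebugString. As a usage note, the call site can also be set explicitly from user code; a minimal sketch (hypothetical label and RDD, given an existing SparkContext sc, not part of the source above):

        sc.setCallSite("loading input data")  // label the jobs/RDDs created next in the UI
        val data = sc.parallelize(1 to 100)   // would otherwise show e.g. "parallelize at MyApp.scala:12"
        sc.clearCallSite()                    // return to automatic call-site detection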

    // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
      private val allowMultipleContexts: Boolean =
        config.getBoolean("spark.driver.allowMultipleContexts", false)
    
      SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
    

    What it does: by default only one SparkContext instance may exist (controlled by the property spark.driver.allowMultipleContexts). Users who need multiple SparkContext instances can set this property to true. The method markPartiallyConstructed enforces that uniqueness and marks this instance as partially constructed.
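    A minimal sketch (hypothetical app name) of opting in to a second context on Spark 1.x/2.x; note that this property only downgrades the error to a warning, and it was removed in Spark 3.0:

        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("second-context")
          .set("spark.driver.allowMultipleContexts", "true")  // warn instead of throwing
        val sc2 = new SparkContext(conf)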

    Next, the SparkConf is cloned and its settings are validated.

     _conf = config.clone()
        _conf.validateSettings()
    
        if (!_conf.contains("spark.master")) {
          throw new SparkException("A master URL must be set in your configuration")
        }
        if (!_conf.contains("spark.app.name")) {
          throw new SparkException("An application name must be set in your configuration")
        }
    

    As the code shows, a master URL and an application name must be set when creating a SparkContext; otherwise an exception is thrown.
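    A minimal sketch (hypothetical names) of supplying the two required settings through SparkConf before this validation runs:

        import org.apache.spark.{SparkConf, SparkContext}

        val conf = new SparkConf()
          .setMaster("local[2]")  // fills in spark.master
          .setAppName("MyApp")    // fills in spark.app.name
        val sc = new SparkContext(conf)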

    B) Creating the execution environment SparkEnv

        def isLocal: Boolean = Utils.isLocalMaster(_conf)
    
        //An asynchronous listener bus for Spark events
        private[spark] def listenerBus: LiveListenerBus = _listenerBus
    
        //Create the Spark execution environment (cache, map output tracker, etc)
        _env = createSparkEnv(_conf, isLocal, listenerBus)
    

    In the code above, _conf is the SparkConf, isLocal indicates whether the master is a local (single-machine) one, and listenerBus uses the listener pattern to dispatch the various Spark events. Creating the SparkEnv involves the following components:

    1. Create the security manager SecurityManager
    2. Create the Akka-based distributed messaging system ActorSystem
    3. Create the map-output tracker MapOutputTracker
    4. Instantiate the ShuffleManager
    5. Create the ShuffleMemoryManager
    6. Create the block transfer service BlockTransferService
    7. Create the BlockManagerMaster
    8. Create the block manager BlockManager
    9. Create the broadcast manager BroadcastManager
    10. Create the cache manager CacheManager
    11. Create the HTTP file server HttpFileServer
    12. Create the metrics system MetricsSystem
    13. Create the SparkEnv

    C) The security manager SecurityManager
    It handles permission and account settings. When running on YARN, a secret key needs to be generated for authentication; finally, a default password Authenticator instance is registered for the current JVM.
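    SecurityManager reads its settings from the SparkConf. A minimal sketch (hypothetical secret value) of enabling Spark's shared-secret authentication, which SecurityManager enforces; on YARN the secret is generated automatically rather than configured this way:

        val conf = new SparkConf()
          .set("spark.authenticate", "true")               // require authentication between Spark components
          .set("spark.authenticate.secret", "my-secret")   // shared secret for non-YARN deployments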

    D) The Akka-based distributed messaging system
    Scala takes the view that the Java model of threads sharing data and guarding it with locks is error-prone: it easily leads to lock contention, hurts concurrency, and can even deadlock. In Scala you simply define a type that extends Actor and provide an act method, much as you would implement Runnable and its run method in Java. The act method is never called directly, however; data is passed by sending messages with the ! operator, as in:

    Actor ! message
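    A minimal sketch in the classic (now deprecated) scala.actors style that the paragraph above describes; the actor and message are hypothetical, and note that Akka actors, which Spark itself used at the time, define receive rather than act:

        import scala.actors.Actor  // requires the scala-actors module on Scala 2.11+

        class Greeter extends Actor {
          def act(): Unit = {
            receive {                                      // block until one message arrives
              case name: String => println(s"hello, $name")
            }
          }
        }

        val greeter = new Greeter
        greeter.start()      // runs act() on the actor's own thread
        greeter ! "Spark"    // data is passed by sending a message, never by calling act directly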

    E) MapOutputTracker
    Every map or reduce task has a unique identifier (mapId and reduceId respectively). The input of a reduce task may be the output of several map tasks, and the reduce task fetches the corresponding blocks from each of those map tasks; this process is called shuffle, and each shuffle has its own shuffleId.
    MapOutputTrackerMaster internally uses mapStatuses: TimeStampedHashMap[Int, Array[MapStatus]] to track the output of every map task, where the key is the shuffleId.
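    A hedged sketch (simplified stand-in types, not Spark's actual MapStatus or TimeStampedHashMap) of the mapping described above, i.e. shuffleId -> one status per map task, which reducers consult to locate their blocks:

        import scala.collection.mutable

        // Simplified stand-in for org.apache.spark.scheduler.MapStatus
        final case class MapStatusSketch(location: String, blockSizes: Array[Long])

        // key = shuffleId, value = one entry per map task of that shuffle
        val mapStatuses = mutable.HashMap.empty[Int, Array[MapStatusSketch]]
        mapStatuses(0) = Array(
          MapStatusSketch("executor-1", Array(1024L, 2048L)),  // map task 0 of shuffle 0
          MapStatusSketch("executor-2", Array(512L, 4096L)))   // map task 1 of shuffle 0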

    F) Instantiating the ShuffleManager
    The ShuffleManager manages shuffle operations on local as well as remote block data. By default an instance of SortShuffleManager is created via reflection; in older Spark releases the spark.shuffle.manager property could be set to hash to use HashShuffleManager, whereas in the version shown below both short names map to SortShuffleManager (see the configuration sketch after the code).
    SortShuffleManager writes map output to local disk via IndexShuffleBlockResolver (IndexShuffleBlockManager in older releases), which operates on the DiskBlockManager inside BlockManager, and it writes an index file keyed by shuffleId and mapId.

     // Let the user specify short names for shuffle managers
        val shortShuffleMgrNames = Map(
          "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
          "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
        val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
        val shuffleMgrClass =
          shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
        val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
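
    On the user side the same selection is made through configuration; a minimal sketch (with the built-in managers, both short names resolve to SortShuffleManager as shown above):

        val conf = new SparkConf()
          .set("spark.shuffle.manager", "sort")  // or "tungsten-sort", or a fully-qualified ShuffleManager class name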
    

    G) The block transfer service BlockTransferService
    By default NettyBlockTransferService is used, which provides Netty-based asynchronous network client and server functionality for fetching sets of blocks from remote nodes.

    Creating the TaskScheduler

     // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
        // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
        _heartbeatReceiver = env.rpcEnv.setupEndpoint(
          HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
    
        // Create and start the scheduler
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    
        // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
        // constructor
        _taskScheduler.start()
    

    createTaskScheduler takes sc, master, and deployMode and returns a (SchedulerBackend, TaskScheduler) pair.

    First, the master URL is matched against a set of regular expressions:

    master match {
          case "local" =>
            val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
            val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
            scheduler.initialize(backend)
            (backend, scheduler)
    
          case LOCAL_N_REGEX(threads) =>
            def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
            // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
            val threadCount = if (threads == "*") localCpuCount else threads.toInt
            if (threadCount <= 0) {
              throw new SparkException(s"Asked to run locally with $threadCount threads")
            }
            val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
            val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
            scheduler.initialize(backend)
            (backend, scheduler)
    
          case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
            def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
            // local[*, M] means the number of cores on the computer with M failures
            // local[N, M] means exactly N threads with M failures
            val threadCount = if (threads == "*") localCpuCount else threads.toInt
            val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
            val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
            scheduler.initialize(backend)
            (backend, scheduler)
    
          case SPARK_REGEX(sparkUrl) =>
            val scheduler = new TaskSchedulerImpl(sc)
            val masterUrls = sparkUrl.split(",").map("spark://" + _)
            val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            (backend, scheduler)
    
          case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
            // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
            val memoryPerSlaveInt = memoryPerSlave.toInt
            if (sc.executorMemory > memoryPerSlaveInt) {
              throw new SparkException(
                "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
                  memoryPerSlaveInt, sc.executorMemory))
            }
    
            val scheduler = new TaskSchedulerImpl(sc)
            val localCluster = new LocalSparkCluster(
              numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
            val masterUrls = localCluster.start()
            val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
              localCluster.stop()
            }
            (backend, scheduler)
    
          case masterUrl =>
            val cm = getClusterManager(masterUrl) match {
              case Some(clusterMgr) => clusterMgr
              case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
            }
            try {
              val scheduler = cm.createTaskScheduler(sc, masterUrl)
              val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
              cm.initialize(scheduler, backend)
              (backend, scheduler)
            } catch {
              case se: SparkException => throw se
              case NonFatal(e) =>
                throw new SparkException("External scheduler cannot be instantiated", e)
            }
        }
      }
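
    For reference, a minimal sketch (hypothetical host and port) of master URLs that exercise each branch of the match above:

        new SparkConf().setMaster("local")                    // single thread, LocalSchedulerBackend
        new SparkConf().setMaster("local[4]")                 // LOCAL_N_REGEX: exactly 4 threads
        new SparkConf().setMaster("local[*]")                 // LOCAL_N_REGEX: one thread per core
        new SparkConf().setMaster("local[4,2]")               // LOCAL_N_FAILURES_REGEX: 4 threads, maxFailures = 2
        new SparkConf().setMaster("spark://host:7077")        // SPARK_REGEX: standalone cluster
        new SparkConf().setMaster("local-cluster[2,1,1024]")  // LOCAL_CLUSTER_REGEX: 2 workers, 1 core, 1024 MB each
        new SparkConf().setMaster("yarn")                     // default case: handled by an external cluster manager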
    
