美文网首页
spark源码阅读之executor模块①

spark源码阅读之executor模块①

作者: invincine | 来源:发表于2019-01-21 15:01 被阅读0次

    本文基于Spark 1.6.3源码,采用一步一步深入的方式来展开阅读,本文是为了纪录自己在阅读源码时候的思路,看完一遍真的很容易忘记,写一篇文章梳理一遍可以加深印象。

    SparkContext:Spark应用的入口

    SparkContext是用户应用于Spark集群交互的主要接口,所以把SparkContext作为入口来展开executor的源码阅读,主要针对standaone模式下的executor模块。

    SparkContext通过调用createTaskScheduler()方法来创建两个重要的类:TaskScheduler和SchedulerBackend

        // Create and start the scheduler
        val (sched, ts) = SparkContext.createTaskScheduler(this, master)
        _schedulerBackend = sched
        _taskScheduler = ts
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    
        // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
        // constructor
        // 在DAGScheduler的构造中持有TaskScheduler的引用之后,开始TaskScheduler
        _taskScheduler.start()
    

    两个重要的类

    TaskScheduler类:低级的task调度接口,仅有一个实现类为:TaskSchedulerImpl,这个类的作用是为高级task调度接口DAGScheduler划分好的stage分配TaskSet,然后提交给Spark集群,处理Task的运行消息,并将event返回给DAGScheduler,这里可以看出DAGScheduler实例化后持有了TaskSchedulerImpl的引用,有关DAGScheduler与TaskSchedulerImpl配合的调度机制,在后面的文章中展开。

    SchedulerBackend类:调度的后台接口,实现类有很多,根据传入的master url采用模式匹配的方式来确定需要什么实现类,主要的作用是当有新的task或者资源变动时找到合适的executor来分配资源,或者是处理从TaskSchedulerImpl发出杀掉Task请求。

    在standalone模式中,SchedulerBackend的具体实现类为:SparkDeploySchedulerBackend,通过以下createTaskScheduler()方法中的截选代码可以了解这个过程:

    case SPARK_REGEX(sparkUrl) =>
            val scheduler = new TaskSchedulerImpl(sc)
            val masterUrls = sparkUrl.split(",").map("spark://" + _)
            val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            (backend, scheduler)
    

    scheduler.initialize(backend)表明了TaskSchedulerImpl持有backend的引用,且在这个方法里初始化了用于FIFO和FAIR两种调度模式的容器和池,这部分放到调度模块展开。

    至此为止,两个重要的类的实例已经构造完毕:TaskSchedulerImpl和SparkDeploySchedulerBackend

    driverEndpoint和appClient的初始化

    紧接着,调用了TaskSchedulerImpl的start()方法,在start()方法中首先调用了backend的start()方法

    override def start() {
        backend.start()   //调用SchedulerBackend的start()方法
    
        // 如果开启了推测执行功能的话,就开启一条speculation线程来计算,参数是通过配置文件的参数来传入,或者使用默认值
        if (!isLocal && conf.getBoolean("spark.speculation", false)) {
          logInfo("Starting speculative execution thread")
          speculationScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
              checkSpeculatableTasks()
            }
          }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
        }
      }
    

    SparkDeploySchedulerBackend的start()方法首先调用了super的start()方法,这里需要说明的是SparkDeploySchedulerBackend并不是直接继承自SchedulerBackend,而是继承自CoarseGrainedSchedulerBackend,CoarseGrainedSchedulerBackend继承自SchedulerBackend
    这样的话,最后其实调用的是CoarseGrainedSchedulerBackend的start()方法,代码如下:

    override def start() {
        val properties = new ArrayBuffer[(String, String)]
        for ((key, value) <- scheduler.sc.conf.getAll) {
          if (key.startsWith("spark.")) {
            properties += ((key, value))
          }
        }
    
        // TODO (prashant) send conf instead of properties
        driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
      }
    

    start方法中注册了DriverEndpoint,调用createDriverEndpoint方法创建了一个DriverEndpoint的实例,至此DriverEndpoint创建完成,DriverEndpoint在实例化的过程中,会去调用生命周期中onstart方法,在onStart方法中会周期性的执行以下代码:Option(self).foreach(_.send(ReviveOffers))
    即自己给自己发送ReviveOffers的消息,收到ReviveOffers消息后会调用makeOffers方法选出合适executor然后分配资源。

    SparkDeploySchedulerBackend在start方法中,还创建了AppClient实例:

        val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
          command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
        client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
        client.start()
    

    AppClient实例封装了关于application的一些信息ApplicationDescription,如appName,maxCores,executorMemory等
    client.start()方法中注册了AppClient中的通信端ClientEndpoint

    def start() {
        // Just launch an rpcEndpoint; it will call back into the listener.
        // 注册appClient的rpcEndpoint
        endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
      }
    

    在注册ClientEndpoint的过程中,调用其生命周期中的onstart方法

    override def onStart(): Unit = {
          try {
            registerWithMaster(1)
          } catch {
            case e: Exception =>
              logWarning("Failed to connect to master", e)
              markDisconnected()
              stop()
          }
        }
    

    至此,DriverEndpoint和AppClient都已经实例化完成
    DriverEndpoint已经准备好了,一旦有新的application提交或是集群的资源发生了变化,即调用makeoffers方法去分配资源;
    AppClient在注册ClientEndpoint的过程中,将要调用registerWithMaster将application注册请求提交给Master。

    registerWithMaster之后的剖析将会放在下一篇文章里继续深入。

    相关文章

      网友评论

          本文标题:spark源码阅读之executor模块①

          本文链接:https://www.haomeiwen.com/subject/apbidqtx.html