
Master Internals and Source Code Analysis

Author: 有一束阳光叫温暖 | Published 2019-02-11 21:43

    I. Master Failover (Active/Standby Switchover) Mechanism

    Spark Master failover can be based on one of two mechanisms: the file system or ZooKeeper. With the file-system-based mechanism, after the Active Master dies we must manually switch over to the Standby Master; with the ZooKeeper-based mechanism the switchover happens automatically. The "master failover" discussed here refers to the operations the Master performs when, after the Active Master dies, control switches to the Standby Master:

    1. Use the persistence engine (FileSystemPersistenceEngine or ZooKeeperPersistenceEngine) to read the persisted storedApps, storedDrivers, and storedWorkers.
    2. Check whether any of storedApps, storedDrivers, or storedWorkers is non-empty; if so, recovery proceeds.
    3. Re-register the persisted Application, Driver, and Worker information into the Master's internal in-memory cache structures.
    4. Set the state of every recovered Application and Worker to UNKNOWN, then send the Standby Master's address to each Application's Driver and to each Worker.
    5. As the Drivers and Workers respond, the Master uses the completeRecovery() method to deal with those that never sent a response, filtering their information out.
    6. Call the Master's own schedule() method to schedule the Drivers and Applications that are waiting for resources, e.g., launching a Driver on some Worker, or launching executors on Workers for an Application. (A condensed sketch of this flow follows.)
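
    The flow above maps onto two methods in Master.scala. The sketch below is modeled on Spark 1.x and heavily condensed: the method and message names (beginRecovery, completeRecovery, MasterChanged) follow the source, but the bodies are simplified and error handling is elided.

      def beginRecovery(storedApps: Seq[ApplicationInfo],
                        storedDrivers: Seq[DriverInfo],
                        storedWorkers: Seq[WorkerInfo]) {
        for (app <- storedApps) {
          registerApplication(app)              // back into the in-memory caches
          app.state = ApplicationState.UNKNOWN
          app.driver ! MasterChanged(masterUrl, masterWebUiUrl)  // tell the driver about the new master
        }
        for (driver <- storedDrivers) {
          drivers += driver                     // drivers are rescheduled if their worker died
        }
        for (worker <- storedWorkers) {
          registerWorker(worker)
          worker.state = WorkerState.UNKNOWN
          worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
        }
      }

      def completeRecovery() {
        // Anything still UNKNOWN never acknowledged MasterChanged: drop it
        workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
        apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
        state = RecoveryState.ALIVE
        schedule()                              // finally, reschedule waiting drivers and apps
      }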

    II. Registration Mechanism: Principles and Source Code Analysis

    1. Worker registration

    (1) After a Worker starts, it proactively registers itself with the Master.
    (2) The Master filters out Workers whose state is DEAD; for a Worker in state UNKNOWN, it clears the stale Worker information and replaces it with the new registration.
    (3) The Worker is added to the in-memory cache (a HashMap).
    (4) The Worker information is persisted via the persistence engine (file system or ZooKeeper).
    (5) The schedule() method is called. (See the handler sketch below.)
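
    A condensed sketch of the corresponding handler in Master.scala (modeled on Spark 1.x; the real RegisterWorker message carries more fields, which are elided here):

      // Sketch of the Master's RegisterWorker handler (simplified)
      case RegisterWorker(id, workerHost, workerPort, cores, memory) =>
        if (state == RecoveryState.STANDBY) {
          // a standby master ignores worker registrations
        } else if (idToWorker.contains(id)) {
          sender ! RegisterWorkerFailed("Duplicate worker ID")
        } else {
          val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, sender)
          // registerWorker() drops DEAD workers, replaces UNKNOWN ones,
          // and puts the new worker into the in-memory HashMaps
          if (registerWorker(worker)) {
            persistenceEngine.addWorker(worker)  // persist (file system or ZooKeeper)
            sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
            schedule()
          }
        }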

    2. Driver registration

    (1) When a Spark Application is submitted with spark-submit, the Driver is registered first.
    (2) The Driver information is put into the in-memory cache.
    (3) The Driver is added to the queue of drivers waiting to be scheduled (an ArrayBuffer).
    (4) The Driver information is persisted via the persistence engine.
    (5) The schedule() method is called. (See the handler sketch below.)
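
    A condensed sketch of the handler that serves a cluster-mode submission (modeled on Spark 1.x's RequestSubmitDriver handling; simplified):

      // Sketch of the Master's RequestSubmitDriver handler (simplified)
      case RequestSubmitDriver(description) =>
        if (state != RecoveryState.ALIVE) {
          sender ! SubmitDriverResponse(false, None,
            "Can only accept driver submissions in ALIVE state")
        } else {
          val driver = createDriver(description)
          persistenceEngine.addDriver(driver)  // persist the driver info
          waitingDrivers += driver             // ArrayBuffer of drivers awaiting scheduling
          drivers.add(driver)                  // in-memory cache
          schedule()
          sender ! SubmitDriverResponse(true, Some(driver.id),
            s"Driver successfully submitted as ${driver.id}")
        }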

    3. Application registration

    (1) Once the Driver is up, it runs the Application code we wrote and initializes the SparkContext; underneath, SparkDeploySchedulerBackend, via the ClientActor thread inside AppClient, sends a RegisterApplication message to the Master to register the Application.
    (2) The Application information is put into the in-memory cache.
    (3) The Application is added to the queue of applications waiting to be scheduled (an ArrayBuffer).
    (4) The Application information is persisted via the persistence engine.
    (5) The schedule() method is called. (See the handler sketch below.)
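
    A condensed sketch of the corresponding handler (modeled on Spark 1.x's Master.scala; simplified):

      // Sketch of the Master's RegisterApplication handler (simplified)
      case RegisterApplication(description) =>
        if (state == RecoveryState.STANDBY) {
          // a standby master ignores the registration; the AppClient retries
        } else {
          val app = createApplication(description, sender)
          registerApplication(app)               // apps/idToApp caches + waitingApps queue
          persistenceEngine.addApplication(app)  // persist the application info
          sender ! RegisteredApplication(app.id, masterUrl)
          schedule()
        }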


    III. State-Change Mechanism: Source Code Analysis

    1. Driver state changes
    2. Executor state changes (both handlers are sketched below)
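
    A condensed sketch of the two state-change handlers (modeled on Spark 1.x's Master.scala; simplified):

      // Sketch of the DriverStateChanged handler (simplified)
      case DriverStateChanged(driverId, state, exception) =>
        state match {
          // any terminal state: remove the driver from the master's caches
          case DriverState.ERROR | DriverState.FINISHED |
               DriverState.KILLED | DriverState.FAILED =>
            removeDriver(driverId, state, exception)
          case _ =>
            throw new Exception(s"Received unexpected state update for driver $driverId: $state")
        }

      // Sketch of the ExecutorStateChanged handler (simplified)
      case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
        idToApp.get(appId).flatMap(_.executors.get(execId)) match {
          case Some(exec) =>
            exec.state = state
            // forward the update to the application's driver
            exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
            if (ExecutorState.isFinished(state)) {
              // free the executor's resources on its worker; schedule() may
              // then launch a replacement if the app still needs cores
              exec.application.removeExecutor(exec)
              exec.worker.removeExecutor(exec)
              schedule()
            }
          case None => // stale message for an unknown executor; ignore
        }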

    IV. Resource Scheduling Mechanism: Source Code Analysis

    A fragment of schedule():

      private def schedule() {
    
        // If the master's state is not ALIVE, return immediately;
        // in other words, a standby master never schedules applications or other resources
        if (state != RecoveryState.ALIVE) { return }
    
        // First schedule drivers, they take strict precedence over applications
        // Randomization helps balance drivers
        // Random.shuffle randomly reorders the elements of the collection passed to it
        // Take all previously registered workers, keep only those whose state is ALIVE,
        // and shuffle them with Random.shuffle
        val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
        val numWorkersAlive = shuffledAliveWorkers.size
        var curPos = 0
        // First, schedule drivers
        // The master only schedules drivers that were submitted in cluster deploy mode;
        // in client mode the driver runs locally, is never registered with the master,
        // and therefore cannot be scheduled by it
        for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
          // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
          // start from the last worker that was assigned a driver, and continue onwards until we have
          // explored all alive workers.
          var launched = false
          var numWorkersVisited = 0
          // Keep iterating as long as there are alive workers left to visit
          // and the current driver has not yet been launched (launched is false)
          while (numWorkersVisited < numWorkersAlive && !launched) {
            val worker = shuffledAliveWorkers(curPos)
            numWorkersVisited += 1
            // If this worker's free memory is at least the memory the driver needs,
            // and its free CPU cores are at least the cores the driver needs
            if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
              // launch the driver
              launchDriver(worker, driver)
              // and remove the driver from the waitingDrivers queue
              waitingDrivers -= driver
              launched = true
            }
            // advance the pointer to the next worker
            curPos = (curPos + 1) % numWorkersAlive
          }
        }
    

    The launchDriver() method:

    /**
        * Launch a driver on a worker
        * @param worker
        * @param driver
        */
      def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
        logInfo("Launching driver " + driver.id + " on worker " + worker.id)
        // Add the driver to the worker's in-memory cache structures,
        // and add the driver's required memory and CPU cores to the worker's used totals
        worker.addDriver(driver)
        // Also record the worker in the driver's in-memory cache
        driver.worker = Some(worker)
        // Send a LaunchDriver message to the worker's actor, telling the worker to start the driver
        worker.actor ! LaunchDriver(driver.id, driver.desc)
        // Set the driver's state to RUNNING
        driver.state = DriverState.RUNNING
      }
    
    

    There are two scheduling algorithms for Applications: spreadOutApps and non-spreadOutApps.
    (1) spreadOutApps

    Spread the application's cores evenly across as many workers as possible.
    (2) non-spreadOutApps
    Use as few workers as possible; allocate on each worker as many cores as it can offer.
    Source code walkthrough:

        // Spread the executors of each application evenly across the workers
        if (spreadOutApps) {
          // Try to spread out each app among all the nodes, until it has all its cores
          for (app <- waitingApps if app.coresLeft > 0) {
            val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
              .filter(canUse(app, _)).sortBy(_.coresFree).reverse
            val numUsable = usableWorkers.length
            // An array holding the number of cores to assign to each usable worker
            val assigned = new Array[Int](numUsable) // Number of cores to give on each node
            // How many cores to assign in total: the minimum of the cores the app still
            // needs and the total free cores across all usable workers
            var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
            var pos = 0
            // keep looping while there are still cores left to assign
            while (toAssign > 0) {
              // this worker still has unassigned free cores
              if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
                // one fewer core left to assign overall
                toAssign -= 1
                assigned(pos) += 1
              }
              // advance the pointer to the next worker
              pos = (pos + 1) % numUsable
            }
            // Now that we've decided how many cores to give on each node, let's actually give them
            // After distributing the app's requested cores, walk over the workers again
            for (pos <- 0 until numUsable) {
              // only workers that were assigned at least one core
              if (assigned(pos) > 0) {
                // Add an executor to the application's internal cache, creating an
                // ExecutorDesc object that records how many cores this executor gets.
                // In spark-submit you can request a number of executors, and the cores and
                // memory per executor; but under this mechanism the final number of executors,
                // and the cores per executor, may differ from what was requested.
                // For example, if you ask for 3 executors with 3 cores each (9 cores total)
                // but there are 9 usable workers with 1 free core each, this algorithm
                // launches 1 executor on each of the 9 workers: 9 executors, 1 core apiece.
    
                val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
                // launch the executor
                launchExecutor(usableWorkers(pos), exec)
                app.state = ApplicationState.RUNNING
              }
            }
          }
        } else {
          // Pack each app into as few nodes as possible until we've assigned all its cores
          // The non-spreadOutApps scheduling algorithm
          // This is the exact opposite of spreadOutApps: each application uses as few
          // workers as possible. For example, with 10 workers of 10 cores each and an
          // application that needs 20 cores, only 2 workers end up being used.
    
          // iterate over the workers that are ALIVE and still have free cores
          for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
            // iterate over the applications that still need cores
            for (app <- waitingApps if app.coresLeft > 0) {
              // if this worker can be used by the application
              if (canUse(app, worker)) {
                // take the minimum of the worker's free cores and the cores the app still needs
                val coresToUse = math.min(worker.coresFree, app.coresLeft)
                if (coresToUse > 0) {
                  // add an executor for the app
                  val exec = app.addExecutor(worker, coresToUse)
                  // launch the executor
                  launchExecutor(worker, exec)
                  app.state = ApplicationState.RUNNING
                }
              }
            }
          }
        }
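
    To make the difference between the two strategies concrete, here is a small, self-contained Scala snippet. It is not Spark code: the Worker case class and the toy cluster below are invented for illustration, but the two loops mirror the assignment logic above.

      // Toy illustration of the two core-assignment strategies
      case class Worker(id: Int, coresFree: Int)

      // spreadOut: hand out one core at a time, round-robin across all workers
      def spreadOut(workers: Array[Worker], coresLeft: Int): Array[Int] = {
        val assigned = new Array[Int](workers.length)
        var toAssign = math.min(coresLeft, workers.map(_.coresFree).sum)
        var pos = 0
        while (toAssign > 0) {
          if (workers(pos).coresFree - assigned(pos) > 0) {
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % workers.length
        }
        assigned
      }

      // non-spreadOut: drain each worker completely before moving on
      def packed(workers: Array[Worker], coresLeft: Int): Array[Int] = {
        val assigned = new Array[Int](workers.length)
        var remaining = coresLeft
        for (pos <- workers.indices if remaining > 0) {
          val use = math.min(workers(pos).coresFree, remaining)
          assigned(pos) = use
          remaining -= use
        }
        assigned
      }

      val cluster = Array(Worker(1, 10), Worker(2, 10), Worker(3, 10))
      println(spreadOut(cluster, 20).mkString(","))  // 7,7,6: all three workers used
      println(packed(cluster, 20).mkString(","))     // 10,10,0: only two workers used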
    
    
