SparkContext Source Code Analysis (Part 2)

Author: lehi | Published 2016-03-27 20:10

Copyright notice: this is an original article and may not be reproduced without permission.
This post continues from the previous one:
SparkContext Source Code Analysis (Part 1): http://www.jianshu.com/p/9e75c11a5081

5. How SparkContext sets up task scheduling for the three deployment modes: Standalone, YARN, and Mesos

SparkContext contains one key line of code:
// Create the TaskScheduler from the master URL and the SparkContext object, returning both the SchedulerBackend and the TaskScheduler
<code>
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
</code>
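As a quick illustration of where that master string comes from, here is a minimal sketch (not Spark source; the app name and master value are arbitrary): whatever the user sets via SparkConf.setMaster, or via spark-submit's --master option, is exactly the string that createTaskScheduler pattern-matches on.
<code>
import org.apache.spark.{SparkConf, SparkContext}

object MasterUrlDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("scheduler-demo")
      .setMaster("local[2]")        // this exact string is what createTaskScheduler matches on
    val sc = new SparkContext(conf) // internally calls SparkContext.createTaskScheduler(this, "local[2]")
    println(sc.master)              // prints "local[2]"
    sc.stop()
  }
}
</code>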
The createTaskScheduler method is shown below.
1. First come the regular expressions that the master URL is matched against, which determine which mode the program runs in:
<code>
private def createTaskScheduler(
    sc: SparkContext,
    master: String): (SchedulerBackend, TaskScheduler) = {
  // Matches local[N] and local[*]
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  // Matches local[N, maxRetries]; maxRetries is the maximum number of retries after a task failure
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  // Matches local-cluster[N, cores, memory], a pseudo-distributed mode
  val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
  // Matches the Spark Standalone cluster mode
  val SPARK_REGEX = """spark://(.*)""".r
  // Matches the Mesos cluster manager: mesos:// or zk:// URLs
  val MESOS_REGEX = """(mesos|zk)://.*""".r
  // Matches Spark in MapReduce v1, kept for compatibility with older Hadoop clusters
  val SIMR_REGEX = """simr://(.*)""".r
</code>
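To make the matching concrete, here is a small standalone sketch (not part of Spark) that exercises two of the patterns above against sample master strings:
<code>
// Standalone sketch: classifying sample master strings with the same regexes as above
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
val SPARK_REGEX = """spark://(.*)""".r

def classify(master: String): String = master match {
  case LOCAL_N_REGEX(threads) => s"local mode with $threads thread(s)"
  case SPARK_REGEX(urls)      => s"Standalone mode, master list: $urls"
  case other                  => s"unmatched: $other"
}

println(classify("local[4]"))                      // local mode with 4 thread(s)
println(classify("spark://host1:7077,host2:7077")) // Standalone mode, master list: host1:7077,host2:7077
</code>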
2. Matching against the regular expressions from step 1 proceeds as follows.
SchedulerBackend is a trait that works together with TaskSchedulerImpl to carry out resource scheduling for tasks. Its implementing subclasses are shown below:

[Figure: SchedulerBackend subclasses]

Each SchedulerBackend subclass corresponds to a different case:
(1) Local single-threaded mode. LocalBackend runs the program locally, i.e. the executor, backend and master all live in the same JVM. It works with TaskSchedulerImpl to execute tasks on the single Executor it creates.
<code>
master match {
  case "local" =>
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    // LocalBackend runs everything (executor, backend, master) inside this JVM and works with
    // TaskSchedulerImpl to execute tasks on the single Executor it creates
    val backend = new LocalBackend(sc.getConf, scheduler, 1)
    // TaskSchedulerImpl hooks up with its LocalBackend and builds a scheduling tree
    // according to the scheduling mode (FIFO or FAIR)
    scheduler.initialize(backend)
    (backend, scheduler)
</code>
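The FIFO/FAIR choice mentioned in the comment is driven by a configuration key; a minimal sketch (the app name is arbitrary, the configuration key is Spark's standard one):
<code>
// scheduler.initialize(backend) builds a FIFO pool by default; setting spark.scheduler.mode
// to FAIR switches to fair-scheduling pools instead.
val conf = new org.apache.spark.SparkConf()
  .setMaster("local")
  .setAppName("fifo-vs-fair")
  .set("spark.scheduler.mode", "FAIR") // default: "FIFO"
</code>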
(2) Local multi-threaded mode, matching local[N] and local[*]. LocalBackend again runs the executor, backend and master in the same JVM, and works with TaskSchedulerImpl to execute tasks on N worker threads within the single Executor it creates.
<code>
case LOCAL_N_REGEX(threads) =>
  def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
  // local[N] runs tasks on N threads; local[*] uses all available cores
  val threadCount = if (threads == "*") localCpuCount else threads.toInt
  if (threadCount <= 0) {
    throw new SparkException(s"Asked to run locally with $threadCount threads")
  }
  val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
  val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
  scheduler.initialize(backend)
  (backend, scheduler)
</code>
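A short usage sketch (app names arbitrary): the thread count is taken literally from the master string, with "*" falling back to the machine's core count exactly as in the code above.
<code>
val fourThreads = new org.apache.spark.SparkConf().setMaster("local[4]").setAppName("fixed-threads")
val allCores    = new org.apache.spark.SparkConf().setMaster("local[*]").setAppName("all-cores")
</code>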
(3) Matching local[*, M] and local[N, M]. M is the maximum number of task failures; if the first field is "*", the thread count defaults to the number of CPU cores. LocalBackend again runs the executor, backend and master in the same JVM, and works with TaskSchedulerImpl to execute tasks on N worker threads within the single Executor it creates.
<code>
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
  def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
  // local[N, M] means N threads, allowing up to M task failures
  val threadCount = if (threads == "*") localCpuCount else threads.toInt
  val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
  val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
  scheduler.initialize(backend)
  (backend, scheduler)
</code>
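Usage sketch (app name arbitrary): the second number feeds maxFailures.toInt above, replacing the MAX_LOCAL_TASK_FAILURES default used by plain local mode.
<code>
// 2 worker threads; a task may fail up to 4 times before the job is aborted
val conf = new org.apache.spark.SparkConf()
  .setMaster("local[2, 4]")
  .setAppName("local-with-retries")
</code>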
(4) Matching the Spark Standalone mode. SparkDeploySchedulerBackend works with TaskSchedulerImpl to run the program against a standalone cluster.
<code>
case SPARK_REGEX(sparkUrl) =>
  val scheduler = new TaskSchedulerImpl(sc)
  // the master URL may list several masters, comma-separated
  val masterUrls = sparkUrl.split(",").map("spark://" + _)
  val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
  scheduler.initialize(backend)
  (backend, scheduler)
</code>
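Usage sketch (host names arbitrary): a comma-separated list of masters is what feeds the sparkUrl.split(",") above, e.g. when running multiple Standalone masters for high availability.
<code>
val conf = new org.apache.spark.SparkConf()
  .setMaster("spark://host1:7077,host2:7077")
  .setAppName("standalone-app")
</code>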
(5) Matching the local-cluster mode, i.e. pseudo-distributed mode. SparkDeploySchedulerBackend works with TaskSchedulerImpl to run the program against this single-machine pseudo-cluster.
<code>
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
  // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
  val memoryPerSlaveInt = memoryPerSlave.toInt
  if (sc.executorMemory > memoryPerSlaveInt) {
    throw new SparkException(
      "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
        memoryPerSlaveInt, sc.executorMemory))
  }
  val scheduler = new TaskSchedulerImpl(sc)
  val localCluster = new LocalSparkCluster(
    numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
  val masterUrls = localCluster.start()
  val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
  scheduler.initialize(backend)
  backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
    localCluster.stop()
  }
  (backend, scheduler)
</code>
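Usage sketch (values arbitrary): the three numbers are workers, cores per worker, and MB of memory per worker; executor memory must stay within the per-worker memory or the check above throws a SparkException.
<code>
// 2 workers, 1 core each, 1024 MB each; executor memory kept below the per-worker limit
val conf = new org.apache.spark.SparkConf()
  .setMaster("local-cluster[2, 1, 1024]")
  .setAppName("pseudo-cluster")
  .set("spark.executor.memory", "512m")
</code>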
    (6)"yarn-standalone"或"yarn-cluster"运行模式,通过反射得到YARN模式下的调度器YarnClusterSchedulerBackend与YarnClusterScheduler配合处理程序YARN模式下的运行
<code>
case "yarn-standalone" | "yarn-cluster" =>
  if (master == "yarn-standalone") {
    logWarning(
      "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
  }
  // Obtain the YARN scheduler via reflection; YarnClusterScheduler is a subclass of TaskSchedulerImpl
  val scheduler = try {
    val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
    val cons = clazz.getConstructor(classOf[SparkContext])
    cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
  } catch {
    case e: Exception => {
      throw new SparkException("YARN mode not available ?", e)
    }
  }
  // Obtain the YARN backend via reflection; YarnClusterSchedulerBackend extends
  // CoarseGrainedSchedulerBackend, i.e. YARN uses coarse-grained scheduling
  val backend = try {
    val clazz =
      Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
    val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
    cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
  } catch {
    case e: Exception => {
      throw new SparkException("YARN mode not available ?", e)
    }
  }
  scheduler.initialize(backend)
  (backend, scheduler)
</code>
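Why reflection? The YARN scheduler classes live in Spark's separate YARN module, so SparkContext looks them up by name at runtime instead of depending on them at compile time. A small sketch of the same pattern (illustrative only, wrapped in the same kind of try/catch):
<code>
try {
  val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
  println(s"YARN support available: ${clazz.getName}")
} catch {
  case _: ClassNotFoundException =>
    println("YARN module not on the classpath, hence the 'YARN mode not available' error above")
}
</code>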
(7) The yarn-client mode. Here reflection yields YarnScheduler and YarnClientSchedulerBackend, which together handle running the program on YARN while the driver stays on the client.
<code>
case "yarn-client" =>
  val scheduler = try {
    val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
    val cons = clazz.getConstructor(classOf[SparkContext])
    cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
  } catch {
    case e: Exception => {
      throw new SparkException("YARN mode not available ?", e)
    }
  }
  val backend = try {
    val clazz =
      Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
    val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
    cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
  } catch {
    case e: Exception => {
      throw new SparkException("YARN mode not available ?", e)
    }
  }
  scheduler.initialize(backend)
  (backend, scheduler)
</code>
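Usage sketch (assumes HADOOP_CONF_DIR / YARN_CONF_DIR point at the target cluster): in yarn-client mode the driver runs in the submitting JVM, so the master string can be set directly on SparkConf.
<code>
val conf = new org.apache.spark.SparkConf()
  .setMaster("yarn-client")
  .setAppName("yarn-client-app")
</code>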

(8) Matching the Mesos mode. Mesos supports both coarse-grained and fine-grained scheduling: CoarseMesosSchedulerBackend (coarse-grained) or MesosSchedulerBackend (fine-grained) works with TaskSchedulerImpl to run the program on a Mesos cluster.
<code>
case mesosUrl @ MESOS_REGEX(_) =>
  MesosNativeLibrary.load()
  val scheduler = new TaskSchedulerImpl(sc)
  val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false)
  val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
  // Mesos offers two scheduling modes: coarse-grained and fine-grained
  val backend = if (coarseGrained) {
    new CoarseMesosSchedulerBackend(scheduler, sc, url, sc.env.securityManager)
  } else {
    new MesosSchedulerBackend(scheduler, sc, url)
  }
  scheduler.initialize(backend)
  (backend, scheduler)
</code>
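Usage sketch (ZooKeeper hosts arbitrary): the same mesos:// master string can drive either backend; spark.mesos.coarse, read by sc.conf.getBoolean above, picks which one.
<code>
val conf = new org.apache.spark.SparkConf()
  .setMaster("mesos://zk://zk1:2181,zk2:2181/mesos")
  .setAppName("mesos-app")
  .set("spark.mesos.coarse", "true") // false selects the fine-grained MesosSchedulerBackend
</code>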

(9) Matching the Spark-in-MapReduce-v1 (SIMR) mode. SimrSchedulerBackend works with TaskSchedulerImpl to run the program on a MapReduce v1 cluster.
<code>
case SIMR_REGEX(simrUrl) =>
  val scheduler = new TaskSchedulerImpl(sc)
  val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl)
  scheduler.initialize(backend)
  (backend, scheduler)
</code>

(10) Fallback: if the master URL matches none of the patterns above, a SparkException is thrown.
<code>
case _ =>
  throw new SparkException("Could not parse Master URL: '" + master + "'")
</code>
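Sketch (the invalid master string below is made up): any master that misses every pattern fails fast when the SparkContext is constructed.
<code>
import org.apache.spark.{SparkConf, SparkContext}

val badConf = new SparkConf().setAppName("bad-master-demo").setMaster("not-a-master-url")
val sc = new SparkContext(badConf) // throws SparkException: Could not parse Master URL: 'not-a-master-url'
</code>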
So how does each of these scheduler/backend pairings actually carry out scheduling? That is covered in the follow-up articles :-D
