On SparkContext initialization of the TaskScheduler

Author: mahua | Published 2016-11-03 16:28

In SparkContext.createTaskScheduler, the catch-all case for the master URL delegates to a pluggable ExternalClusterManager:
case masterUrl =>
  val cm = getClusterManager(masterUrl) match {
    case Some(clusterMgr) => clusterMgr
    case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
  }
  try {
    val scheduler = cm.createTaskScheduler(sc, masterUrl)
    val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
    cm.initialize(scheduler, backend)
    (backend, scheduler)
  } catch {
    case se: SparkException => throw se
    case NonFatal(e) =>
      throw new SparkException("External scheduler cannot be instantiated", e)
  }
private def getClusterManager(url: String): Option[ExternalClusterManager] = {
  val loader = Utils.getContextOrSparkClassLoader
  val serviceLoaders =
    ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
  if (serviceLoaders.size > 1) {
    throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) registered " +
        s"for the url $url:")
  }
  serviceLoaders.headOption
}

Question: how does the implementation

class YarnClusterManager extends ExternalClusterManager

get discovered and instantiated here?

Answer:
Through this line in the getClusterManager method:

ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))

This first loads all registered implementations of ExternalClusterManager, then uses each implementation's canCreate method to determine which one matches the given master URL; the matching instance is the one returned.
For example:

In YarnClusterManager:
override def canCreate(masterURL: String): Boolean = {
  masterURL == "yarn"
}
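
Note that ServiceLoader only discovers implementations that are registered in a provider-configuration file on the classpath. The YARN module in the Spark source ships such a file for exactly this purpose (the path below is from Spark 2.x; worth verifying against your Spark version):

```
# resource-managers/yarn/src/main/resources/
#   META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
org.apache.spark.scheduler.cluster.YarnClusterManager
```

Without this registration file, ServiceLoader.load would never see YarnClusterManager, no matter what canCreate returns.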

Creating the two instances (scheduler and backend):

val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)

Here cm is the YarnClusterManager (taking YARN as the example), so the concrete implementations of the three methods are as follows:

override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
  sc.deployMode match {
    case "cluster" => new YarnClusterScheduler(sc)
    case "client" => new YarnScheduler(sc)
    case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
  }
}

override def createSchedulerBackend(sc: SparkContext,
    masterURL: String,
    scheduler: TaskScheduler): SchedulerBackend = {
  sc.deployMode match {
    case "cluster" =>
      new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
    case "client" =>
      new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
    case  _ =>
      throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
  }
}

override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
  scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
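
For reference, this is roughly what TaskSchedulerImpl.initialize does behind that cast (a simplified sketch based on the Spark 2.x source; field and builder names may differ between versions): it stores the backend reference and builds the scheduling pools according to the configured scheduling mode.

```scala
// Simplified sketch of org.apache.spark.scheduler.TaskSchedulerImpl.initialize
// (based on Spark 2.x source; not the complete method body).
def initialize(backend: SchedulerBackend): Unit = {
  // Remember the backend so submitTasks can later call backend.reviveOffers().
  this.backend = backend
  // Build the root scheduling pool according to spark.scheduler.mode.
  schedulableBuilder = schedulingMode match {
    case SchedulingMode.FIFO =>
      new FIFOSchedulableBuilder(rootPool)
    case SchedulingMode.FAIR =>
      new FairSchedulableBuilder(rootPool, conf)
    case _ =>
      throw new IllegalArgumentException(s"Unsupported scheduling mode: $schedulingMode")
  }
  schedulableBuilder.buildPools()
}
```

So after initialize returns, the TaskSchedulerImpl holds a reference to its SchedulerBackend, and the (backend, scheduler) pair is handed back to SparkContext.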

Original link: https://www.haomeiwen.com/subject/ilxiuttx.html