美文网首页
Executor And Cores Set In BigDL

Executor And Cores Set In BigDL

作者: 由木人_番茄 | 来源:发表于2019-03-21 11:14 被阅读0次

Engine中node和core的解析

针对不同的部署方式,bigdl安装如下方式解析参数中core的数量,返回的结果为Some(node,core)

Local模式

local部署模式支持local[n]local[*]两种,解析结果为:

local = Some(1,n)  if local[n]
local = Some(1,getNumMachineCores) if local[*]


//对应的源代码如下
val master = conf.get("spark.master", null)
if (master.toLowerCase.startsWith("local")) {
  // Spark local mode
  val patternLocalN = "local\\[(\\d+)\\]".r
  val patternLocalStar = "local\\[\\*\\]".r
  master match {
    case patternLocalN(n) => Some(1, n.toInt)
    case patternLocalStar(_*) => Some(1, getNumMachineCores)
    case _ => throw new IllegalArgumentException(s"Can't parser master $master")
  }
}

Standalone 模式

standalone模式下通过spark.executor.cores(--executor-cores) 和spark.core.max(--total-executor-cores)来控制core的数量。

core = spark.executor.cores  // --executor-cores
total = dynamicAllocationExecutor(conf).getOrElse{
  spark.core.max // --total-executor-cores
}

require (total>=core && total%core == 0)
standalone = Some(total/core,core)


// 其中动态获取executor实现如下
private def dynamicAllocationExecutor(conf: SparkConf): Option[Int] = {
  if (conf.get("spark.dynamicAllocation.enabled", null) == "true") {
    val maxExecutors = conf.get("spark.dynamicAllocation.maxExecutors", "1").toInt
    val minExecutors = conf.get("spark.dynamicAllocation.minExecutors", "1").toInt
    require(maxExecutors == minExecutors, "Engine.init: " +
      "spark.dynamicAllocation.maxExecutors and " +
      "spark.dynamicAllocation.minExecutors must be identical " +
      "in dynamic allocation for BigDL")
    Some(minExecutors)
  } else {
    None
  }
}

//实现如下
else if (master.toLowerCase.startsWith("spark")) {
  // Spark standalone mode
  val coreString = conf.get("spark.executor.cores", null)
  val maxString = conf.get("spark.cores.max", null)
  require(coreString != null, "Engine.init: Can't find executor core number" +
    ", do you submit with --executor-cores option")
  require(maxString != null, "Engine.init: Can't find total core number" +
    ". Do you submit with --total-executor-cores")
  val core = coreString.toInt
  val nodeNum = dynamicAllocationExecutor(conf).getOrElse {
    val total = maxString.toInt
    require(total >= core && total % core == 0, s"Engine.init: total core " +
      s"number($total) can't be divided " +
      s"by single core number($core) provided to spark-submit")
    total / core
  }
  Some(nodeNum, core)
} 

Yarn 模式

core = spark.executor-cores // --executor-cores
node = dynamicAllocationExecutor(conf).getOrElse{
  spark.executor.instances //--num-executors
}

yarn = Some(node,core)

//实现如下
else if (master.toLowerCase.startsWith("yarn")) {
  // yarn mode
  val coreString = conf.get("spark.executor.cores", null)
  require(coreString != null, "Engine.init: Can't find executor core number" +
    ", do you submit with " +
    "--executor-cores option")
  val core = coreString.toInt
  val node = dynamicAllocationExecutor(conf).getOrElse {
    val numExecutorString = conf.get("spark.executor.instances", null)
    require(numExecutorString != null, "Engine.init: Can't find executor number" +
      ", do you submit with " +
      "--num-executors option")
    numExecutorString.toInt
  }
  Some(node, core)
} 

Mesos 模式

core = spark.executor.cores // --executor-cores
total = dynamicAllocationExecutor(conf).getOrElse{
  spark.cores.max // --total-executor-cores
}
require(total>=core && total%core == 0)
mesos = Some(total/core,core)

//实现如下
else if (master.toLowerCase.startsWith("mesos")) {
  // mesos mode
  require(conf.get("spark.mesos.coarse", null) != "false", "Engine.init: " +
    "Don't support mesos fine-grained mode")
  val coreString = conf.get("spark.executor.cores", null)
  require(coreString != null, "Engine.init: Can't find executor core number" +
    ", do you submit with --executor-cores option")
  val core = coreString.toInt
  val nodeNum = dynamicAllocationExecutor(conf).getOrElse {
    val maxString = conf.get("spark.cores.max", null)
    require(maxString != null, "Engine.init: Can't find total core number" +
      ". Do you submit with --total-executor-cores")
    val total = maxString.toInt
    require(total >= core && total % core == 0, s"Engine.init: total core " +
      s"number($total) can't be divided " +
      s"by single core number($core) provided to spark-submit")
    total / core
  }
  Some(nodeNum, core)
} 

k8s 模式

core = spark.executor.cores // --executor-cores
total = dynamicAllocationExecutor(conf).getOrElse{
  spark.cores.max // --total-executor-cores
}
require(total>=core && total%core == 0)
k8s = Some(total/core,core)

// 实现
else if (master.toLowerCase.startsWith("k8s")) {
  // Spark-on-kubernetes mode
  val coreString = conf.get("spark.executor.cores", null)
  val maxString = conf.get("spark.cores.max", null)
  require(coreString != null, "Engine.init: Can't find executor core number" +
    ", do you submit with --conf spark.executor.cores option")
  require(maxString != null, "Engine.init: Can't find total core number" +
    ". Do you submit with --conf spark.cores.max option")
  val core = coreString.toInt
  val nodeNum = dynamicAllocationExecutor(conf).getOrElse {
    val total = maxString.toInt
    require(total >= core && total % core == 0, s"Engine.init: total core " +
      s"number($total) can't be divided " +
      s"by single core number($core) provided to spark-submit")
    total / core
  }
  Some(nodeNum, core)
} 

上面是BigDL源码中对节点数和core数的解析,其他情况抛出异常--基于BigDL-0.6.0.

相关文章

网友评论

      本文标题:Executor And Cores Set In BigDL

      本文链接:https://www.haomeiwen.com/subject/ynqqvqtx.html