How Engine parses node and core counts
For each deployment mode, BigDL parses the node and core counts from the submitted parameters as described below, returning the result as Some(node, core).
Local mode
The local deployment mode supports two master formats, local[n] and local[*], which parse to:
local = Some(1, n) if local[n]
local = Some(1, getNumMachineCores) if local[*]
// The corresponding source code:
val master = conf.get("spark.master", null)
if (master.toLowerCase.startsWith("local")) {
  // Spark local mode
  val patternLocalN = "local\\[(\\d+)\\]".r
  val patternLocalStar = "local\\[\\*\\]".r
  master match {
    case patternLocalN(n) => Some(1, n.toInt)
    case patternLocalStar(_*) => Some(1, getNumMachineCores)
    case _ => throw new IllegalArgumentException(s"Can't parser master $master")
  }
}
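To see what these two patterns actually accept, here is a minimal, self-contained sketch (the master strings are made up for illustration):

  val patternLocalN = "local\\[(\\d+)\\]".r
  val patternLocalStar = "local\\[\\*\\]".r

  // "local[4]" is captured by patternLocalN with n = "4", so parsing yields (1, 4)
  "local[4]" match {
    case patternLocalN(n) => println((1, n.toInt)) // prints (1,4)
    case patternLocalStar(_*) => println((1, "getNumMachineCores"))
  }

  // "local[*]" only matches the star pattern, so the core count falls back
  // to the physical core count reported by getNumMachineCores
  "local[*]" match {
    case patternLocalN(n) => println((1, n.toInt))
    case patternLocalStar(_*) => println((1, "getNumMachineCores"))
  }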
Standalone mode
In standalone mode, the core count is controlled through spark.executor.cores (--executor-cores) and spark.cores.max (--total-executor-cores).
core = spark.executor.cores // --executor-cores
total = dynamicAllocationExecutor(conf).getOrElse {
  spark.cores.max // --total-executor-cores
}
require(total >= core && total % core == 0)
standalone = Some(total / core, core)
// The dynamic-allocation executor count is resolved as follows:
private def dynamicAllocationExecutor(conf: SparkConf): Option[Int] = {
  if (conf.get("spark.dynamicAllocation.enabled", null) == "true") {
    val maxExecutors = conf.get("spark.dynamicAllocation.maxExecutors", "1").toInt
    val minExecutors = conf.get("spark.dynamicAllocation.minExecutors", "1").toInt
    require(maxExecutors == minExecutors, "Engine.init: " +
      "spark.dynamicAllocation.maxExecutors and " +
      "spark.dynamicAllocation.minExecutors must be identical " +
      "in dynamic allocation for BigDL")
    Some(minExecutors)
  } else {
    None
  }
}
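A minimal usage sketch of this helper, assuming the method above is in scope and using made-up configuration values: with dynamic allocation enabled and min == max, it pins the executor count; otherwise it returns None and the caller falls back to the per-mode settings.

  import org.apache.spark.SparkConf

  // Hypothetical conf: dynamic allocation pinned to exactly 4 executors
  val dynConf = new SparkConf()
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.dynamicAllocation.minExecutors", "4")
    .set("spark.dynamicAllocation.maxExecutors", "4")
  dynamicAllocationExecutor(dynConf) // Some(4)

  // Dynamic allocation off (or unset): None, so getOrElse takes the fallback
  dynamicAllocationExecutor(new SparkConf()) // None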
// The implementation:
else if (master.toLowerCase.startsWith("spark")) {
  // Spark standalone mode
  val coreString = conf.get("spark.executor.cores", null)
  val maxString = conf.get("spark.cores.max", null)
  require(coreString != null, "Engine.init: Can't find executor core number" +
    ", do you submit with --executor-cores option")
  require(maxString != null, "Engine.init: Can't find total core number" +
    ". Do you submit with --total-executor-cores")
  val core = coreString.toInt
  val nodeNum = dynamicAllocationExecutor(conf).getOrElse {
    val total = maxString.toInt
    require(total >= core && total % core == 0, s"Engine.init: total core " +
      s"number($total) can't be divided " +
      s"by single core number($core) provided to spark-submit")
    total / core
  }
  Some(nodeNum, core)
}
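As a concrete illustration with made-up values: submitting with --executor-cores 4 --total-executor-cores 16 gives 16 / 4 = 4 executor nodes, while a total of 10 would fail the divisibility require.

  import org.apache.spark.SparkConf

  // Hypothetical standalone conf: 16 total cores split into 4-core executors
  val standaloneConf = new SparkConf()
    .set("spark.master", "spark://host:7077")
    .set("spark.executor.cores", "4") // --executor-cores 4
    .set("spark.cores.max", "16")     // --total-executor-cores 16
  // The branch above yields Some(4, 4): 4 nodes with 4 cores each

  // With spark.cores.max = "10", 10 % 4 != 0, so Engine.init throws
  // IllegalArgumentException from the require above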
YARN mode
core = spark.executor.cores // --executor-cores
node = dynamicAllocationExecutor(conf).getOrElse {
  spark.executor.instances // --num-executors
}
yarn = Some(node, core)
// The implementation:
else if (master.toLowerCase.startsWith("yarn")) {
  // yarn mode
  val coreString = conf.get("spark.executor.cores", null)
  require(coreString != null, "Engine.init: Can't find executor core number" +
    ", do you submit with " +
    "--executor-cores option")
  val core = coreString.toInt
  val node = dynamicAllocationExecutor(conf).getOrElse {
    val numExecutorString = conf.get("spark.executor.instances", null)
    require(numExecutorString != null, "Engine.init: Can't find executor number" +
      ", do you submit with " +
      "--num-executors option")
    numExecutorString.toInt
  }
  Some(node, core)
}
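Unlike standalone mode, no division is needed here; the executor count is read directly. A sketch with made-up values:

  import org.apache.spark.SparkConf

  // Hypothetical YARN conf: 5 executors with 8 cores each
  val yarnConf = new SparkConf()
    .set("spark.master", "yarn")
    .set("spark.executor.cores", "8")     // --executor-cores 8
    .set("spark.executor.instances", "5") // --num-executors 5
  // The branch above yields Some(5, 8): 5 nodes with 8 cores each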
Mesos mode
core = spark.executor.cores // --executor-cores
total = dynamicAllocationExecutor(conf).getOrElse {
  spark.cores.max // --total-executor-cores
}
require(total >= core && total % core == 0)
mesos = Some(total / core, core)
// The implementation:
else if (master.toLowerCase.startsWith("mesos")) {
  // mesos mode
  require(conf.get("spark.mesos.coarse", null) != "false", "Engine.init: " +
    "Don't support mesos fine-grained mode")
  val coreString = conf.get("spark.executor.cores", null)
  require(coreString != null, "Engine.init: Can't find executor core number" +
    ", do you submit with --executor-cores option")
  val core = coreString.toInt
  val nodeNum = dynamicAllocationExecutor(conf).getOrElse {
    val maxString = conf.get("spark.cores.max", null)
    require(maxString != null, "Engine.init: Can't find total core number" +
      ". Do you submit with --total-executor-cores")
    val total = maxString.toInt
    require(total >= core && total % core == 0, s"Engine.init: total core " +
      s"number($total) can't be divided " +
      s"by single core number($core) provided to spark-submit")
    total / core
  }
  Some(nodeNum, core)
}
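The arithmetic matches standalone mode; the only Mesos-specific twist is the coarse-grained check at the top. A sketch with made-up values:

  import org.apache.spark.SparkConf

  // Hypothetical Mesos conf; spark.mesos.coarse = "false" would be rejected
  val mesosConf = new SparkConf()
    .set("spark.master", "mesos://host:5050")
    .set("spark.mesos.coarse", "true")
    .set("spark.executor.cores", "2") // --executor-cores 2
    .set("spark.cores.max", "8")      // --total-executor-cores 8
  // The branch above yields Some(4, 2): 8 / 2 = 4 nodes with 2 cores each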
Kubernetes (k8s) mode
core = spark.executor.cores // --conf spark.executor.cores
total = dynamicAllocationExecutor(conf).getOrElse {
  spark.cores.max // --conf spark.cores.max
}
require(total >= core && total % core == 0)
k8s = Some(total / core, core)
// The implementation:
else if (master.toLowerCase.startsWith("k8s")) {
  // Spark-on-kubernetes mode
  val coreString = conf.get("spark.executor.cores", null)
  val maxString = conf.get("spark.cores.max", null)
  require(coreString != null, "Engine.init: Can't find executor core number" +
    ", do you submit with --conf spark.executor.cores option")
  require(maxString != null, "Engine.init: Can't find total core number" +
    ". Do you submit with --conf spark.cores.max option")
  val core = coreString.toInt
  val nodeNum = dynamicAllocationExecutor(conf).getOrElse {
    val total = maxString.toInt
    require(total >= core && total % core == 0, s"Engine.init: total core " +
      s"number($total) can't be divided " +
      s"by single core number($core) provided to spark-submit")
    total / core
  }
  Some(nodeNum, core)
}
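Again the same arithmetic as standalone mode; the k8s difference is that both values are passed as --conf options on spark-submit rather than dedicated flags. A sketch with made-up values (the API server address is hypothetical):

  import org.apache.spark.SparkConf

  // Hypothetical k8s conf; both settings arrive via --conf
  val k8sConf = new SparkConf()
    .set("spark.master", "k8s://https://apiserver:6443")
    .set("spark.executor.cores", "2") // --conf spark.executor.cores=2
    .set("spark.cores.max", "6")      // --conf spark.cores.max=6
  // The branch above yields Some(3, 2): 6 / 2 = 3 nodes with 2 cores each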
The above is how the BigDL source (as of BigDL-0.6.0) parses the node and core counts; any other master value throws an exception.