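II. Structure of a Spark Program
When a user submits a Spark program, the user's code creates a SparkContext instance. Whatever the cluster deployment mode, this eventually launches JVMs on nodes of the cluster, and in each one an executor is started by calling the main method of the CoarseGrainedExecutorBackend class. The executor starts a thread pool and listens on a network port. When it receives serialized task code, it runs that code on a thread from the pool.
A Spark program therefore consists of two parts:
- The user's Spark program, supplied as a jar file
- The executor launcher, whose main function lives in CoarseGrainedExecutorBackend.scala and which ships in spark-assembly.jar.
1. How CoarseGrainedExecutorBackend starts
We begin with the CoarseGrainedExecutorBackend class. Its logic is fairly simple, so we can just trace the code. A brief walk-through:
1. main parses the arguments and calls run
2. run creates the executor environment with SparkEnv.createExecutorEnv
3. rpcEnv.setupEndpoint registers the endpoint: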
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(...))
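4. env.rpcEnv.awaitTermination() is called, which blocks the process until the RpcEnv shuts down
5. createExecutorEnv calls RpcEnv.create to build the rpcEnv
6. RpcEnv has two implementations, akka and netty, selected by name as follows:
"akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory"
"netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory" (default)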
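The selection in step 6 amounts to a name-to-class-name lookup with netty as the fallback. Below is a minimal sketch of that lookup. The plain `Map[String, String]` standing in for SparkConf, the config key `spark.rpc`, and the helper name `rpcEnvFactoryClassName` are assumptions for illustration. To my knowledge Spark 1.6's `RpcEnv` does something similar and then instantiates the resulting class reflectively, but treat this as a sketch, not the actual source.

```scala
// Short backend names mapped to factory class names (as listed in step 6).
val rpcEnvNames: Map[String, String] = Map(
  "akka"  -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
  "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory"
)

// Hypothetical helper: `conf` stands in for SparkConf; the key "spark.rpc" is an assumption.
def rpcEnvFactoryClassName(conf: Map[String, String]): String = {
  val name = conf.getOrElse("spark.rpc", "netty")   // netty when nothing is configured
  // Names that are not in the table are treated as a fully qualified class name.
  rpcEnvNames.getOrElse(name.toLowerCase, name)
}
```

For example, an empty configuration resolves to the Netty factory, and `"AKKA"` resolves to the Akka factory because the lookup lowercases the name first.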
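7. NettyRpcEnvFactory creates a NettyRpcEnv, which becomes the rpcEnv
8. The endpoint registered through setupEndpoint (the CoarseGrainedExecutorBackend instance) overrides receive
9. How receive handles task launching:
Create the Executor: case RegisteredExecutor(hostname) => executor = new Executor(...)
Receive a task and launch it: case LaunchTask(data) => executor.launchTask(...)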
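receive is a `PartialFunction[Any, Unit]` that pattern-matches on message types. The sketch below imitates the two cases from step 9. `RegisteredExecutor`, `LaunchTask`, and `FakeExecutor` are simplified, hypothetical stand-ins, not Spark's classes, and the real LaunchTask carries a serialized TaskDescription rather than a string. As far as I understand Spark 1.6, the real backend logs an error and exits the JVM if LaunchTask arrives before the Executor exists. Here that case simply throws.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified message types (Spark's real ones carry more fields).
case class RegisteredExecutor(hostname: String)
case class LaunchTask(data: String)

// Stand-in for Executor: records the tasks it was asked to run.
class FakeExecutor(val host: String) {
  val launched = ArrayBuffer.empty[String]
  def launchTask(data: String): Unit = { launched += data; () }
}

class ExecutorBackendSketch {
  var executor: FakeExecutor = null   // created only after the driver confirms registration

  val receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor(hostname) =>
      executor = new FakeExecutor(hostname)
    case LaunchTask(data) =>
      if (executor == null) sys.error("Received LaunchTask before registration")
      else executor.launchTask(data)
  }
}
```

Messages that match no case are simply not handled: `receive.isDefinedAt(msg)` is false for them, which is how an RPC layer can detect unexpected messages.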
10. Executor.launchTask
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
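launchTask wraps the serialized task in a Runnable (TaskRunner), records it in runningTasks keyed by task id, and hands it to the thread pool. A minimal sketch of that pattern with the JDK's ExecutorService follows. `MiniExecutor` is hypothetical, and its "task" is just a function instead of deserialized task bytes. Removing the entry when the task ends mirrors, to my understanding, what TaskRunner does in its finally block. A cached (unbounded) pool is used because, as I understand it, Spark caps concurrency on the driver side through freeCores rather than through the executor's pool size.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for Executor; a task body is a plain function here.
class MiniExecutor {
  private val threadPool = Executors.newCachedThreadPool()
  val runningTasks = new ConcurrentHashMap[Long, Runnable]()

  class TaskRunner(val taskId: Long, body: () => Unit) extends Runnable {
    override def run(): Unit =
      try body()
      finally runningTasks.remove(taskId)   // drop the bookkeeping entry when the task ends
  }

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val tr = new TaskRunner(taskId, body)
    runningTasks.put(taskId, tr)   // register before submitting, as in the excerpt
    threadPool.execute(tr)
  }

  def stop(): Unit = {
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```

Registering the task before calling `execute` guarantees the `remove` in the finally block can never run before the `put`.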
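2. Task scheduling in the user application
This section mainly analyzes the source code for Yarn Cluster mode.
Task scheduling is handled mainly by the CoarseGrainedSchedulerBackend and YarnClusterScheduler classes.
1. In the SparkContext constructor, after createSparkEnv has built the SparkEnv, the task scheduler is created:
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
2. In createTaskScheduler, the code for yarn mode is: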
case "yarn-standalone" | "yarn-cluster" =>
  val scheduler = try {
    val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
    val cons = clazz.getConstructor(classOf[SparkContext])
    cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
  } catch {...}
  val backend = try {
    val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
    val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
    cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
  } catch {...}
  scheduler.initialize(backend)
  (backend, scheduler)
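Both instances are created via reflection, because the YARN classes live in a separate module that Spark core does not compile against. Finally, the scheduler's initialize method is called with the backend.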
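The reflective pattern is: Class.forName, then getConstructor with the parameter types, then newInstance with the arguments. The sketch below applies it to the JDK's java.lang.StringBuilder purely as a stand-in class, since no YARN classes are available here. `instantiate` is a hypothetical helper. Spark's Utils.classForName additionally picks a class loader, which this omits.

```scala
import scala.util.Try

// Hypothetical helper: load a class by name and call its one-argument constructor.
// Any failure (e.g. ClassNotFoundException when a module is absent) becomes a Failure.
def instantiate[T](className: String, argType: Class[_], arg: AnyRef): Try[T] = Try {
  val clazz = Class.forName(className)
  val cons = clazz.getConstructor(argType)
  cons.newInstance(arg).asInstanceOf[T]
}
```

The Failure branch plays the role of the `catch {...}` blocks in createTaskScheduler: if the YARN module is missing from the classpath, loading fails at run time instead of breaking compilation of core.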
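3. YarnClusterScheduler extends TaskSchedulerImpl, which defines the initialize method. It creates the root task pool and builds it according to the scheduling mode (FIFO or FAIR):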
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
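In FIFO mode, the pool later returns its task sets (rootPool.getSortedTaskSetQueue, used in step 11) ordered first by priority, which is the job id, and then by stage id. Earlier jobs, and earlier stages within a job, are therefore offered resources first. A simplified sketch of that comparison, where `TaskSetKey` is a hypothetical stand-in for Spark's Schedulable:

```scala
// Hypothetical stand-in for a schedulable task set; priority holds the job id.
case class TaskSetKey(name: String, priority: Int, stageId: Int)

// FIFO comparison: lower job id first, then lower stage id.
def fifoLessThan(a: TaskSetKey, b: TaskSetKey): Boolean =
  if (a.priority != b.priority) a.priority < b.priority
  else a.stageId < b.stageId

def sortedTaskSetQueue(sets: Seq[TaskSetKey]): Seq[TaskSetKey] =
  sets.sortWith(fifoLessThan)
```

For example, task sets (job 1, stage 3), (job 0, stage 2), and (job 1, stage 1) come out in the order job 0 stage 2, job 1 stage 1, job 1 stage 3.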
4. SparkContext calls start on the YarnClusterScheduler instance:
_taskScheduler.start()
5. YarnClusterScheduler inherits its start method from TaskSchedulerImpl:
override def start() {
  // Start the backend, here a YarnClusterSchedulerBackend
  backend.start()
}
6. YarnClusterSchedulerBackend extends YarnSchedulerBackend:
override def start() {
  super.start()
  totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
}
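7. YarnSchedulerBackend extends CoarseGrainedSchedulerBackend. The following code comes from CoarseGrainedSchedulerBackend, whose start method creates the driver endpoint: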
val ENDPOINT_NAME = "CoarseGrainedScheduler"
override def start() {
  val properties = new ArrayBuffer[(String, String)]
  for ((key, value) <- scheduler.sc.conf.getAll) {
    if (key.startsWith("spark.")) {
      properties += ((key, value))
    }
  }
  // Create the driverEndpoint used to send and receive messages
  driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}
protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  new DriverEndpoint(rpcEnv, properties)
}
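setupEndpoint registers an endpoint under a name and returns a reference. Messages sent through that reference reach the endpoint's receive. The toy in-process version below illustrates the idea. `ToyRpcEnv`, `ToyEndpoint`, and `ToyEndpointRef` are hypothetical. It delivers messages synchronously, whereas, to my understanding, NettyRpcEnv queues them for a dispatcher and can also route them across the network.

```scala
import scala.collection.mutable

// Hypothetical toy versions of RpcEndpoint / RpcEndpointRef / RpcEnv.
trait ToyEndpoint {
  def receive: PartialFunction[Any, Unit]
}

class ToyEndpointRef(val name: String, endpoint: ToyEndpoint) {
  // Synchronous delivery; unhandled messages are just reported.
  def send(message: Any): Unit =
    endpoint.receive.applyOrElse(message, (m: Any) => println(s"$name: unhandled $m"))
}

class ToyRpcEnv {
  private val endpoints = mutable.Map.empty[String, ToyEndpointRef]

  // Register an endpoint under a unique name and hand back a reference to it.
  def setupEndpoint(name: String, endpoint: ToyEndpoint): ToyEndpointRef = {
    require(!endpoints.contains(name), s"endpoint $name already registered")
    val ref = new ToyEndpointRef(name, endpoint)
    endpoints(name) = ref
    ref
  }
}
```

With this toy, `env.setupEndpoint("CoarseGrainedScheduler", ...)` plays the role of the driverEndpoint registration above, and sending `"ReviveOffers"` through the returned reference invokes the endpoint's receive.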
8. DriverEndpoint's message handler:
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Unknown executor: the update is ignored (the source logs a warning)
      }
    }
  case ReviveOffers =>
    makeOffers()
  case KillTask(taskId, executorId, interruptThread) =>
    executorDataMap.get(executorId) match {
      case Some(executorInfo) =>
        executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
      case None =>
        // Executor not registered: nothing to kill (the source logs a warning)
    }
}
Here executorDataMap holds every registered executor. makeOffers uses only those that are still alive.
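The bookkeeping behind StatusUpdate is small: when a task reaches a finished state, its CPUs are returned to the executor's freeCores, and the freed slot is re-offered immediately via makeOffers(executorId). The sketch below uses hypothetical stand-ins: an enumeration whose finished states are, to my knowledge, the same four that Spark's TaskState treats as finished, an `ExecutorData` holding only freeCores, and a list that records would-be makeOffers calls.

```scala
import scala.collection.mutable

// Hypothetical state model; FINISHED, FAILED, KILLED, LOST count as finished.
object TaskStateSketch extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
  private val finishedStates = Set(FINISHED, FAILED, KILLED, LOST)
  def isFinished(s: Value): Boolean = finishedStates.contains(s)
}

class ExecutorData(var freeCores: Int)

class DriverBookkeeping(cpusPerTask: Int) {
  val executorDataMap = mutable.Map.empty[String, ExecutorData]
  val reoffered = mutable.ArrayBuffer.empty[String]   // records makeOffers(executorId) calls

  def statusUpdate(executorId: String, state: TaskStateSketch.Value): Unit =
    if (TaskStateSketch.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(info) =>
          info.freeCores += cpusPerTask   // give the task's CPUs back
          reoffered += executorId         // stands in for makeOffers(executorId)
        case None =>                      // unknown executor: ignore the update
      }
    }
}
```

A RUNNING update changes nothing. Only a terminal state frees CPUs, which is what keeps the driver's view of free cores in step with the executors.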
9. makeOffers
private def makeOffers() {
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
private def makeOffers(executorId: String) {
  if (executorIsAlive(executorId)) {
    val executorData = executorDataMap(executorId)
    val workOffers = Seq(
      new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
    launchTasks(scheduler.resourceOffers(workOffers))
  }
}
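makeOffers() turns the free cores of every live executor into a WorkerOffer, which is what resourceOffers consumes. In the sketch below, `ExecutorInfo` and the `alive` predicate are hypothetical stand-ins for executorDataMap's values and executorIsAlive. It filters the map's entries directly instead of calling `filterKeys`, which is deprecated as of Scala 2.13.

```scala
// Same shape as the WorkerOffer case class in step 10.
case class WorkerOffer(executorId: String, host: String, cores: Int)

// Hypothetical stand-in for the per-executor data kept by the driver.
case class ExecutorInfo(host: String, freeCores: Int)

// One offer per live executor, like makeOffers() without arguments.
def makeOffers(executors: Map[String, ExecutorInfo], alive: String => Boolean): Seq[WorkerOffer] =
  executors.toSeq
    .filter { case (id, _) => alive(id) }
    .map { case (id, data) => WorkerOffer(id, data.host, data.freeCores) }
```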
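From here on we focus on the following:
- scheduler.resourceOffers: binds tasks to resources
- WorkerOffer: a simple wrapper around compute resources
- launchTasks: sends tasks to executors
10. WorkerOffer, a simple wrapper: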
case class WorkerOffer(executorId: String, host: String, cores: Int)
11. scheduler.resourceOffers(workOffers)
// Defined in core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    executorIdToRunningTaskIds.getOrElseUpdate(o.executorId, HashSet[Long]())
  }
  // Shuffle the executors so that tasks are spread across them at random
  val shuffledOffers = Random.shuffle(offers)
  // One TaskDescription buffer per executor, pre-sized to its core count;
  // this only reserves capacity, tasks are appended later
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  // Free cores of each executor
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue // task sets in scheduling order
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }
  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
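The core of resourceOffers is a loop: shuffle the offers, track the free CPUs of each executor, and for each task set keep making passes over all executors, at most one task per executor per pass, until a pass launches nothing. The simplified sketch below ignores locality levels and uses a plain queue of task ids in place of a TaskSetManager. `assignTasks` and its parameters are hypothetical, and a seeded `Random` instance replaces `Random.shuffle` on the global generator only so that runs are reproducible.

```scala
import scala.collection.mutable
import scala.util.Random

case class WorkerOffer(executorId: String, host: String, cores: Int)

// Hypothetical simplification of resourceOffers for one task set without locality levels.
// Returns executorId -> ids of the tasks assigned to that executor.
def assignTasks(offers: Seq[WorkerOffer], pending: Seq[Long], cpusPerTask: Int,
                rng: Random): Map[String, Seq[Long]] = {
  val queue = mutable.Queue.from(pending)
  val shuffledOffers = rng.shuffle(offers)                 // random executor order
  val tasks = shuffledOffers.map(_ => mutable.ArrayBuffer.empty[Long])
  val availableCpus = shuffledOffers.map(_.cores).toArray

  // One call = one resourceOfferSingleTaskSet pass: at most one task per executor.
  def singlePass(): Boolean = {
    var launchedTask = false
    for (i <- shuffledOffers.indices) {
      if (availableCpus(i) >= cpusPerTask && queue.nonEmpty) {
        tasks(i) += queue.dequeue()
        availableCpus(i) -= cpusPerTask
        launchedTask = true
      }
    }
    launchedTask
  }

  // Same effect as the do/while above: repeat until a pass launches nothing.
  while (singlePass()) {}

  shuffledOffers.map(_.executorId).zip(tasks.map(_.toSeq)).toMap
}
```

Handing out one task per executor per pass, instead of filling one executor completely before moving on, is what spreads a task set across the cluster.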
12. resourceOfferSingleTaskSet: binding tasks to executors
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    // Only offer this executor a task if it still has enough free CPUs
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task // add the task to this executor's buffer
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK // consume the executor's CPUs
          launchedTask = true
        }
      } catch {...}
    }
  }
  return launchedTask
}
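This function shows that when CPUS_PER_TASK is 1 (spark.task.cpus defaults to 1), the --executor-cores setting is exactly the number of tasks, and hence task threads, an executor can run in parallel.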
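More generally, because a task is only offered while availableCpus(i) >= CPUS_PER_TASK, and each launch subtracts CPUS_PER_TASK, the number of tasks one executor can hold at once is the integer quotient of its cores by spark.task.cpus. The small check below compares that subtraction loop with the closed form. Both function names are hypothetical.

```scala
// Count slots the way resourceOfferSingleTaskSet consumes them: subtract until too few CPUs remain.
def slotsBySimulation(executorCores: Int, cpusPerTask: Int): Int = {
  require(cpusPerTask > 0, "cpusPerTask must be positive")
  var available = executorCores
  var launched = 0
  while (available >= cpusPerTask) {
    available -= cpusPerTask
    launched += 1
  }
  launched
}

// Closed form: floor(executorCores / cpusPerTask).
def slots(executorCores: Int, cpusPerTask: Int): Int = executorCores / cpusPerTask
```

With --executor-cores 4 and spark.task.cpus=1 this gives 4 concurrent tasks. With spark.task.cpus=3 it gives 1, leaving one core idle.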
13. TaskSetManager.resourceOffer takes a task from the task queue and builds its task description:
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  if (!isZombie) {
    val curTime = clock.getTimeMillis()
    var allowedLocality = maxLocality
    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        allowedLocality = maxLocality
      }
    }
    dequeueTask(execId, host, allowedLocality) match {
      case Some((index, taskLocality, speculative)) => {
        val task = tasks(index)
        val taskId = sched.newTaskId()
        copiesRunning(index) += 1
        val attemptNum = taskAttempts(index).size
        val info = new TaskInfo(taskId, index, attemptNum, curTime,
          execId, host, taskLocality, speculative)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        if (maxLocality != TaskLocality.NO_PREF) {
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
        }
        val startTime = clock.getTimeMillis()
        val serializedTask: ByteBuffer = try {
          Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
        } catch {... }
        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
            !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true
          // a warning about the large task size is logged here (elided)
        }
        addRunningTask(taskId)
        val taskName = s"task ${info.id} in stage ${taskSet.id}"
        sched.dagScheduler.taskStarted(task, info)
        return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
          taskName, index, serializedTask))
      }
      case _ =>
    }
  }
  None
}
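The first part of resourceOffer caps the locality level. Unless the caller passed NO_PREF, it takes the level currently allowed by delay scheduling (getAllowedLocalityLevel) and clamps it to maxLocality, so a task is never launched at a looser level than the current pass permits. The sketch below uses a stand-in enumeration declared in the same order as, to my knowledge, Spark's TaskLocality (PROCESS_LOCAL < NODE_LOCAL < NO_PREF < RACK_LOCAL < ANY). `allowedLocality` and its `delayAllowed` parameter are hypothetical.

```scala
// Stand-in with the same ordering as Spark's TaskLocality enumeration.
object Locality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
}

// Hypothetical helper: delayAllowed plays the role of getAllowedLocalityLevel(curTime).
def allowedLocality(maxLocality: Locality.Value, delayAllowed: Locality.Value): Locality.Value =
  if (maxLocality == Locality.NO_PREF) maxLocality
  else if (delayAllowed > maxLocality) maxLocality   // never looser than this pass allows
  else delayAllowed
```

For example, if delay scheduling would already allow ANY but the current pass is for NODE_LOCAL, the task is still dequeued at NODE_LOCAL.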
14. launchTasks sends the tasks:
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    val executorData = executorDataMap(task.executorId)
    executorData.freeCores -= scheduler.CPUS_PER_TASK
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  }
}
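This code uses executorData.executorEndpoint.send to deliver each task to the receiving side: the executor's CoarseGrainedExecutorBackend, whose receive handles LaunchTask.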
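End to end, launchTasks charges CPUS_PER_TASK against the executor's freeCores, serializes the task, and sends it. The executor then decodes the bytes before Executor.launchTask runs them. The sketch below shows that round trip with a hand-rolled ByteBuffer layout for a cut-down description. `TaskDesc`, `ExecutorSlot`, `encode`, `decode`, and the `send` callback are all hypothetical. Spark serializes the whole TaskDescription with `ser.serialize`, not with this format.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical cut-down task description; wire layout: [taskId][attempt][length][payload].
case class TaskDesc(taskId: Long, attemptNumber: Int, payload: String)

def encode(t: TaskDesc): ByteBuffer = {
  val bytes = t.payload.getBytes(StandardCharsets.UTF_8)
  val buf = ByteBuffer.allocate(8 + 4 + 4 + bytes.length)
  buf.putLong(t.taskId).putInt(t.attemptNumber).putInt(bytes.length).put(bytes)
  buf.flip()   // switch the buffer from writing to reading
  buf
}

def decode(buf: ByteBuffer): TaskDesc = {
  val b = buf.duplicate()   // leave the caller's read position untouched
  val taskId = b.getLong()
  val attempt = b.getInt()
  val bytes = new Array[Byte](b.getInt())
  b.get(bytes)
  TaskDesc(taskId, attempt, new String(bytes, StandardCharsets.UTF_8))
}

// Driver-side stand-in for ExecutorData: only the free-core count matters here.
class ExecutorSlot(var freeCores: Int)

// Like launchTasks: charge the CPUs first, then "send" the encoded task.
def launchTasks(tasks: Seq[(ExecutorSlot, TaskDesc)], cpusPerTask: Int,
                send: ByteBuffer => Unit): Unit =
  for ((executorData, task) <- tasks) {
    executorData.freeCores -= cpusPerTask
    send(encode(task))
  }
```

Charging freeCores on the driver at send time, rather than waiting for the executor to report, keeps the next round of resourceOffers from handing out the same cores twice.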