1.整体结构
Executor是在worker上的执行单元, 是一个独立的JVM哦!
在启动时, Executor也需要维护一个本地的SparkEnv. 这里给出和Driver一样的结构图. 内容不再累述
sparkEnv
Executor是在任务执行的时候, 由Driver向集群资源管理Master要资源, 资源管理中心将Woker(物理机)上的资源进行分配. Driver告知对应的节点启动Executor.
Executor是由对应的集群管理器的ExecutorBackend进程创建的
-
CoarseGrainedExecutorBackend
接到RegisteredExecutor
消息时创建(StandAlone和Yarn) -
MesosExecutorBackend
进行接到同类消息创建(Mesos)
Executor使用向Driver发送心跳来确定自己进入Driver的管理列表
任务执行逻辑
Driver监听到心跳后, SparkListener监听器告诉TaskScheduler又一个搬砖的到位了. TaskScheduler会分配具体的任务给Executor, Executor通过本地的TaskRunner(这是一个进程) 来执行对应的任务, 并把结果向上级报告.
2. Executor实例的创建
private[spark] class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
userClassPath: Seq[URL] = Nil,
isLocal: Boolean = false)
extends Logging {
}
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
// Each map holds the master's timestamp for the version of that file or JAR we got.
private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
// Start worker thread pool
// 每个Executor可以同时执行多个Task, 所以用一个ThreadPool来管理, TaskRunner是实际的执行者, 是一个实现了Runnable接口的内部类
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
private val executorSource = new ExecutorSource(threadPool, executorId)
// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
// 这个默认大小是128MB
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
// Limit of bytes for total size of results (default is 1GB)
private val maxResultSize = Utils.getMaxResultSize(conf)
// Maintains the list of running tasks.
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
// Executor for the heartbeat task.
// 这里要注意到心跳是一个单例, 避免单个Executor重复发送心跳包的问题
private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
当创建成果时, 可以看到下面这个打印日志
INFO Executor: Starting executor ID [executorId] on host [executorHostname]
-
Executor
需要sparkEnv里的BlockManager初始化成果以管理本地的Block对象 -
Executor
将ExecutorSource和shuffleMetricSource, 也就是本地资源注册到sparkEvn里的MetricsSystem, 经过心跳发送到Driver里进行统计 -
Executor
需要一个classLoader来挂载从Driver那里下载的第三方Jar -
Execuror
还需要序列化工具来帮助反序列化那些受到的任务指示包
3. 挂载任务
def launchTask(
context: ExecutorBackend,
taskId: Long,
attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer): Unit = {
val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
可以看到Executor
把接收到的序列化的任务, 交付给一个TaskRunner来工作, TaskRunner是一个内部类, 负责干活. Executor可以同时又多个TaskRunner干活, 所以用了一个线程池来管理.
4. TaskRunner
TaskRunner4.1 TaskRunner概要
TaskRunner
实现Runnable接口, 它是一个进程, 所以可以启动, 被杀死, 被打断, 被加入到各种Pool里面去, 抛出哪些进程应该抛出的错误. 加那些进程应该加的锁, 包含那些进程应该包含的BUG.
4.2 TaskRunne比较重要的几个内部器官
class TaskRunner(
execBackend: ExecutorBackend,
val taskId: Long,
val attemptNumber: Int,
taskName: String,
serializedTask: ByteBuffer)
extends Runnable {
/** Whether this task has been killed. */
@volatile private var killed = false
/** How much the JVM process has spent in GC when the task starts to run. */
@volatile var startGCTime: Long = _
/**
* The task to run. This will be set in run() by deserializing the task binary coming
* from the driver. Once it is set, it will never be changed.
*/
@volatile var task: Task[Any] = _
4.3 TaskRunner在运行中干了啥
截取部分源码进行注释
override def run(): Unit = {
// 初始化一个内存管理
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
// 开启ClassLoader
Thread.currentThread.setContextClassLoader(replClassLoader)
// 执行到这里的时候, 可以在日志中看到一个Task日志的开始
logInfo(s"Running $taskName (TID $taskId)")
// 向execBackend汇报当前的状态
execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
// 统计垃圾回收的时间
startGCTime = computeTotalGcTime()
// 统计序列化的耗时
val deserializeStartTime = System.currentTimeMillis()
try {
// 把Driver端发过来的压缩后的图纸变成内存里看的懂得图纸, 开始搬砖吧
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
// 这里调用了一个Executor的Method, 从Driver端的HTTP服务器那里下之地第三方包
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
task.setTaskMemoryManager(taskMemoryManager)
// If this task has been killed before we deserialized it, let's quit now. Otherwise,
// continue executing the task.
if (killed) {
// Throw an exception rather than returning, because returning within a try{} block
// causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
// exception will be caught by the catch block, leading to an incorrect ExceptionFailure
// for the task.
throw new TaskKilledException
}
logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)
// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
var threwException = true
val (value, accumUpdates) = try {
// 实际执行任务, 执行过程中需要申请Memroy资源和Block资源
val res = task.run(
taskAttemptId = taskId,
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
} finally {
// 执行完毕后释放刚才执行中用到的各种锁
val releasedLocks = env.blockManager.releaseAllLocksForTask(taskId)
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
// 清理不干净, 就会报一个Warning提示内存泄漏, 这是Debug中一个点
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
throw new SparkException(errMsg)
} else {
logError(errMsg)
}
}
// 锁释放不干净, 也会报错
if (releasedLocks.nonEmpty) {
val errMsg =
s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
releasedLocks.mkString("[", ", ", "]")
if (conf.getBoolean("spark.storage.exceptionOnPinLeak", false) && !threwException) {
throw new SparkException(errMsg)
} else {
logError(errMsg)
}
}
}
val taskFinish = System.currentTimeMillis()
// If the task has been killed, let's fail it.
if (task.killed) {
throw new TaskKilledException
}
// 对执行结果进行序列化
val resultSer = env.serializer.newInstance()
val beforeSerialization = System.currentTimeMillis()
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
for (m <- task.metrics) {
/** 各种统计信息的整理
这些统计信息跟着心跳发送到Driver端, 并展示在WebUI上
*/
// 注意, 这里会对全局Accumulator变量进行update
m.updateAccumulators()
}
// directSend = sending directly back to the driver
val serializedResult: ByteBuffer = {
/** 对结果进行序列化
128MB以下通过Akka进行传输
超过了就通过BlockManager进行传输
这里会打印多个日志, 表述结果是多大, 有多少result发给了Driver
*/
}
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
/** 各种错误处理 */
} finally {
runningTasks.remove(taskId)
}
}
}
4.4 统计信息包括
- [executorDeserializeTime]
- [executorDeserializeCpuTime]
- [executorRunTime]
- [executorCpuTime]
- [jvmGCTime]
- [resultSerializationTime]
ExecutorSource
另外一个比较重要的概念, 就是这个搬砖工有多大的力气, 这些数值都统计在了这个对象里
ExecutorSource Jconsole
Gauge | Description |
---|---|
threadpool.activeTasks | 可用线程数 |
threadpool.completeTasks | 已经完成的task数 |
threadpool.currentPool_size | 线程池容量 |
threadpool.maxPool_size | 最大容量 |
filesystem.hdfs.read_bytes | HDFS FileSystem.getAllStatistics() 中getBytesRead()
|
filesystem.hdfs.write_bytes | HDFS FileSystem.getAllStatistics() 中getBytesWritten()
|
filesystem.hdfs.read_ops | HDFS FileSystem.getAllStatistics() 中getReadOps()
|
filesystem.hdfs.largeRead_ops | HDFS FileSystem.getAllStatistics() 中getLargeReadOps() . |
filesystem.hdfs.write_ops | HDFS FileSystem.getAllStatistics() 中 getWriteOps() . |
网友评论