1. 结构概要
Listener Bus 与其它监听器 (观察者) 模式的实现一样, 实现了一个事件总线: DAGScheduler 等组件把事件投递 (post) 到总线上, 注册在总线上的监听器通过接收这些事件得知当前的任务进度, 并据此做出相应的处理.
LiveListenerBus 关注的是与 Application 相关的事件 (Event), 包括 Application 的运行状态、Job 的状态等.
LiveListenerBus 本身是 AsynchronousListenerBus 的一个具体实现; AsynchronousListenerBus 内部使用 java.util.concurrent.LinkedBlockingQueue (容量上限为 10000) 来实现一个消息队列.
SparkListenerBus
使用一个独立的线程从总线上拉取 (poll) 消息.
这个总线是最重要的一个总线, 挂载在 SparkContext 上, 为后续一系列服务 (service) 提供支持.
2. 部分代码解读
/**
 * Asynchronously passes events to registered listeners.
 *
 * Until `start()` is called, all posted events are only buffered. Only after this listener bus
 * has started will events be actually propagated to all attached listeners. This listener bus
 * is stopped when `stop()` is called, and it will drop further events after stopping.
 *
 * Implementation notes: events live in a bounded `LinkedBlockingQueue`. Every successfully
 * queued event releases exactly one permit on `eventLock`, and a single daemon thread consumes
 * one permit per event. `stop()` releases one extra permit without queueing anything, so once the
 * queue has been drained the consumer polls `null` and takes that as its signal to exit.
 *
 * @param name name of the listener bus, will be the name of the listener thread.
 * @tparam L type of listener
 * @tparam E type of event
 */
private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: String)
  extends ListenerBus[L, E] {

  self =>

  // Set by `start()` and handed to `Utils.tryOrStopSparkContext` so the SparkContext can be
  // stopped if the listener thread dies. It is written before `listenerThread.start()`.
  private var sparkContext: SparkContext = null

  /* Cap the capacity of the event queue so we get an explicit error (rather than
   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
  private val EVENT_QUEUE_CAPACITY = 10000

  // Thread-safe, bounded FIFO queue shared by posting threads and the listener thread.
  private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)

  // Indicate if `start()` is called
  private val started = new AtomicBoolean(false)

  // Indicate if `stop()` is called
  private val stopped = new AtomicBoolean(false)

  /** A counter for dropped events. It will be reset every time we log it. */
  private val droppedEventsCounter = new AtomicLong(0L)

  /** When `droppedEventsCounter` was logged last time in milliseconds. */
  @volatile private var lastReportTimestamp = 0L

  // Indicate if we are processing some event
  // Guarded by `self`
  private var processingEvent = false

  // A counter that represents the number of events produced and consumed in the queue.
  // `post` adds one permit per queued event; `stop()` adds one extra permit to wake the thread.
  private val eventLock = new Semaphore(0)

  // Daemon thread that drains `eventQueue`. It blocks in `eventLock.acquire()` until at least one
  // permit is available, i.e. until an event has been queued or `stop()` has been called.
  private val listenerThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
      AsynchronousListenerBus.withinListenerThread.withValue(true) {
        var running = true
        while (running) {
          eventLock.acquire()
          // Mark the bus as busy so `queueIsEmpty` does not report "empty" while an event that
          // has already been taken off the queue is still being delivered.
          self.synchronized {
            processingEvent = true
          }
          try {
            val event = eventQueue.poll
            if (event == null) {
              // Every queued event has its own permit, so a permit without an event can only
              // come from `stop()`: all earlier events have been delivered and we can exit.
              if (!stopped.get) {
                throw new IllegalStateException("Polling `null` from eventQueue means" +
                  " the listener bus has been stopped. So `stopped` must be true")
              }
              running = false
            } else {
              // Deliver the event to every registered listener.
              postToAll(event)
            }
          } finally {
            self.synchronized {
              processingEvent = false
            }
          }
        }
      }
    }
  }

  /**
   * Start sending events to attached listeners.
   *
   * This first sends out all buffered events posted before this listener bus has started, then
   * listens for any additional events asynchronously while the listener bus is still running.
   * This should only be called once.
   *
   * @param sc Used to stop the SparkContext in case the listener thread dies.
   * @throws IllegalStateException if the bus has already been started.
   */
  def start(sc: SparkContext): Unit = {
    // compareAndSet guarantees that only one caller ever starts the listener thread.
    if (started.compareAndSet(false, true)) {
      sparkContext = sc
      listenerThread.start()
    } else {
      throw new IllegalStateException(s"$name already started!")
    }
  }

  /**
   * Enqueue `event` for asynchronous delivery to all listeners.
   *
   * Events posted before `start()` are buffered, and their permits accumulate on `eventLock`.
   * Events posted after `stop()` are dropped and logged. If the queue is full, the event is
   * handed to `onDropEvent` and counted; dropped-event counts are logged at most once per minute.
   *
   * @param event the event to deliver.
   */
  def post(event: E): Unit = {
    if (stopped.get) {
      // Drop further events to make `listenerThread` exit ASAP
      logWarning(s"$name has already stopped! Dropping event $event")
    } else {
      val eventAdded = eventQueue.offer(event)
      if (eventAdded) {
        // One permit per queued event lets the listener thread take exactly one item.
        eventLock.release()
      } else {
        onDropEvent(event)
        droppedEventsCounter.incrementAndGet()
      }
      logDroppedEventsIfNeeded()
    }
  }

  /**
   * Log a summary of dropped events and reset the counter, at most once every 60 seconds, so a
   * persistently full queue does not flood the log.
   */
  private def logDroppedEventsIfNeeded(): Unit = {
    val droppedEvents = droppedEventsCounter.get
    // Don't log too frequently
    if (droppedEvents > 0 && System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
      // There may be multiple threads trying to decrease droppedEventsCounter.
      // Use "compareAndSet" to make sure only one thread can win.
      // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
      // then that thread will update it.
      if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) {
        val prevLastReportTimestamp = lastReportTimestamp
        lastReportTimestamp = System.currentTimeMillis()
        logWarning(s"Dropped $droppedEvents SparkListenerEvents since " +
          new java.util.Date(prevLastReportTimestamp))
      }
    }
  }

  /**
   * Return whether the event queue is empty.
   *
   * The use of synchronized here guarantees that all events that once belonged to this queue
   * have already been processed by all attached listeners, if this returns true.
   */
  private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent }

  /**
   * Stop the listener bus. It will wait until the queued events have been processed, but drop the
   * new events after stopping.
   *
   * @throws IllegalStateException if the bus was never started.
   */
  def stop(): Unit = {
    if (!started.get()) {
      throw new IllegalStateException(s"Attempted to stop $name that has not yet started!")
    }
    // compareAndSet makes stopping a one-time action; repeated calls are silently ignored.
    if (stopped.compareAndSet(false, true)) {
      // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know
      // `stop` is called.
      eventLock.release()
      // Blocks until the listener thread has delivered every queued event and exited.
      listenerThread.join()
    }
  }

  /**
   * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be
   * notified with the dropped events.
   *
   * Note: `onDropEvent` can be called in any thread.
   */
  def onDropEvent(event: E): Unit
}
网友评论