美文网首页
Spark源码分析八-事件总线及ListenerBus

Spark源码分析八-事件总线及ListenerBus

作者: 无色的叶 | 来源:发表于2020-07-14 15:26 被阅读0次

    LiveListenerBus

    这个是Spark的 事件总线,定义了事件与监听器的映射关系,并不直接处理事件。而是把事件传递给专门的监听器处理,即它的功能是异步地将SparkListenerEvent传递给已经注册的SparkListener,这种异步的机制是通过生产消费者模型来实现的。

    首先,它定义了 4 个 消息堵塞队列,队列的名字分别为shared、appStatus、executorManagement、eventLog。队列的类型是 org.apache.spark.scheduler.AsyncEventQueue#AsyncEventQueue,保存在 queues 变量中。每一个队列上都可以注册监听器,如果队列没有监听器,则会被移除。

    它有启动和stop和start两个标志位来指示 监听总线的的启动停止状态。 如果总线没有启动,有事件过来,先放到 一个待添加的可变数组中,否则直接将事件 post 到每一个队列中。其直接依赖类是 AsyncEventQueue, 相当于 LiveListenerBus 的多事件队列是对 AsyncEventQueue 进一步的封装。

    ListenerBus特征是Spark内所有事件总线实现的基类,下图示出ListenerBus的一部分继承结构


    image.png

    AsyncEventQueue

    它有启动和stop和start两个标志位来指示 监听总线的的启动停止状态。

    其内部维护了listenersPlusTimers 主要就是用来保存注册到这个总线上的监听器对象的。

    post 操作将事件放入内部的 LinkedBlockingQueue中,默认大小是 10000。

    有一个事件分发器,它不停地从 LinkedBlockingQueue 执行 take 操作,获取事件,并将事件进一步分发给所有的监听器,由org.apache.spark.scheduler.SparkListenerBus#doPostEvent 方法实现事件转发

    private[spark] trait SparkListenerBus
      extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {
    
      protected override def doPostEvent(
          listener: SparkListenerInterface,
          event: SparkListenerEvent): Unit = {
        event match {
          case stageSubmitted: SparkListenerStageSubmitted =>
            listener.onStageSubmitted(stageSubmitted)
          case stageCompleted: SparkListenerStageCompleted =>
            listener.onStageCompleted(stageCompleted)
          case jobStart: SparkListenerJobStart =>
            listener.onJobStart(jobStart)
          case jobEnd: SparkListenerJobEnd =>
            listener.onJobEnd(jobEnd)
          case taskStart: SparkListenerTaskStart =>
            listener.onTaskStart(taskStart)
          case taskGettingResult: SparkListenerTaskGettingResult =>
            listener.onTaskGettingResult(taskGettingResult)
          case taskEnd: SparkListenerTaskEnd =>
            listener.onTaskEnd(taskEnd)
          case environmentUpdate: SparkListenerEnvironmentUpdate =>
            listener.onEnvironmentUpdate(environmentUpdate)
          case blockManagerAdded: SparkListenerBlockManagerAdded =>
            listener.onBlockManagerAdded(blockManagerAdded)
          case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
            listener.onBlockManagerRemoved(blockManagerRemoved)
          case unpersistRDD: SparkListenerUnpersistRDD =>
            listener.onUnpersistRDD(unpersistRDD)
          case applicationStart: SparkListenerApplicationStart =>
            listener.onApplicationStart(applicationStart)
          case applicationEnd: SparkListenerApplicationEnd =>
            listener.onApplicationEnd(applicationEnd)
          case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
            listener.onExecutorMetricsUpdate(metricsUpdate)
          case stageExecutorMetrics: SparkListenerStageExecutorMetrics =>
            listener.onStageExecutorMetrics(stageExecutorMetrics)
          case executorAdded: SparkListenerExecutorAdded =>
            listener.onExecutorAdded(executorAdded)
          case executorRemoved: SparkListenerExecutorRemoved =>
            listener.onExecutorRemoved(executorRemoved)
          case executorBlacklistedForStage: SparkListenerExecutorBlacklistedForStage =>
            listener.onExecutorBlacklistedForStage(executorBlacklistedForStage)
          case nodeBlacklistedForStage: SparkListenerNodeBlacklistedForStage =>
            listener.onNodeBlacklistedForStage(nodeBlacklistedForStage)
          case executorBlacklisted: SparkListenerExecutorBlacklisted =>
            listener.onExecutorBlacklisted(executorBlacklisted)
          case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
            listener.onExecutorUnblacklisted(executorUnblacklisted)
          case nodeBlacklisted: SparkListenerNodeBlacklisted =>
            listener.onNodeBlacklisted(nodeBlacklisted)
          case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
            listener.onNodeUnblacklisted(nodeUnblacklisted)
          case blockUpdated: SparkListenerBlockUpdated =>
            listener.onBlockUpdated(blockUpdated)
          case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
            listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
          case _ => listener.onOtherEvent(event)
        }
      }
    
    }
    
    

    相关文章

      网友评论

          本文标题:Spark源码分析八-事件总线及ListenerBus

          本文链接:https://www.haomeiwen.com/subject/fvzxhktx.html