self.context.runJob(self, writeToFile)//开始提交任务,self就是最后一个rdd,这个rdd通过依赖关系进行stage切分
runJob(rdd, func)//将最后一个rdd和一个函数(taskContext, iterator)传入到该方法中
spark.logLineage = true 打印血统关系
//DAGScheduler的runJob方法,切分stage
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties)
这个方法中有
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
并返回一个回调器
在sparkContext中,new DAGScheduler
启动了一个先进先出的队列!!!:
在DAGScheduler的主构造器中的最后一行eventProcessLoop.start()
eventProcessLoop:DAGSchedulerEventProcessLoop 继承 abstract class EventLoop[E](name: String)
这个父类中有
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDequeE
private val eventThread = new Thread(name) 该线程从上面的阻塞队列中取,有内容取,没内容阻塞,等着, 先进先出的调度器!
其中 onReceive(event) 的实现是它的子类DAGSchedulerEventProcessLoop的onReceive
dagscheduler中先将数据封装在event中,然后放到eventprocessloop阻塞队列中
DAGScheduler中的handleJobSubmitted用于切分stage(其中的newstage方法)
网友评论