See also: Spark Streaming source code analysis, a detailed walkthrough of the overall flow
What is DStreamGraph actually for?
- DStreamGraph holds all of the input streams and output streams, and uses them to carve out and submit the jobs for each batch.
- Cleanup: Spark Streaming receives data continuously, so will it eventually exhaust memory? When is checkpoint data cleaned up, and when is it updated?
// 1. Register input/output DStreams
def addInputStream(inputStream: InputDStream[_]) {
  this.synchronized {
    inputStream.setGraph(this)
    inputStreams += inputStream
  }
}
// addOutputStream is analogous, appending to outputStreams instead
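The registration pattern above can be mimicked outside Spark. Below is a minimal, self-contained sketch; `SimpleGraph` and `SimpleStream` are hypothetical stand-ins for `DStreamGraph` and `DStream`, not Spark's real classes:

```scala
import scala.collection.mutable.ArrayBuffer

class SimpleStream(val name: String) {
  // In Spark, setGraph(this) gives each stream a back-reference to its graph.
  var graph: Option[SimpleGraph] = None
}

class SimpleGraph {
  private val inputStreams = ArrayBuffer[SimpleStream]()
  private val outputStreams = ArrayBuffer[SimpleStream]()

  // Mirrors addInputStream: synchronized registration plus back-reference.
  def addInputStream(s: SimpleStream): Unit = this.synchronized {
    s.graph = Some(this)
    inputStreams += s
  }

  // Mirrors the analogous addOutputStream.
  def addOutputStream(s: SimpleStream): Unit = this.synchronized {
    s.graph = Some(this)
    outputStreams += s
  }

  def inputCount: Int = inputStreams.size
  def outputCount: Int = outputStreams.size
}

val g = new SimpleGraph
val in = new SimpleStream("socket")
g.addInputStream(in)
```

Holding both sides in one object is what lets the graph later drive job generation (via outputs) and cleanup (via all registered streams).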
// 2. Ask each registered output DStream to generate a job for this batch time
def generateJobs(time: Time): Seq[Job] = {
  ...
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  jobs
}
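Note the flatMap-over-Option idiom in generateJobs: each output stream returns an `Option[Job]`, and `flatMap` keeps only the `Some` cases, so an output stream with nothing to do for this batch simply contributes no job. A self-contained sketch of the same pattern, where `Job` is a hypothetical stand-in for Spark's class:

```scala
case class Job(time: Long, name: String)

// Each "output stream" is modeled as a function that may or may not
// produce a job for a given batch time.
val outputs: Seq[Long => Option[Job]] = Seq(
  t => Some(Job(t, "print")),
  _ => None,                 // nothing to do this batch: contributes no job
  t => Some(Job(t, "save"))
)

// flatMap flattens Option[Job] results: Some(job) survives, None vanishes.
def generateJobs(time: Long): Seq[Job] =
  outputs.flatMap(gen => gen(time))
```

So `generateJobs(1000L)` yields two jobs here, and the idle stream is silently skipped rather than producing a null or placeholder job.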
// 3. Cleanup: Spark Streaming receives data continuously, so will it exhaust
// memory? When is checkpoint data cleaned up, and when is it updated? See the blog post:
def clearMetadata(time: Time)        // drop generated RDDs/metadata older than the remember duration
def updateCheckpointData(time: Time) // record each DStream's checkpoint state for this batch
def clearCheckpointData(time: Time)  // remove checkpoint data that is no longer needed
def restoreCheckpointData()          // rebuild DStream state from a checkpoint on recovery
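The answer to the memory question is time-based cleanup: each DStream keeps its generated RDDs in a time-indexed map and, on each clearMetadata call, drops entries older than the current batch time minus a remember duration, so the map never grows without bound. A hedged, self-contained sketch of that idea; the names and numbers are illustrative, not Spark's actual values:

```scala
import scala.collection.mutable

val rememberDuration = 2000L                 // keep roughly two batches (ms); illustrative
val generated = mutable.Map[Long, String]()  // batch time -> generated "RDD"

def record(time: Long): Unit = generated(time) = s"rdd@$time"

// Drop everything at or before (time - rememberDuration).
// In Spark this is also where old RDDs get unpersisted.
def clearMetadata(time: Long): Unit = {
  val old = generated.keys.filter(_ <= time - rememberDuration).toList
  old.foreach(generated.remove)
}

record(1000L); record(2000L); record(3000L)
clearMetadata(3000L)  // drops the batch at 1000L, keeps 2000L and 3000L
```

The checkpoint methods follow the same rhythm: updateCheckpointData snapshots state per batch, and clearCheckpointData prunes snapshots the same way clearMetadata prunes RDDs.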