最近刚好有点时间, 也把Spark代码看了看,这个系列会一直更新下去,由于本人的scala功底也不是太强,这里权当记录了,如果有人看到我的文章,发现其中的错误,也请大家指正。
hello world
package com.dubin
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Created by dubin on 17/2/9.
*/
object NetworkWordCount {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("first spark streaming app")
val ssc = new StreamingContext(sparkConf, Seconds(10))
ssc.checkpoint("/tmp")
val addFunc = (currValues: Seq[Int], prevValue: Option[Int]) => {
val currentCount = currValues.sum
val prevCount = prevValue.getOrElse(0)
Some(currentCount + prevCount)
}
val lines = ssc.socketTextStream(args(0), args(1).toInt)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val totalWordCounts = pairs.updateStateByKey[Int](addFunc)
totalWordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Spark Core里的RDD的概念有比较重要的一条是RDD创建之后就不会改变了,那像streaming这种应用,表面上看到的是像RDD一样的编程,其实是基于DStream的,DStream相当于RDD的模版,有点像代理模式,DStream的compute等操作都是调用对应的RDD的操作。
开始扫源码:
这里所有都是基于spark 1.6
我们可以看到程序的入口,ssc.start(), intellij 进去看下,

刚开始看的时候有个疑问,598行的锁为了保持context状态的一致性,594的同步是为了ssc的方法的线程安全。622行注册一些关闭资源的钩子。最主要的行为是新开线程来运行JobScheduler,注释其实写得挺清楚的

想起之前看Databricks在YouTube上的讲座,把sparkStreaming的job比做一个传送带,每个一个时间片段,便把当前的数据打包,附上需要的计算,发给相应的执行者执行,这样的比喻其实还挺有意思的,在后面的源码走读中,会看看这样的比喻恰不恰当,具体里面的实现,明天再见!
网友评论