美文网首页
Spark-streaming源码走读(序)

Spark-streaming源码走读(序)

作者: 小五_555 | 来源:发表于2017-03-11 21:11 被阅读0次

最近刚好有点时间, 也把Spark代码看了看,这个系列会一直更新下去,由于本人的scala功底也不是太强,这里权当记录了,如果有人看到我的文章,发现其中的错误,也请大家指正。

hello world

package com.dubin

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by dubin on 17/2/9.
  */
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("first spark streaming app")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    ssc.checkpoint("/tmp")

    val addFunc = (currValues: Seq[Int], prevValue: Option[Int]) => {
      val currentCount = currValues.sum
      val prevCount = prevValue.getOrElse(0)
      Some(currentCount + prevCount)
    }

    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))

    val totalWordCounts = pairs.updateStateByKey[Int](addFunc)
    totalWordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Spark Core里的RDD的概念有比较重要的一条是RDD创建之后就不会改变了,那像streaming这种应用,表面上看到的是像RDD一样的编程,其实是基于DStream的,DStream相当于RDD的模版,有点像代理模式,DStream的compute等操作都是调用对应的RDD的操作。

开始扫源码:

这里所有都是基于spark 1.6
我们可以看到程序的入口,ssc.start(), intellij 进去看下,



刚开始看的时候有个疑问,598行的锁为了保持context状态的一致性,594的同步是为了ssc的方法的线程安全。622行注册一些关闭资源的钩子。最主要的行为是新开线程来运行JobScheduler,注释其实写得挺清楚的


想起之前看Databricks在YouTube上的讲座,把sparkStreaming的job比做一个传送带,每个一个时间片段,便把当前的数据打包,附上需要的计算,发给相应的执行者执行,这样的比喻其实还挺有意思的,在后面的源码走读中,会看看这样的比喻恰不恰当,具体里面的实现,明天再见!

相关文章

网友评论

      本文标题:Spark-streaming源码走读(序)

      本文链接:https://www.haomeiwen.com/subject/mbrzgttx.html