SparkStreaming Source Code: Building the Processing Chain


Author: Jorvi | Published: 2019-12-17 15:40



    1. Program Entry Point

     // Initialize the StreamingContext
     SparkConf conf = new SparkConf().setAppName("SparkStreaming_demo")
           .set("spark.streaming.stopGracefullyOnShutdown", "true");
     JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(30));
    
     // Build the processing chain
     Map<String, Object> kafkaParams = new HashMap<String, Object>();
     kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
     kafkaParams.put("key.deserializer", StringDeserializer.class);
     kafkaParams.put("value.deserializer", StringDeserializer.class);
     kafkaParams.put("group.id", "exampleGroup");
     kafkaParams.put("auto.offset.reset", "latest");
     kafkaParams.put("enable.auto.commit", false);
    
     Collection<String> topics = Arrays.asList("exampleTopic");
    
     JavaInputDStream<ConsumerRecord<String, String>> inputDStream = KafkaUtils.createDirectStream(streamingContext,
                    LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
    
     JavaDStream<String> transformDStream = inputDStream.transform(new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<String>>() {
          @Override
          public JavaRDD<String> call(JavaRDD<ConsumerRecord<String, String>> v1) throws Exception {
             JavaRDD<String> tempRdd = v1.map(new Function<ConsumerRecord<String, String>, String>() {
                 @Override
                 public String call(ConsumerRecord<String, String> v1) throws Exception {
                     return v1.value();
                 }
             });
             return tempRdd;
         }
     });
    
     JavaDStream<String> filterDStream = transformDStream.filter(new Function<String, Boolean>() {
         @Override
         public Boolean call(String v1) throws Exception {
             return StringUtils.isNotBlank(v1);
         }
     });
    
     filterDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
         @Override
         public void call(JavaRDD<String> javaRDD) throws Exception {
             javaRDD.saveAsTextFile("/home/example/result/");
         }
     });
     
     // Start the job and wait for termination
     streamingContext.start();
     streamingContext.awaitTermination();
    
    

    This article focuses on how SparkStreaming builds its chained processing pipeline.

    2. Into the Source Code

    2.1 Creating the InputDStream

    • Enter org.apache.spark.streaming.kafka010.KafkaUtils.scala
      def createDirectStream[K, V](
          jssc: JavaStreamingContext,
          locationStrategy: LocationStrategy,
          consumerStrategy: ConsumerStrategy[K, V]
        ): JavaInputDStream[ConsumerRecord[K, V]] = {
        new JavaInputDStream(
          createDirectStream[K, V](
            jssc.ssc, locationStrategy, consumerStrategy))
      }
    
    
      def createDirectStream[K, V](
          ssc: StreamingContext,
          locationStrategy: LocationStrategy,
          consumerStrategy: ConsumerStrategy[K, V]
        ): InputDStream[ConsumerRecord[K, V]] = {
        new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy)
      }
    

    createDirectStream simply constructs a new DirectKafkaInputDStream.

    • Enter org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.scala
    private[spark] class DirectKafkaInputDStream[K, V](
        _ssc: StreamingContext,
        locationStrategy: LocationStrategy,
        consumerStrategy: ConsumerStrategy[K, V]
      ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {
    
      override protected[streaming] val rateController: Option[RateController] = {
        if (RateController.isBackPressureEnabled(ssc.conf)) {
          Some(new DirectKafkaRateController(id,
            RateEstimator.create(ssc.conf, context.graph.batchDuration)))
        } else {
          None
        }
      }
    
      private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
        "spark.streaming.kafka.maxRatePerPartition", 0)
    
      ......
    
    }
    

    When the DirectKafkaInputDStream is constructed, it:

    1. Creates the rateController: it reads spark.streaming.backpressure.enabled to see whether backpressure is turned on, and if so builds a rate estimator to serve as the rateController;
    2. Reads spark.streaming.kafka.maxRatePerPartition to see whether a per-partition maximum consumption rate is configured; if so, it works together with the rateController above to bound the offset ranges, and thus the amount of data pulled in each batch (see the configuration sketch below);
    3. And so on.
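
    Both settings involved here are supplied through SparkConf. A minimal sketch follows; the values are hypothetical and not part of the article's example job:

      import org.apache.spark.SparkConf
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      // Enabling backpressure is what makes rateController non-empty above;
      // maxRatePerPartition additionally caps how fast each Kafka partition is read.
      val conf = new SparkConf()
        .setAppName("SparkStreaming_demo")
        .set("spark.streaming.backpressure.enabled", "true")
        .set("spark.streaming.kafka.maxRatePerPartition", "1000")  // records/sec per partition (hypothetical value)
      val ssc = new StreamingContext(conf, Seconds(30))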

    DirectKafkaInputDStream is a subclass of InputDStream, so the InputDStream constructor runs first.

    • Enter org.apache.spark.streaming.dstream.InputDStream.scala
    abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
      extends DStream[T](_ssc) {
    
      ssc.graph.addInputStream(this)
    
      val id = ssc.getNewInputStreamId()
    
      ......
    
    }
    

    Here, the newly constructed DirectKafkaInputDStream is added to the inputStreams of the DStreamGraph belonging to the StreamingContext initialized earlier.
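
    For reference, DStreamGraph.addInputStream amounts to a thread-safe append; the following is a paraphrased sketch of its body, not a verbatim copy of the Spark source:

      // Paraphrased sketch of DStreamGraph.addInputStream:
      // bind the stream to this graph, then append it to the inputStreams buffer.
      def addInputStream(inputStream: InputDStream[_]): Unit = this.synchronized {
        inputStream.setGraph(this)
        inputStreams += inputStream
      }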

    2.2 The transform Operation

    • Enter org.apache.spark.streaming.api.java.AbstractJavaDStreamLike.scala
      def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
        implicit val cm: ClassTag[U] = fakeClassTag
    
        def scalaTransform (in: RDD[T]): RDD[U] =
          transformFunc.call(wrapRDD(in)).rdd
        dstream.transform(scalaTransform(_))
      }
    
    • Then enter org.apache.spark.streaming.dstream.DStream.scala
      def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {
        // because the DStream is reachable from the outer object here, and because
        // DStreams can't be serialized with closures, we can't proactively check
        // it for serializability and so we pass the optional false to SparkContext.clean
        val cleanedF = context.sparkContext.clean(transformFunc, false)
        transform((r: RDD[T], _: Time) => cleanedF(r))
      }
    
    
      def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {
        // because the DStream is reachable from the outer object here, and because
        // DStreams can't be serialized with closures, we can't proactively check
        // it for serializability and so we pass the optional false to SparkContext.clean
        val cleanedF = context.sparkContext.clean(transformFunc, false)
        val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
          assert(rdds.length == 1)
          cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
        }
        new TransformedDStream[U](Seq(this), realTransformFunc)
      }
    

    The transform operation ultimately creates a TransformedDStream.

    • Enter org.apache.spark.streaming.dstream.TransformedDStream.scala
    private[streaming]
    class TransformedDStream[U: ClassTag] (
        parents: Seq[DStream[_]],
        transformFunc: (Seq[RDD[_]], Time) => RDD[U]
      ) extends DStream[U](parents.head.ssc) {
    
      override def dependencies: List[DStream[_]] = parents.toList
    
      override def slideDuration: Duration = parents.head.slideDuration
    
      ......
    }
    

    When the TransformedDStream is created, its dependencies are set:
    dependencies = parents.toList, where parents is the Seq(this) passed in new TransformedDStream[U](Seq(this), realTransformFunc) above, and this is the inputDStream on which the transform operation was invoked in inputDStream.transform.

    At this point, the TransformedDStream is linked to the upstream DirectKafkaInputDStream through its internally declared dependencies.

    2.3 The filter Operation

    Similar to transform, the filter operation creates a FilteredDStream, which likewise uses dependencies to link itself to the upstream TransformedDStream.
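
    The filter operator itself is a one-liner in DStream.scala, roughly of this shape (a sketch, not a verbatim copy):

      // Sketch of DStream.filter: it simply wraps the current DStream in a FilteredDStream.
      def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {
        new FilteredDStream(this, context.sparkContext.clean(filterFunc))
      }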

    • Enter org.apache.spark.streaming.dstream.FilteredDStream.scala
    private[streaming]
    class FilteredDStream[T: ClassTag](
        parent: DStream[T],
        filterFunc: T => Boolean
      ) extends DStream[T](parent.ssc) {
    
      override def dependencies: List[DStream[_]] = List(parent)
    
      override def slideDuration: Duration = parent.slideDuration
    
      ......
    }
    

    2.4 The foreachRDD Operation

    • Enter org.apache.spark.streaming.dstream.DStream.scala
      private def foreachRDD(
          foreachFunc: (RDD[T], Time) => Unit,
          displayInnerRDDOps: Boolean): Unit = {
        new ForEachDStream(this,
          context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
      }
    

    The foreachRDD operation:

    1. Creates a ForEachDStream;
    2. Calls register() to register it.

    2.4.1 Creating the ForEachDStream
    • Enter org.apache.spark.streaming.dstream.ForEachDStream.scala
    private[streaming]
    class ForEachDStream[T: ClassTag] (
        parent: DStream[T],
        foreachFunc: (RDD[T], Time) => Unit,
        displayInnerRDDOps: Boolean
      ) extends DStream[Unit](parent.ssc) {
    
      override def dependencies: List[DStream[_]] = List(parent)
    
      override def slideDuration: Duration = parent.slideDuration
    
      ......
    }
    

    As before, the ForEachDStream uses dependencies to link itself to the upstream FilteredDStream.

    2.4.2 Calling register()
    • Enter org.apache.spark.streaming.dstream.DStream.scala
      private[streaming] def register(): DStream[T] = {
        ssc.graph.addOutputStream(this)
        this
      }
    

    Here, the newly created ForEachDStream is added to the outputStreams of the DStreamGraph.
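
    As with addInputStream earlier, DStreamGraph.addOutputStream is essentially a thread-safe append to the graph's output list; a paraphrased sketch, not a verbatim copy:

      // Paraphrased sketch of DStreamGraph.addOutputStream:
      // bind the stream to this graph, then append it to the outputStreams buffer.
      def addOutputStream(outputStream: DStream[_]): Unit = this.synchronized {
        outputStream.setGraph(this)
        outputStreams += outputStream
      }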

    Note:
    Every output operation in SparkStreaming produces a ForEachDStream.
    That is, every output operation ends up registered in the outputStreams of the DStreamGraph.

    For example, print ultimately calls foreachRDD:

      def print(num: Int): Unit = ssc.withScope {
        def foreachFunc: (RDD[T], Time) => Unit = {
          (rdd: RDD[T], time: Time) => {
            val firstNum = rdd.take(num + 1)
            // scalastyle:off println
            println("-------------------------------------------")
            println(s"Time: $time")
            println("-------------------------------------------")
            firstNum.take(num).foreach(println)
            if (firstNum.length > num) println("...")
            println()
            // scalastyle:on println
          }
        }
        foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
      }
    

    3. Summary

    1. An InputDStream is created and added to the inputStreams of the DStreamGraph.

    2. Every SparkStreaming operator produces a corresponding DStream (e.g. FilteredDStream, ForEachDStream).

    3. Each of these DStreams uses its dependencies to link back to the upstream DStream.

    4. Linking DStream to DStream, one after another, builds up the SparkStreaming processing chain (see the sketch after this list).

    5. All output operations go through the foreachRDD operator, which produces a ForEachDStream and adds it to the outputStreams of the DStreamGraph.
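
    To make the finished chain concrete, here is a hypothetical helper (not part of Spark) that walks a DStream's dependencies upstream and prints each node. Run against the example job at the top (assuming access to the registered ForEachDStream, which is private[streaming]), it would print ForEachDStream, FilteredDStream, TransformedDStream and DirectKafkaInputDStream in that order:

      import org.apache.spark.streaming.dstream.DStream

      // Hypothetical helper: recursively follow dependencies from an output DStream
      // back to the InputDStream, printing each link of the processing chain.
      def printChain(stream: DStream[_], indent: String = ""): Unit = {
        println(indent + stream.getClass.getSimpleName)
        stream.dependencies.foreach(printChain(_, indent + "  "))
      }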
