Spark Streaming Source Code: Initializing StreamingContext

Author: Jorvi | Published 2019-12-17 15:39

    Source Code Series Index


    1. Program Entry

     // Initialize the StreamingContext
     SparkConf conf = new SparkConf().setAppName("SparkStreaming_demo")
           .set("spark.streaming.stopGracefullyOnShutdown", "true");
     JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Durations.seconds(30));
    
     // Build the processing chain
     Map<String, Object> kafkaParams = new HashMap<String, Object>();
     kafkaParams.put("bootstrap.servers", BOOTSTRAP_SERVERS);
     kafkaParams.put("key.deserializer", StringDeserializer.class);
     kafkaParams.put("value.deserializer", StringDeserializer.class);
     kafkaParams.put("group.id", "exampleGroup");
     kafkaParams.put("auto.offset.reset", "latest");
     kafkaParams.put("enable.auto.commit", false);
    
     Collection<String> topics = Arrays.asList("exampleTopic");
    
     JavaInputDStream<ConsumerRecord<String, String>> inputDStream = KafkaUtils.createDirectStream(streamingContext,
                    LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
    
     JavaDStream<String> transformDStream = inputDStream.transform(new Function<JavaRDD<ConsumerRecord<String, String>>, JavaRDD<String>>() {
          @Override
          public JavaRDD<String> call(JavaRDD<ConsumerRecord<String, String>> v1) throws Exception {
             JavaRDD<String> tempRdd = v1.map(new Function<ConsumerRecord<String, String>, String>() {
                 @Override
                 public String call(ConsumerRecord<String, String> v1) throws Exception {
                     return v1.value();
                 }
             });
             return tempRdd;
         }
     });
    
     JavaDStream<String> filterDStream = transformDStream.filter(new Function<String, Boolean>() {
         @Override
         public Boolean call(String v1) throws Exception {
             return StringUtils.isNotBlank(v1);
         }
     });
    
     filterDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
         @Override
         public void call(JavaRDD<String> javaRDD) throws Exception {
             javaRDD.saveAsTextFile("/home/example/result/");
         }
     });
     
     // Start the streaming job and wait
     streamingContext.start();
     streamingContext.awaitTermination();
    
    

    This article walks through the StreamingContext initialization process.

    2. Into the Source Code

    • Enter org.apache.spark.streaming.StreamingContext.scala
    class StreamingContext private[streaming] (
        _sc: SparkContext,
        _cp: Checkpoint,
        _batchDur: Duration
      ) extends Logging {
    
      private[streaming] val graph: DStreamGraph = {
        if (isCheckpointPresent) {
          _cp.graph.setContext(this)
          _cp.graph.restoreCheckpointData()
          _cp.graph
        } else {
          require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
          val newGraph = new DStreamGraph()
          newGraph.setBatchDuration(_batchDur)
          newGraph
        }
      }
    
      private val nextInputStreamId = new AtomicInteger(0)
    
      private[streaming] var checkpointDir: String = {
        if (isCheckpointPresent) {
          sc.setCheckpointDir(_cp.checkpointDir)
          _cp.checkpointDir
        } else {
          null
        }
      }
    
      private[streaming] val scheduler = new JobScheduler(this)
    
      private[streaming] val uiTab: Option[StreamingTab] =
        if (conf.getBoolean("spark.ui.enabled", true)) {
          Some(new StreamingTab(this))
        } else {
          None
        }
    
      private var state: StreamingContextState = INITIALIZED
    
      ......
    
    }
    

    In the StreamingContext constructor, the following happens:

    1. Create the DStreamGraph, setting its BatchDuration to the batch interval of the Spark Streaming job;
    2. Set the checkpointDir (only when a checkpoint is present);
    3. Create the JobScheduler;
    4. Create the Streaming UI tab (when spark.ui.enabled is true);
    5. Initialize the state to INITIALIZED;
    6. and so on.
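The checkpoint-or-new decision in step 1 can be mirrored with a minimal, hypothetical Java sketch (not Spark's actual classes; `Graph` and `initGraph` are illustrative names): if a checkpoint is present its graph is reused, otherwise a fresh graph is built and the batch duration must be non-null.

```java
import java.util.Objects;

// Hypothetical, simplified mirror of the constructor logic above.
public class GraphInit {
    static class Graph {
        long batchDurationMs;            // batch interval, as in setBatchDuration
        boolean restoredFromCheckpoint;  // true when rebuilt from a checkpoint
    }

    static Graph initGraph(Graph checkpointGraph, Long batchDurMs) {
        if (checkpointGraph != null) {            // isCheckpointPresent
            checkpointGraph.restoredFromCheckpoint = true;
            return checkpointGraph;               // reuse the checkpointed graph
        }
        Objects.requireNonNull(batchDurMs,
            "Batch duration for StreamingContext cannot be null");
        Graph g = new Graph();                    // new DStreamGraph()
        g.batchDurationMs = batchDurMs;           // newGraph.setBatchDuration(...)
        return g;
    }

    public static void main(String[] args) {
        Graph fresh = initGraph(null, 30_000L);   // no checkpoint: new graph
        System.out.println(fresh.batchDurationMs);          // 30000
        System.out.println(fresh.restoredFromCheckpoint);   // false
    }
}
```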
    • Next, enter org.apache.spark.streaming.DStreamGraph.scala
    final private[streaming] class DStreamGraph extends Serializable with Logging {
    
      private val inputStreams = new ArrayBuffer[InputDStream[_]]()
      private val outputStreams = new ArrayBuffer[DStream[_]]()
    
      var rememberDuration: Duration = null
      var checkpointInProgress = false
    
      var zeroTime: Time = null
      var startTime: Time = null
      var batchDuration: Duration = null
    
      ......
    }
    

    When the DStreamGraph is created, it declares the inputStreams and outputStreams buffers, among other fields.
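A hypothetical Java analogue of those two buffers (again, not Spark's classes) illustrates their roles: input streams are registered when a source DStream is created, and output streams when an output operation such as foreachRDD is applied.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified analogue of DStreamGraph's two stream buffers.
public class MiniGraph {
    final List<String> inputStreams = new ArrayList<>();
    final List<String> outputStreams = new ArrayList<>();

    synchronized void addInputStream(String name)  { inputStreams.add(name); }
    synchronized void addOutputStream(String name) { outputStreams.add(name); }

    public static void main(String[] args) {
        MiniGraph graph = new MiniGraph();
        graph.addInputStream("kafka-direct-stream");   // e.g. createDirectStream
        graph.addOutputStream("foreachRDD-save");      // e.g. foreachRDD(...)
        System.out.println(graph.inputStreams.size());   // 1
        System.out.println(graph.outputStreams.size());  // 1
    }
}
```

In the real DStreamGraph these buffers drive job generation: each batch, jobs are generated by walking back from every registered output stream.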

    • Next, enter org.apache.spark.streaming.scheduler.JobScheduler.scala
    private[streaming]
    class JobScheduler(val ssc: StreamingContext) extends Logging {
    
      private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
      private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
      private val jobExecutor =
        ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
      private val jobGenerator = new JobGenerator(this)
      val clock = jobGenerator.clock
      val listenerBus = new StreamingListenerBus(ssc.sparkContext.listenerBus)
    
      // These two are created only when scheduler starts.
      // eventLoop not being null means the scheduler has been started and not stopped
      var receiverTracker: ReceiverTracker = null
      // A tracker to track all the input stream information as well as processed record number
      var inputInfoTracker: InputInfoTracker = null
    
      private var executorAllocationManager: Option[ExecutorAllocationManager] = None
    
      private var eventLoop: EventLoop[JobSchedulerEvent] = null
    
      ......
    
    }
    

    When the JobScheduler is created, it declares jobSets, numConcurrentJobs, jobExecutor, jobGenerator, and other fields.
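The two core pieces above can be sketched with a hypothetical, self-contained Java class (MiniJobScheduler is an illustrative name, not Spark's): a concurrent map of pending job sets keyed by batch time, and a fixed-size executor whose parallelism mirrors spark.streaming.concurrentJobs (default 1). Spark uses daemon threads named "streaming-job-executor"; a plain fixed pool stands in here for simplicity.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of JobScheduler's jobSets map and job executor pool.
public class MiniJobScheduler {
    final ConcurrentMap<Long, String> jobSets = new ConcurrentHashMap<>();
    final ExecutorService jobExecutor;

    MiniJobScheduler(int numConcurrentJobs) {
        this.jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs);
    }

    void submit(long batchTimeMs, String jobSet) {
        jobSets.put(batchTimeMs, jobSet);                       // track the batch
        jobExecutor.submit(() -> jobSets.remove(batchTimeMs));  // "run", then untrack
    }

    public static void main(String[] args) throws Exception {
        MiniJobScheduler s = new MiniJobScheduler(1);  // concurrentJobs = 1
        s.submit(30_000L, "batch-1");
        s.jobExecutor.shutdown();
        s.jobExecutor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(s.jobSets.size());  // 0: batch processed and removed
    }
}
```

With the default of one executor thread, batches run strictly one at a time; raising spark.streaming.concurrentJobs allows job sets from different batches to run in parallel.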

    3. Summary

    When a StreamingContext is initialized, it:

    1. Creates the DStreamGraph, which declares inputStreams, outputStreams, and other fields

    2. Creates the JobScheduler, which declares jobSets, numConcurrentJobs, jobExecutor, jobGenerator, and other fields

          Original link: https://www.haomeiwen.com/subject/thezkctx.html