Structured Streaming

Author: 薛定谔的猫Plus | Published 2018-09-21 15:23

    Structured Streaming adds SQL-based processing of streaming data, implemented separately in the sql package. StreamExecution is the base class of the two execution modes discussed below. When new data arrives at a source, the streaming query builds a QueryExecution to process it and writes the result to the configured Sink (the destination of the processed data).


    (Figure: StreamExecution.png)
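
    For orientation, here is a minimal, self-contained sketch of a streaming query (the "rate" source and console sink are used purely for illustration). Every such query is driven by a StreamExecution subclass, which turns each round of new input into a QueryExecution and hands the result to the Sink.

    import org.apache.spark.sql.SparkSession

    object MinimalStreamingQuery {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("minimal-structured-streaming")
          .getOrCreate()

        // Source: the built-in "rate" source continuously emits (timestamp, value) rows.
        val input = spark.readStream
          .format("rate")
          .option("rowsPerSecond", "5")
          .load()

        // Sink: the console sink prints every processed batch. The StreamingQuery
        // returned by start() is driven by a StreamExecution (MicroBatchExecution
        // by default), which builds a QueryExecution for each batch of new data.
        val query = input.writeStream
          .format("console")
          .start()

        query.awaitTermination()
      }
    }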

    MicroBatchExecution

    This mode processes data in micro-batches. By default it uses the ProcessingTimeExecutor trigger, which fires periodically based on the system clock:

    case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())
      extends TriggerExecutor with Logging {
    
      private val intervalMs = processingTime.intervalMs
      require(intervalMs >= 0)
    
      override def execute(triggerHandler: () => Boolean): Unit = {
        while (true) {
          val triggerTimeMs = clock.getTimeMillis
          val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
          val terminated = !triggerHandler()
          if (intervalMs > 0) {
            val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
            if (batchElapsedTimeMs > intervalMs) {
              notifyBatchFallingBehind(batchElapsedTimeMs)
            }
            if (terminated) {
              return
            }
            clock.waitTillTime(nextTriggerTimeMs)
          } else {
            if (terminated) {
              return
            }
          }
        }
      }

      // nextBatchTime and notifyBatchFallingBehind are omitted here for brevity.
    }
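
    This executor is what a query ends up with when a processing-time trigger is configured on the writer; a minimal sketch, assuming a streaming DataFrame named input like the one in the earlier example:

    import org.apache.spark.sql.streaming.Trigger

    // With a processing-time trigger, MicroBatchExecution schedules batches through
    // ProcessingTimeExecutor on the system clock, firing roughly every 10 seconds.
    val query = input.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()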

    The execution logic first builds a logical plan that records which data sources the data will be pulled from:

    override lazy val logicalPlan: LogicalPlan = {
        assert(queryExecutionThread eq Thread.currentThread,
          "logicalPlan must be initialized in QueryExecutionThread " +
            s"but the current thread was ${Thread.currentThread}")
        var nextSourceId = 0L
        val toExecutionRelationMap = MutableMap[StreamingRelation, StreamingExecutionRelation]()
        val v2ToExecutionRelationMap = MutableMap[StreamingRelationV2, StreamingExecutionRelation]()
        // We transform each distinct streaming relation into a StreamingExecutionRelation, keeping a
        // map as we go to ensure each identical relation gets the same StreamingExecutionRelation
        // object. For each microbatch, the StreamingExecutionRelation will be replaced with a logical
        // plan for the data within that batch.
        // Note that we have to use the previous `output` as attributes in StreamingExecutionRelation,
        // since the existing logical plan has already used those attributes. The per-microbatch
        // transformation is responsible for replacing attributes with their final values.
        val _logicalPlan = analyzedPlan.transform {
          case streamingRelation@StreamingRelation(dataSource, _, output) =>
            toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
              // Materialize source to avoid creating it in every batch
              val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
              val source = dataSource.createSource(metadataPath)
              nextSourceId += 1
              StreamingExecutionRelation(source, output)(sparkSession)
            })
          case s@StreamingRelationV2(source: MicroBatchReadSupport, _, options, output, _) =>
            v2ToExecutionRelationMap.getOrElseUpdate(s, {
              // Materialize source to avoid creating it in every batch
              val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
              val reader = source.createMicroBatchReader(
                Optional.empty(), // user specified schema
                metadataPath,
                new DataSourceOptions(options.asJava))
              nextSourceId += 1
              StreamingExecutionRelation(reader, output)(sparkSession)
            })
          case s@StreamingRelationV2(_, sourceName, _, output, v1Relation) =>
            v2ToExecutionRelationMap.getOrElseUpdate(s, {
              // Materialize source to avoid creating it in every batch
              val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
              if (v1Relation.isEmpty) {
                throw new UnsupportedOperationException(
                  s"Data source $sourceName does not support microbatch processing.")
              }
              val source = v1Relation.get.dataSource.createSource(metadataPath)
              nextSourceId += 1
              StreamingExecutionRelation(source, output)(sparkSession)
            })
        }
        sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
        uniqueSources = sources.distinct
        _logicalPlan
      }
    
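    The StreamingRelation leaf that this transform rewrites is what every streaming DataFrame carries in its analyzed plan before the query starts; a quick way to see it, assuming a SparkSession named spark:

    val streamingDf = spark.readStream
      .format("rate")
      .load()

    // The leaf of this analyzed plan is a StreamingRelation; once the query starts,
    // MicroBatchExecution.logicalPlan replaces it with a StreamingExecutionRelation
    // bound to a concrete Source instance, as in the code above.
    println(streamingDf.queryExecution.analyzed.treeString)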

    Taking Kafka as an example, the Kafka offset ranges are determined during execution in the populateStartOffsets and constructNextBatch methods; runBatch then performs the actual data retrieval:

    newData = reportTimeTaken("getBatch") {
          availableOffsets.flatMap {
            case (source: Source, available)
              if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
              val current = committedOffsets.get(source)
          // This builds a Kafka DataFrame from the given offset range
          // (which contains the offset range for each partition).
              val batch = source.getBatch(current, available)
    
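    The getBatch call here is part of the v1 Source API. Abridged from the Spark 2.x sources (comments added), the trait looks like this; KafkaSource answers getBatch with a DataFrame over the requested partition offset ranges:

    trait Source {
      // Schema of the data produced by this source.
      def schema: StructType
      // Highest offset currently available, if any data has arrived yet.
      def getOffset: Option[Offset]
      // Returns the data in the range (start, end] as a DataFrame.
      def getBatch(start: Option[Offset], end: Offset): DataFrame
      // Tells the source that data up to `end` has been processed and may be discarded.
      def commit(end: Offset): Unit
      def stop(): Unit
    }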

    The DataFrame produced in this step then replaces the corresponding data source in the original logicalPlan:

    val newBatchesPlan = logicalPlan transform {
          case StreamingExecutionRelation(source, output) =>
            newData.get(source).map { dataPlan =>
              assert(output.size == dataPlan.output.size,
                s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
                  s"${Utils.truncatedString(dataPlan.output, ",")}")
              replacements ++= output.zip(dataPlan.output)
              dataPlan
            }.getOrElse {
              LocalRelation(output, isStreaming = true)
            }
        }
    

    A new IncrementalExecution is then created from this logical plan to produce the physical execution plan:

    reportTimeTaken("queryPlanning") {
          lastExecution = new IncrementalExecution(
            sparkSessionToRunBatch,
            triggerLogicalPlan,
            outputMode,
            checkpointFile("state"),
            runId,
            currentBatchId,
            offsetSeqMetadata)
          lastExecution.executedPlan // Force the lazy generation of execution plan
        }
    
        val nextBatch =
          new Dataset(sparkSessionToRunBatch, lastExecution, RowEncoder(lastExecution.analyzed.schema))
    
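    While a query is running, the plans produced by the latest IncrementalExecution can be inspected from the driver, for example:

    // Prints the analyzed, optimized and physical plans of the most recent batch,
    // i.e. the plans built by the IncrementalExecution above. `query` is the
    // StreamingQuery handle returned by start().
    query.explain(extended = true)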

    Next, the batch is handed to the sink, with different handling per sink type; SQLExecution.withNewExecutionId is mainly used to track job information for the query:

    reportTimeTaken("addBatch") {
          SQLExecution.withNewExecutionId(sparkSessionToRunBatch, lastExecution) {
            sink match {
              case s: Sink =>
                if (s.isInstanceOf[MemorySinkExtend]) {
                  s.addBatch(currentBatchId, nextBatch, batchIdOffsetMap.get(currentBatchId).getOrElse((None, None)))
                } else {
                  s.addBatch(currentBatchId, nextBatch, (None, None))
                }
              case _: StreamWriteSupport =>
                // This doesn't accumulate any data - it just forces execution of the microbatch writer.
                nextBatch.collect()
            }
          }
        }
    
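    For comparison, the stock v1 Sink interface takes only the batch id and the data; the three-argument addBatch with an offset tuple in the excerpt above appears to come from a custom MemorySinkExtend rather than from Spark itself:

    trait Sink {
      // Adds the data of batch `batchId` to the sink. Implementations must be
      // idempotent: after a failure the same batch may be delivered again.
      def addBatch(batchId: Long, data: DataFrame): Unit
    }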

    One question left open so far is how the watermark is handled during this computation, so let's continue the analysis. During execution, the watermark is advanced based on the event time observed in the data:

    if (hasNewData) {
          var batchWatermarkMs = offsetSeqMetadata.batchWatermarkMs
          // Update the eventTime watermarks if we find any in the plan.
          if (lastExecution != null) {
            lastExecution.executedPlan.collect {
              case e: EventTimeWatermarkExec => e
            }.zipWithIndex.foreach {
              case (e, index) if e.eventTimeStats.value.count > 0 =>
                logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
                val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
                val prevWatermarkMs = watermarkMsMap.get(index)
                if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
                  watermarkMsMap.put(index, newWatermarkMs)
                }
    
    
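    The EventTimeWatermarkExec nodes collected above come from Dataset.withWatermark; a small sketch of a windowed aggregation that produces one (events is an assumed streaming DataFrame with an event-time column ts):

    import org.apache.spark.sql.functions.{col, window}

    // withWatermark declares that events may arrive at most 10 minutes late; the
    // executed plan then contains an EventTimeWatermarkExec with delayMs = 10 min,
    // and the watermark advances to max(observed event time) - 10 minutes as above.
    val counts = events
      .withWatermark("ts", "10 minutes")
      .groupBy(window(col("ts"), "5 minutes"))
      .count()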

    In the subsequent execution stage, an expression is built from this watermark and used to filter data when results are emitted. The following excerpt is from statefulOperators.scala:

    lazy val watermarkExpression: Option[Expression] = {
        WatermarkSupport.watermarkExpression(
          child.output.find(_.metadata.contains(EventTimeWatermark.delayKey)),
          eventTimeWatermark)
      }
    
      /** Predicate based on keys that matches data older than the watermark */
      lazy val watermarkPredicateForKeys: Option[Predicate] = watermarkExpression.flatMap { e =>
        if (keyExpressions.exists(_.metadata.contains(EventTimeWatermark.delayKey))) {
          Some(newPredicate(e, keyExpressions))
        } else {
          None
        }
      }
    
      /** Predicate based on the child output that matches data older than the watermark. */
      lazy val watermarkPredicateForData: Option[Predicate] =
        watermarkExpression.map(newPredicate(_, child.output))
    

    At the output stage, depending on the output mode, the watermark time is used to filter the aggregated data kept in the HDFSBackedStateStoreProvider and to evict stored aggregation state that has fallen behind the watermark.


    (Figure: streaming-outputMode.png)
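
    Reusing the counts aggregation from the previous sketch, the two modes behave differently: in append mode a window is only emitted, and its state dropped, once the watermark passes the end of the window, while in update mode changed rows are emitted every batch and the watermark only determines when old state can be evicted:

    // Append mode: a window is emitted once, after the watermark passes its end,
    // and its state is then dropped from the state store.
    counts.writeStream.outputMode("append").format("console").start()

    // Update mode: rows changed in the current batch are emitted at every trigger;
    // the watermark is only used to decide which old state can be evicted.
    // counts.writeStream.outputMode("update").format("console").start()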

    ContinuousExecution

    The execution logic is similar to the above, except that offset information is committed asynchronously and the data in the stream is processed continuously rather than in discrete batches.
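
    Continuous processing is selected with a continuous trigger (available since Spark 2.3); a minimal sketch, reusing the rate source and console sink from the earlier example:

    import org.apache.spark.sql.streaming.Trigger

    // The argument is the asynchronous checkpoint interval, not a batch interval:
    // records are processed as they arrive and offsets are committed in the background.
    spark.readStream
      .format("rate")
      .load()
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()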
