An Analysis of the TimeWindow Implementation

Author: 分裂四人组 | Published: 2019-04-05 10:36

Definition of TimeWindow

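  • timeColumn: the event-time column;
  • windowDuration: the width of the window;
  • slideDuration: the step by which the window advances on each slide;
  • startTime: the offset, relative to 1970-01-01 00:00:00 UTC, at which the window intervals start. For example, to get hourly tumbling windows that begin 15 minutes past the hour (12:15-13:15, 13:15-14:15, ...), startTime is 15 minutes.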
case class TimeWindow(
    timeColumn: Expression,
    windowDuration: Long,
    slideDuration: Long,
    startTime: Long)
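
To make the role of startTime concrete, here is a minimal sketch in plain Scala. `WindowSpec` and `tumblingWindow` are hypothetical names introduced only for illustration; they are not Spark classes, and the sketch uses a simple floor-based alignment with `java.time` types rather than the `Long` fields and Catalyst expressions of the real implementation.

```scala
import java.time.{Duration, Instant}

// Hypothetical, simplified stand-in for the TimeWindow parameters (not Spark's class).
final case class WindowSpec(
    windowDuration: Duration,
    slideDuration: Duration,
    startTime: Duration)

object WindowSpecDemo {
  // Returns the (start, end) of the tumbling window containing `ts`.
  // Windows are aligned to the epoch shifted by `startTime`.
  def tumblingWindow(spec: WindowSpec, ts: Instant): (Instant, Instant) = {
    val size = spec.windowDuration.toMillis
    val offset = spec.startTime.toMillis
    // floorDiv keeps the alignment correct for timestamps before the offset too.
    val start = Math.floorDiv(ts.toEpochMilli - offset, size) * size + offset
    (Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + size))
  }
}
```

With a one-hour window and a startTime of 15 minutes, an event at 12:40 UTC falls into the 12:15-13:15 window, matching the example in the list above.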

TimeWindowing Rule

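Purpose: with sliding windows, a single value of the time column belongs to several windows, so the Expand operator is used to map one time value to multiple time windows.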

Implementation steps:

   maxNumOverlapping <- ceil(windowDuration / slideDuration)
   for (i <- 0 until maxNumOverlapping)
     windowId <- ceil((timestamp - startTime) / slideDuration)
     windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
     windowEnd <- windowStart + windowDuration
     return windowStart, windowEnd
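  • Compute the total number of overlapping windows: maxNumOverlapping = ceil(windowDuration / slideDuration);
  • Iterate i from 0 until maxNumOverlapping and compute one window per iteration;
  • The actual implementation distinguishes tumbling windows from sliding windows:
    • For a tumbling window, maxNumOverlapping is 1, so the time column is converted directly into a single window;
    • For a sliding window, maxNumOverlapping > 1, so the Expand operator is used to combine several windows, mapping one input row to multiple windows;
  • Note that the final result is a struct containing WINDOW_START and WINDOW_END, that is, an expression such as 13:15-14:15. It replaces the TimeWindow expression on the event-time column; it is not a WindowExpression.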
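
The pseudocode above can be transcribed almost literally into plain Scala. This is only a sketch: `WindowPseudocode` and `candidateWindows` are hypothetical names, all arguments are assumed to be in one common unit, and the boundary adjustment and filter of the real rule are deliberately left out.

```scala
object WindowPseudocode {
  // Candidate (windowStart, windowEnd) pairs for one timestamp, following the
  // pseudocode line by line. All arguments share one unit.
  def candidateWindows(
      timestamp: Long,
      windowDuration: Long,
      slideDuration: Long,
      startTime: Long): Seq[(Long, Long)] = {
    val maxNumOverlapping = math.ceil(windowDuration.toDouble / slideDuration).toInt
    (0 until maxNumOverlapping).map { i =>
      val windowId = math.ceil((timestamp - startTime).toDouble / slideDuration).toLong
      val windowStart =
        windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
      (windowStart, windowStart + windowDuration)
    }
  }
}
```

For example, with windowDuration = 10, slideDuration = 5 and startTime = 0, the timestamp 12 yields the candidates (5, 15) and (10, 20). With windowDuration = slideDuration = 10 (a tumbling window) it yields the single window (10, 20).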

Code walkthrough:

        val window = windowExpressions.head

        val metadata = window.timeColumn match {
          case a: Attribute => a.metadata
          case _ => Metadata.empty
        }
 
        // Build the i-th window for the given i:
        // - for a tumbling window, i is always 0;
        // - for a sliding window, i ranges over `0 until maxNumOverlapping`.
        def getWindow(i: Int, overlappingWindows: Int): Expression = {
          val division = (PreciseTimestampConversion(
            window.timeColumn, TimestampType, LongType) - window.startTime) / window.slideDuration
          val ceil = Ceil(division)
          // if the division is equal to the ceiling, our record is the start of a window
          val windowId = CaseWhen(Seq((ceil === division, ceil + 1)), Some(ceil))
          val windowStart = (windowId + i - overlappingWindows) *
            window.slideDuration + window.startTime
          val windowEnd = windowStart + window.windowDuration
          // Note the result: a struct containing `WINDOW_START` and `WINDOW_END`,
          // i.e. an expression such as `13:15-14:15`.
          CreateNamedStruct(
            Literal(WINDOW_START) ::
              PreciseTimestampConversion(windowStart, LongType, TimestampType) ::
              Literal(WINDOW_END) ::
              PreciseTimestampConversion(windowEnd, LongType, TimestampType) ::
              Nil)
        }

        val windowAttr = AttributeReference(
          WINDOW_COL_NAME, window.dataType, metadata = metadata)()

        if (window.windowDuration == window.slideDuration) {
          // Tumbling window: alias the struct returned by getWindow as WINDOW_COL_NAME.
          val windowStruct = Alias(getWindow(0, 1), WINDOW_COL_NAME)(
            exprId = windowAttr.exprId, explicitMetadata = Some(metadata))

          val replacedPlan = p transformExpressions {
            case t: TimeWindow => windowAttr
          }

          // For backwards compatibility we add a filter to filter out nulls
          val filterExpr = IsNotNull(window.timeColumn)

          replacedPlan.withNewChildren(
            Filter(filterExpr,
              Project(windowStruct +: child.output, child)) :: Nil)
        } else {
          // Sliding window
          val overlappingWindows =
            math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
          // `Seq.tabulate` implements the iteration from the pseudocode.
          val windows =
            Seq.tabulate(overlappingWindows)(i => getWindow(i, overlappingWindows))

          val projections = windows.map(_ +: child.output)

          val filterExpr =
            window.timeColumn >= windowAttr.getField(WINDOW_START) &&
              window.timeColumn < windowAttr.getField(WINDOW_END)
          
          // Finally, the Expand operator maps each input row to multiple windows.
          val substitutedPlan = Filter(filterExpr,
            Expand(projections, windowAttr +: child.output, child))

          val renamedPlan = p transformExpressions {
            case t: TimeWindow => windowAttr
          }

          renamedPlan.withNewChildren(substitutedPlan :: Nil)
        }
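
Two details of the sliding-window branch are easy to miss: the CaseWhen that bumps windowId when the timestamp sits exactly on a slide boundary, and the Filter placed on top of Expand. The following plain-Scala sketch mimics both on ordinary values. `SlidingWindowSketch`, `Window` and `assignWindows` are hypothetical names, and in-memory collections stand in for the Catalyst expressions and operators, so this is an illustration of the logic rather than the rule itself.

```scala
object SlidingWindowSketch {
  final case class Window(start: Long, end: Long)

  // Mirrors getWindow: the i-th candidate window for one timestamp.
  def getWindow(
      ts: Long,
      windowDuration: Long,
      slideDuration: Long,
      startTime: Long,
      i: Int,
      overlappingWindows: Int): Window = {
    val division = (ts - startTime).toDouble / slideDuration
    val ceil = math.ceil(division)
    // If the division equals its ceiling, the record is the start of a window,
    // so the id is bumped by one (the CaseWhen in the original code).
    val windowId = (if (ceil == division) ceil + 1 else ceil).toLong
    val start = (windowId + i - overlappingWindows) * slideDuration + startTime
    Window(start, start + windowDuration)
  }

  // "Expand" (one candidate per i) followed by "Filter" (keep windows containing ts).
  def assignWindows(
      ts: Long,
      windowDuration: Long,
      slideDuration: Long,
      startTime: Long): Seq[Window] = {
    val overlappingWindows = math.ceil(windowDuration * 1.0 / slideDuration).toInt
    val candidates = Seq.tabulate(overlappingWindows) { i =>
      getWindow(ts, windowDuration, slideDuration, startTime, i, overlappingWindows)
    }
    candidates.filter(w => ts >= w.start && ts < w.end)
  }
}
```

With windowDuration = 10 and slideDuration = 5, the boundary timestamp 10 is assigned to [5, 15) and [10, 20); without the bump it would get [0, 10), which the filter rejects, and it would miss [10, 20). With windowDuration = 10 and slideDuration = 3, the timestamp 13 produces four candidates, and the filter drops [3, 13) because the window end is exclusive.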
        

Article link: https://www.haomeiwen.com/subject/fwbdbqtx.html