An Analysis of the TimeWindow Implementation

Author: 分裂四人组 | Published: 2019-04-05 10:36

    Definition of TimeWindow

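    • timeColumn: the event-time column;
    • windowDuration: the width of the window;
    • slideDuration: the step by which the window advances on each slide;
    • startTime: the offset, relative to 1970-01-01 00:00:00 UTC, at which the window intervals start. For example, to get hourly tumbling windows that begin at 15 minutes past the hour (12:15-13:15, 13:15-14:15, ...), set startTime to 15 minutes.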
    case class TimeWindow(
        timeColumn: Expression,
        windowDuration: Long,
        slideDuration: Long,
        startTime: Long)
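
To make the effect of startTime concrete, here is a minimal sketch in plain Scala (no Spark dependency). It assumes all values are expressed in microseconds, and the object and method names (`TumblingWindowSketch`, `windowStart`) are hypothetical. It uses a closed-form `floorMod` calculation, which is not the code Spark runs but should give the same window start for a tumbling window.

```scala
object TumblingWindowSketch {
  // Assumption: timestamps and durations are microseconds since the epoch.
  val MicrosPerMinute: Long = 60L * 1000L * 1000L

  /** Start of the tumbling window that contains `timestamp`. */
  def windowStart(timestamp: Long, windowDuration: Long, startTime: Long): Long =
    timestamp - java.lang.Math.floorMod(timestamp - startTime, windowDuration)

  /** The window as a half-open range [start, end). */
  def window(timestamp: Long, windowDuration: Long, startTime: Long): (Long, Long) = {
    val start = windowStart(timestamp, windowDuration, startTime)
    (start, start + windowDuration)
  }
}
```

For an event at 12:40 on 1970-01-01 (minute 760), an hourly window with a startTime of 15 minutes begins at minute 735, i.e. 12:15, whereas a startTime of 0 would put the same event in the window beginning at 12:00.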
    

    TimeWindowing Rule

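    Purpose: with a sliding window, a single time value belongs to several windows, so the rule uses the Expand operator to map one time column onto multiple time windows.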

    Implementation steps:

       * maxNumOverlapping <- ceil(windowDuration / slideDuration)
       * for (i <- 0 until maxNumOverlapping)
       *   windowId <- ceil((timestamp - startTime) / slideDuration)
       *   windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
       *   windowEnd <- windowStart + windowDuration
       *   return windowStart, windowEnd
    
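    • Compute the maximum number of overlapping windows: maxNumOverlapping = ceil(windowDuration / slideDuration);
    • Iterate i from 0 until maxNumOverlapping (exclusive) and compute one window per iteration;
    • The actual implementation distinguishes tumbling windows from sliding windows:
      • For a tumbling window, maxNumOverlapping is 1, so the single window is converted directly and returned;
      • For a sliding window, maxNumOverlapping > 1, so the Expand operator combines the candidate windows and one input row is mapped to multiple windows;
    • Note that the final result is a struct containing WINDOW_START and WINDOW_END, i.e. an expression such as 13:15-14:15. It is derived from the event-time column and is not a WindowExpression.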
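
The pseudocode above can be sketched in plain Scala as follows. This is an illustration under stated assumptions rather than the Catalyst rule itself: the names `SlidingWindowSketch` and `candidateWindows` are hypothetical, all arguments share one time unit, and the boundary adjustment (a timestamp that sits exactly on a slide boundary starts a new window) is taken from the implementation code walked through later in this article.

```scala
object SlidingWindowSketch {
  /** Candidate windows [start, end) for one timestamp, in iteration order. */
  def candidateWindows(
      timestamp: Long,
      windowDuration: Long,
      slideDuration: Long,
      startTime: Long): Seq[(Long, Long)] = {
    val maxNumOverlapping =
      math.ceil(windowDuration.toDouble / slideDuration).toInt
    val division = (timestamp - startTime).toDouble / slideDuration
    val ceil = math.ceil(division).toLong
    // If the division is already a whole number, the record is the start of a
    // window, so the id is bumped by one.
    val windowId = if (ceil.toDouble == division) ceil + 1 else ceil
    (0 until maxNumOverlapping).map { i =>
      val windowStart =
        (windowId + i - maxNumOverlapping) * slideDuration + startTime
      (windowStart, windowStart + windowDuration)
    }
  }
}
```

With windowDuration = 10, slideDuration = 5 and startTime = 0, a timestamp of 12 yields the candidates (5, 15) and (10, 20). With windowDuration equal to slideDuration the loop runs once, which corresponds to the tumbling case.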

    Walkthrough of the implementation code:

            val window = windowExpressions.head
    
            val metadata = window.timeColumn match {
              case a: Attribute => a.metadata
              case _ => Metadata.empty
            }
     
            // Return the i-th window for the given i:
            // - for a tumbling window, i is always 0;
            // - for a sliding window, i ranges over `0 until maxNumOverlapping`.
            def getWindow(i: Int, overlappingWindows: Int): Expression = {
              val division = (PreciseTimestampConversion(
                window.timeColumn, TimestampType, LongType) - window.startTime) / window.slideDuration
              val ceil = Ceil(division)
              // if the division is equal to the ceiling, our record is the start of a window
              val windowId = CaseWhen(Seq((ceil === division, ceil + 1)), Some(ceil))
              val windowStart = (windowId + i - overlappingWindows) *
                window.slideDuration + window.startTime
              val windowEnd = windowStart + window.windowDuration
              // Note the result: it is a struct containing `WINDOW_START` and `WINDOW_END`,
              // i.e. an expression such as `13:15-14:15`.
              CreateNamedStruct(
                Literal(WINDOW_START) ::
                  PreciseTimestampConversion(windowStart, LongType, TimestampType) ::
                  Literal(WINDOW_END) ::
                  PreciseTimestampConversion(windowEnd, LongType, TimestampType) ::
                  Nil)
            }
    
            val windowAttr = AttributeReference(
              WINDOW_COL_NAME, window.dataType, metadata = metadata)()
    
            if (window.windowDuration == window.slideDuration) {
              // Tumbling window: alias the struct expression returned by getWindow as WINDOW_COL_NAME
              val windowStruct = Alias(getWindow(0, 1), WINDOW_COL_NAME)(
                exprId = windowAttr.exprId, explicitMetadata = Some(metadata))
    
              val replacedPlan = p transformExpressions {
                case t: TimeWindow => windowAttr
              }
    
              // For backwards compatibility we add a filter to filter out nulls
              val filterExpr = IsNotNull(window.timeColumn)
    
              replacedPlan.withNewChildren(
                Filter(filterExpr,
                  Project(windowStruct +: child.output, child)) :: Nil)
            } else {
              // Sliding window
              val overlappingWindows =
                math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
              // Use `tabulate` to perform the iteration from the pseudocode
              val windows =
                Seq.tabulate(overlappingWindows)(i => getWindow(i, overlappingWindows))
    
              val projections = windows.map(_ +: child.output)
    
              val filterExpr =
                window.timeColumn >= windowAttr.getField(WINDOW_START) &&
                  window.timeColumn < windowAttr.getField(WINDOW_END)
              
              // Finally, use the Expand operator to map each input row to multiple windows
              val substitutedPlan = Filter(filterExpr,
                Expand(projections, windowAttr +: child.output, child))
    
              val renamedPlan = p transformExpressions {
                case t: TimeWindow => windowAttr
              }
    
              renamedPlan.withNewChildren(substitutedPlan :: Nil)
            }
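
To see why the sliding branch pairs Expand with a Filter, the following sketch models both steps on an ordinary Scala collection. It is a simplified stand-in, not Spark code: `Event`, `Windowed` and `assignWindows` are hypothetical names, and rows are plain case classes rather than Catalyst plans. Expand emits one copy of each row per candidate window, and the filter then drops the candidates that do not actually contain the timestamp, which happens when windowDuration is not a multiple of slideDuration.

```scala
object ExpandFilterSketch {
  // Hypothetical row types for illustration; these are not Spark classes.
  final case class Event(id: String, ts: Long)
  final case class Windowed(windowStart: Long, windowEnd: Long, event: Event)

  def assignWindows(
      events: Seq[Event],
      windowDuration: Long,
      slideDuration: Long,
      startTime: Long): Seq[Windowed] = {
    val overlapping = math.ceil(windowDuration * 1.0 / slideDuration).toInt
    // "Expand": emit `overlapping` copies of every row, one per candidate window.
    val expanded = events.flatMap { e =>
      val division = (e.ts - startTime).toDouble / slideDuration
      val ceil = math.ceil(division).toLong
      val windowId = if (ceil.toDouble == division) ceil + 1 else ceil
      Seq.tabulate(overlapping) { i =>
        val start = (windowId + i - overlapping) * slideDuration + startTime
        Windowed(start, start + windowDuration, e)
      }
    }
    // "Filter": keep only candidates whose [start, end) range contains the timestamp.
    expanded.filter(w => w.event.ts >= w.windowStart && w.event.ts < w.windowEnd)
  }
}
```

With windowDuration = 10, slideDuration = 3 and startTime = 0, an event at ts = 13 is expanded into four candidates starting at 3, 6, 9 and 12. The filter removes the first one, since 13 lies outside [3, 13), and keeps the other three.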
            
    
