A Look at Over Windows in the Flink Table API

Author: go4it | Published: 2019-01-27 11:52

    This article looks at Over Windows in the Flink Table API.

    Example

    Table table = input
      .window([OverWindow w].as("w"))           // define over window with alias w
      .select("a, b.sum over w, c.min over w"); // aggregate over the over window w
    
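    • Over Windows are similar to the SQL OVER clause. A window is ordered on an event-time or processing-time attribute, and its extent is given either as a time interval or as a row count. Windows are built with the Over class, where orderBy, preceding, and as are mandatory (partitionBy and following are optional); they fall into two groups, Unbounded and Bounded.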

    Unbounded Over Windows example

    
    // Unbounded Event-time over window (assuming an event-time attribute "rowtime")
    .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"));
    
    // Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
    .window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_range").as("w"));
    
    // Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
    .window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"));
     
    // Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
    .window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_row").as("w"));
    
    • For time-based windows (event-time or processing-time), unbounded_range denotes Unbounded; for row-count windows, unbounded_row denotes Unbounded.
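
    To make the unbounded case concrete, here is a minimal plain-Java sketch of what "b.sum over w" yields when w is partitioned by "a" and reaches back to the first row of the partition. It does not use Flink at all: the class UnboundedOverSketch and the method runningSum are hypothetical names introduced for illustration, and the rows are assumed to arrive already ordered by their time attribute.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: plain Java, no Flink dependency, hypothetical names.
class UnboundedOverSketch {

    // Emulates "b.sum over w" for a window partitioned by column a that spans
    // from the first row of the partition up to the current row.
    // Assumption: rows are already ordered by their time attribute.
    static List<Long> runningSum(List<String> a, List<Long> b) {
        Map<String, Long> sumPerKey = new HashMap<>();
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < a.size(); i++) {
            // Add the current value to the running total of its partition.
            long sum = sumPerKey.merge(a.get(i), b.get(i), Long::sum);
            out.add(sum); // an over window emits one result per input row
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> a = Arrays.asList("x", "y", "x", "x", "y");
        List<Long> b = Arrays.asList(1L, 10L, 2L, 3L, 20L);
        System.out.println(runningSum(a, b)); // prints [1, 10, 3, 6, 30]
    }
}
```

    Unlike a group window, the over window does not collapse rows: every input row produces one output row carrying the aggregate over its own window.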

    Bounded Over Windows example

    // Bounded Event-time over window (assuming an event-time attribute "rowtime")
    .window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))
    
    // Bounded Processing-time over window (assuming a processing-time attribute "proctime")
    .window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))
    
    // Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
    .window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))
     
    // Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
    .window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
    
    • For time-based windows (event-time or processing-time), a time interval such as 1.minutes denotes Bounded; for row-count windows, a row interval such as 10.rows denotes Bounded.
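
    The difference between a row interval and a time interval can be sketched in plain Java, again without Flink. The class BoundedOverSketch and both method names are hypothetical. The sketch assumes a single partition, ascending and distinct timestamps, and inclusive bounds in the manner of SQL's "BETWEEN n PRECEDING AND CURRENT ROW" (the current row plus up to n earlier rows, or every row no older than the interval); treat those boundary rules as an assumption rather than a statement about Flink's runtime.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: plain Java, no Flink dependency, hypothetical names.
class BoundedOverSketch {

    // Row-count window: sum over the current row and up to n rows before it.
    static List<Long> sumOverPrecedingRows(List<Long> values, int n) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            long sum = 0;
            for (int j = Math.max(0, i - n); j <= i; j++) {
                sum += values.get(j);
            }
            out.add(sum);
        }
        return out;
    }

    // Time-range window: sum over the current row and all earlier rows whose
    // timestamp is not older than (current timestamp - rangeMillis).
    // Assumption: timestamps are ascending and distinct.
    static List<Long> sumOverPrecedingRange(
            List<Long> timestamps, List<Long> values, long rangeMillis) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            long lowerBound = timestamps.get(i) - rangeMillis;
            long sum = 0;
            for (int j = 0; j <= i; j++) {
                if (timestamps.get(j) >= lowerBound) {
                    sum += values.get(j);
                }
            }
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L);
        List<Long> ts = Arrays.asList(0L, 30_000L, 60_000L, 100_000L, 170_000L);
        // Two preceding rows plus the current row.
        System.out.println(sumOverPrecedingRows(values, 2)); // prints [1, 3, 6, 9, 12]
        // One minute (60000 ms) preceding the current row's timestamp.
        System.out.println(sumOverPrecedingRange(ts, values, 60_000L)); // prints [1, 3, 6, 7, 5]
    }
}
```

    With a row interval the window always holds a fixed maximum number of rows, whereas with a time interval the number of rows in the window depends on how densely the timestamps are spaced.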

    Table.window

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    class Table(
        private[flink] val tableEnv: TableEnvironment,
        private[flink] val logicalPlan: LogicalNode) {
    
      //......  
    
      @varargs
      def window(overWindows: OverWindow*): OverWindowedTable = {
    
        if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
          throw new TableException("Over-windows for batch tables are currently not supported.")
        }
    
        if (overWindows.size != 1) {
          throw new TableException("Over-Windows are currently only supported single window.")
        }
    
        new OverWindowedTable(this, overWindows.toArray)
      }
    
      //......
    
    }    
    
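    • Table provides a window method that takes OverWindow arguments and performs the Over Windows operation; it returns an OverWindowedTable. The method throws a TableException for a BatchTableEnvironment, and also when the number of over windows passed in is not exactly one.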

    OverWindow

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/windows.scala

    /**
      * Over window is similar to the traditional OVER SQL.
      */
    case class OverWindow(
        private[flink] val alias: Expression,
        private[flink] val partitionBy: Seq[Expression],
        private[flink] val orderBy: Expression,
        private[flink] val preceding: Expression,
        private[flink] val following: Expression)
    
    • OverWindow defines the alias, partitionBy, orderBy, preceding, and following properties.

    Over

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/java/windows.scala

    object Over {
    
      /**
        * Specifies the time attribute on which rows are grouped.
        *
        * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
        *
        * For batch tables, refer to a timestamp or long attribute.
        */
      def orderBy(orderBy: String): OverWindowWithOrderBy = {
        val orderByExpr = ExpressionParser.parseExpression(orderBy)
        new OverWindowWithOrderBy(Array[Expression](), orderByExpr)
      }
    
      /**
        * Partitions the elements on some partition keys.
        *
        * @param partitionBy some partition keys.
        * @return A partitionedOver instance that only contains the orderBy method.
        */
      def partitionBy(partitionBy: String): PartitionedOver = {
        val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray
        new PartitionedOver(partitionByExpr)
      }
    }
    
    class OverWindowWithOrderBy(
      private val partitionByExpr: Array[Expression],
      private val orderByExpr: Expression) {
    
      /**
        * Set the preceding offset (based on time or row-count intervals) for over window.
        *
        * @param preceding preceding offset relative to the current row.
        * @return this over window
        */
      def preceding(preceding: String): OverWindowWithPreceding = {
        val precedingExpr = ExpressionParser.parseExpression(preceding)
        new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr)
      }
    
    }
    
    class PartitionedOver(private val partitionByExpr: Array[Expression]) {
    
      /**
        * Specifies the time attribute on which rows are grouped.
        *
        * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
        *
        * For batch tables, refer to a timestamp or long attribute.
        */
      def orderBy(orderBy: String): OverWindowWithOrderBy = {
        val orderByExpr = ExpressionParser.parseExpression(orderBy)
        new OverWindowWithOrderBy(partitionByExpr, orderByExpr)
      }
    }
    
    class OverWindowWithPreceding(
        private val partitionBy: Seq[Expression],
        private val orderBy: Expression,
        private val preceding: Expression) {
    
      private[flink] var following: Expression = _
    
      /**
        * Assigns an alias for this window that the following `select()` clause can refer to.
        *
        * @param alias alias for this over window
        * @return over window
        */
      def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
    
      /**
        * Assigns an alias for this window that the following `select()` clause can refer to.
        *
        * @param alias alias for this over window
        * @return over window
        */
      def as(alias: Expression): OverWindow = {
    
        // set following to CURRENT_ROW / CURRENT_RANGE if not defined
        if (null == following) {
          if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {
            following = CURRENT_ROW
          } else {
            following = CURRENT_RANGE
          }
        }
        OverWindow(alias, partitionBy, orderBy, preceding, following)
      }
    
      /**
        * Set the following offset (based on time or row-count intervals) for over window.
        *
        * @param following following offset that relative to the current row.
        * @return this over window
        */
      def following(following: String): OverWindowWithPreceding = {
        this.following(ExpressionParser.parseExpression(following))
      }
    
      /**
        * Set the following offset (based on time or row-count intervals) for over window.
        *
        * @param following following offset that relative to the current row.
        * @return this over window
        */
      def following(following: Expression): OverWindowWithPreceding = {
        this.following = following
        this
      }
    }
    
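    • Over is a helper class for creating over windows; it provides the orderBy and partitionBy methods, which create an OverWindowWithOrderBy and a PartitionedOver respectively.
    • PartitionedOver provides orderBy, which creates an OverWindowWithOrderBy; OverWindowWithOrderBy provides preceding, which creates an OverWindowWithPreceding.
    • OverWindowWithPreceding holds the partitionBy, orderBy, and preceding properties; its as method creates the OverWindow, and its following method sets the following offset. If following was never set, as defaults it to CURRENT_ROW when preceding is a row interval and to CURRENT_RANGE otherwise.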
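
    The chain of classes above is a staged builder: each stage exposes only the methods that are legal next, so a window without orderBy, preceding, or as cannot be written. The following plain-Java sketch mirrors that shape in simplified form. All names (MiniOver and its nested classes) are hypothetical and are not the Flink classes; strings stand in for Expression, and the row-interval check is a simple string test rather than the resultType check in the real code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified mirror of the staged builder; not the Flink classes.
final class MiniOver {
    private MiniOver() {}

    // Stage 1a: optional partitioning, which only allows orderBy next.
    static Partitioned partitionBy(String... keys) {
        return new Partitioned(Arrays.asList(keys));
    }

    // Stage 1b: skip partitioning and start with orderBy directly.
    static WithOrderBy orderBy(String timeAttr) {
        return new WithOrderBy(Collections.<String>emptyList(), timeAttr);
    }

    static final class Partitioned {
        private final List<String> keys;
        Partitioned(List<String> keys) { this.keys = keys; }
        WithOrderBy orderBy(String timeAttr) { return new WithOrderBy(keys, timeAttr); }
    }

    // Stage 2: the only method available is preceding.
    static final class WithOrderBy {
        private final List<String> keys;
        private final String orderBy;
        WithOrderBy(List<String> keys, String orderBy) { this.keys = keys; this.orderBy = orderBy; }
        WithPreceding preceding(String preceding) {
            return new WithPreceding(keys, orderBy, preceding);
        }
    }

    // Stage 3: following is optional, as finishes the window.
    static final class WithPreceding {
        private final List<String> keys;
        private final String orderBy;
        private final String preceding;
        private String following; // stays null until following(...) is called

        WithPreceding(List<String> keys, String orderBy, String preceding) {
            this.keys = keys; this.orderBy = orderBy; this.preceding = preceding;
        }

        WithPreceding following(String following) { this.following = following; return this; }

        Window as(String alias) {
            String f = following;
            if (f == null) {
                // Simplification: detect a row interval from the string form.
                boolean rowBased = preceding.endsWith(".rows") || preceding.equals("unbounded_row");
                f = rowBased ? "current_row" : "current_range";
            }
            return new Window(alias, keys, orderBy, preceding, f);
        }
    }

    // Final immutable description of the window.
    static final class Window {
        final String alias; final List<String> partitionBy;
        final String orderBy; final String preceding; final String following;
        Window(String alias, List<String> partitionBy, String orderBy, String preceding, String following) {
            this.alias = alias; this.partitionBy = partitionBy; this.orderBy = orderBy;
            this.preceding = preceding; this.following = following;
        }
    }

    public static void main(String[] args) {
        Window w = MiniOver.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w");
        System.out.println(w.following); // prints current_row
    }
}
```

    Because MiniOver.partitionBy returns a type with no preceding or as method, a call such as MiniOver.partitionBy("a").as("w") fails at compile time, which is the same guarantee the Over, PartitionedOver, OverWindowWithOrderBy, and OverWindowWithPreceding chain gives.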

    OverWindowedTable

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    class OverWindowedTable(
        private[flink] val table: Table,
        private[flink] val overWindows: Array[OverWindow]) {
    
      def select(fields: Expression*): Table = {
        val expandedFields = expandProjectList(
          fields,
          table.logicalPlan,
          table.tableEnv)
    
        if(fields.exists(_.isInstanceOf[WindowProperty])){
          throw new ValidationException(
            "Window start and end properties are not available for Over windows.")
        }
    
        val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)
    
        new Table(
          table.tableEnv,
          Project(
            expandedOverFields.map(UnresolvedAlias),
            table.logicalPlan,
            // required for proper projection push down
            explicitAlias = true)
            .validate(table.tableEnv)
        )
      }
    
      def select(fields: String): Table = {
        val fieldExprs = ExpressionParser.parseExpressionList(fields)
        //get the correct expression for AggFunctionCall
        val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
        select(withResolvedAggFunctionCall: _*)
      }
    }
    
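    • The OverWindowedTable constructor takes the overWindows argument, and the class offers only the select operation. select accepts either a String or Expression arguments; a String is parsed into Expressions and then passed to the Expression overload. That overload rejects window start and end properties with a ValidationException, then creates a new Table whose Project has expandedOverFields.map(UnresolvedAlias) as its projectList, where expandedOverFields comes from resolveOverWindows(expandedFields, overWindows, table.tableEnv).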

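    Summary

    • Over Windows are similar to the SQL OVER clause. A window is ordered on an event-time or processing-time attribute, and its extent is given either as a time interval or as a row count. Windows are built with the Over class, where orderBy, preceding, and as are mandatory; they fall into two groups, Unbounded and Bounded. For time-based windows, unbounded_range denotes Unbounded and an interval such as 1.minutes denotes Bounded; for row-count windows, unbounded_row denotes Unbounded and an interval such as 10.rows denotes Bounded.
    • Table provides a window method that takes OverWindow arguments and returns an OverWindowedTable. OverWindow defines the alias, partitionBy, orderBy, preceding, and following properties. Over is a helper class for creating over windows: its orderBy and partitionBy methods create an OverWindowWithOrderBy and a PartitionedOver respectively; PartitionedOver provides orderBy, which creates an OverWindowWithOrderBy; OverWindowWithOrderBy provides preceding, which creates an OverWindowWithPreceding. OverWindowWithPreceding holds the partitionBy, orderBy, and preceding properties; its as method creates the OverWindow, and its following method sets the following offset.
    • The OverWindowedTable constructor takes the overWindows argument, and the class offers only the select operation. select accepts either a String or Expression arguments; a String is parsed into Expressions and then passed to the Expression overload. That overload creates a new Table whose Project has expandedOverFields.map(UnresolvedAlias) as its projectList, where expandedOverFields comes from resolveOverWindows(expandedFields, overWindows, table.tableEnv).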
