美文网首页
聊聊flink Table的Group Windows

聊聊flink Table的Group Windows

作者: go4it | 来源:发表于2019-01-26 11:01 被阅读45次

    本文主要研究一下flink Table的Group Windows

    实例

    Table table = input
      .window([Window w].as("w"))  // define window with alias w
      .groupBy("w")  // group the table by window w
      .select("b.sum");  // aggregate
    
    Table table = input
      .window([Window w].as("w"))  // define window with alias w
      .groupBy("w, a")  // group the table by attribute a and window w 
      .select("a, b.sum");  // aggregate
    
    Table table = input
      .window([Window w].as("w"))  // define window with alias w
      .groupBy("w, a")  // group the table by attribute a and window w 
      .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
    
    • window操作可以对Window进行别名,然后可以在groupBy及select中引用,window有start、end、rowtime属性可以用,其中start及rowtime是inclusive的,而end为exclusive

    Tumbling Windows实例

    // Tumbling Event-time Window
    .window(Tumble.over("10.minutes").on("rowtime").as("w"));
    
    // Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
    .window(Tumble.over("10.minutes").on("proctime").as("w"));
    
    // Tumbling Row-count Window (assuming a processing-time attribute "proctime")
    .window(Tumble.over("10.rows").on("proctime").as("w"));
    
    • Tumbling Windows按固定窗口大小来移动,因而窗口不重叠;over方法用于指定窗口大小;窗口大小可以基于event-time、processing-time、row-count来定义

    Sliding Windows实例

    // Sliding Event-time Window
    .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));
    
    // Sliding Processing-time window (assuming a processing-time attribute "proctime")
    .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));
    
    // Sliding Row-count window (assuming a processing-time attribute "proctime")
    .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
    
    • Sliding Windows在slide interval小于window size的时候,窗口会有重叠,因而rows可能归属多个窗口;over方法用于指定窗口大小,窗口大小可以基于event-time、processing-time、row-count来定义;every方法用于指定slide interval

    Session Windows实例

    // Session Event-time Window
    .window(Session.withGap("10.minutes").on("rowtime").as("w"));
    
    // Session Processing-time Window (assuming a processing-time attribute "proctime")
    .window(Session.withGap("10.minutes").on("proctime").as("w"));
    
    • Session Windows没有固定的窗口大小,它基于inactivity的程度来关闭窗口,withGap方法用于指定两个窗口的gap,作为time interval;Session Windows只能使用event-time或者processing-time

    Table.window

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    class Table(
        private[flink] val tableEnv: TableEnvironment,
        private[flink] val logicalPlan: LogicalNode) {
    
      //......
      
      def window(window: Window): WindowedTable = {
        new WindowedTable(this, window)
      }
      
      //......
    }
    
    • Table提供了window操作,接收Window参数,创建的是WindowedTable

    WindowedTable

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    class WindowedTable(
        private[flink] val table: Table,
        private[flink] val window: Window) {
    
      def groupBy(fields: Expression*): WindowGroupedTable = {
        val fieldsWithoutWindow = fields.filterNot(window.alias.equals(_))
        if (fields.size != fieldsWithoutWindow.size + 1) {
          throw new ValidationException("GroupBy must contain exactly one window alias.")
        }
    
        new WindowGroupedTable(table, fieldsWithoutWindow, window)
      }
    
      def groupBy(fields: String): WindowGroupedTable = {
        val fieldsExpr = ExpressionParser.parseExpressionList(fields)
        groupBy(fieldsExpr: _*)
      }
    
    }
    
    • WindowedTable只提供groupBy操作,其中groupBy可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的groupBy方法;如果groupBy除了window没有其他属性,则其parallelism为1,只会在单一task上执行;groupBy方法创建的是WindowGroupedTable

    WindowGroupedTable

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala

    class WindowGroupedTable(
        private[flink] val table: Table,
        private[flink] val groupKeys: Seq[Expression],
        private[flink] val window: Window) {
    
      def select(fields: Expression*): Table = {
        val expandedFields = expandProjectList(fields, table.logicalPlan, table.tableEnv)
        val (aggNames, propNames) = extractAggregationsAndProperties(expandedFields, table.tableEnv)
    
        val projectsOnAgg = replaceAggregationsAndProperties(
          expandedFields, table.tableEnv, aggNames, propNames)
    
        val projectFields = extractFieldReferences(expandedFields ++ groupKeys :+ window.timeField)
    
        new Table(table.tableEnv,
          Project(
            projectsOnAgg,
            WindowAggregate(
              groupKeys,
              window.toLogicalWindow,
              propNames.map(a => Alias(a._1, a._2)).toSeq,
              aggNames.map(a => Alias(a._1, a._2)).toSeq,
              Project(projectFields, table.logicalPlan).validate(table.tableEnv)
            ).validate(table.tableEnv),
            // required for proper resolution of the time attribute in multi-windows
            explicitAlias = true
          ).validate(table.tableEnv))
      }
    
      def select(fields: String): Table = {
        val fieldExprs = ExpressionParser.parseExpressionList(fields)
        //get the correct expression for AggFunctionCall
        val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
        select(withResolvedAggFunctionCall: _*)
      }
    }
    
    • WindowGroupedTable只提供select操作,其中select可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的select方法;select方法创建了新的Table,其Project的child为WindowAggregate

    小结

    • window操作可以对Window进行别名,然后可以在groupBy及select中引用,window有start、end、rowtime属性可以用,其中start及rowtime是inclusive的,而end为exclusive
    • Tumbling Windows按固定窗口大小来移动,因而窗口不重叠;over方法用于指定窗口大小;窗口大小可以基于event-time、processing-time、row-count来定义;Sliding Windows在slide interval小于window size的时候,窗口会有重叠,因而rows可能归属多个窗口;over方法用于指定窗口大小,窗口大小可以基于event-time、processing-time、row-count来定义;every方法用于指定slide interval;Session Windows没有固定的窗口大小,它基于inactivity的程度来关闭窗口,withGap方法用于指定两个窗口的gap,作为time interval;Session Windows只能使用event-time或者processing-time
    • Table提供了window操作,接收Window参数,创建的是WindowedTable;WindowedTable只提供groupBy操作,其中groupBy可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的groupBy方法;如果groupBy除了window没有其他属性,则其parallelism为1,只会在单一task上执行;groupBy方法创建的是WindowGroupedTable;WindowGroupedTable只提供select操作,其中select可以接收String类型的参数,也可以接收Expression类型的参数;String类型的参数会被转换为Expression类型,最后调用的是Expression类型参数的select方法;select方法创建了新的Table,其Project的child为WindowAggregate

    doc

    相关文章

      网友评论

          本文标题:聊聊flink Table的Group Windows

          本文链接:https://www.haomeiwen.com/subject/tztijqtx.html