A Look at Distinct Aggregation in the Flink Table API

Author: go4it | Published 2019-01-28 11:17

    This article takes a look at Distinct Aggregation in the Flink Table API.

    Example

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.Over;
    import org.apache.flink.table.api.java.Tumble;

    // tableEnv is an existing StreamTableEnvironment with a registered "Orders" table.
    // DISTINCT can be applied to GroupBy Aggregation, GroupBy Window Aggregation
    // and Over Window Aggregation.
    Table orders = tableEnv.scan("Orders");

    // Distinct aggregation on group by: each distinct value of b is summed once per key a
    Table groupByDistinctResult = orders
        .groupBy("a")
        .select("a, b.sum.distinct as d");

    // Distinct aggregation on a time window group by (5-minute tumbling window)
    Table groupByWindowDistinctResult = orders
        .window(Tumble.over("5.minutes").on("rowtime").as("w"))
        .groupBy("a, w")
        .select("a, b.sum.distinct as d");

    // Distinct aggregation on an unbounded over window; only the avg uses DISTINCT
    Table overWindowDistinctResult = orders
        .window(Over
            .partitionBy("a")
            .orderBy("rowtime")
            .preceding("UNBOUNDED_RANGE")
            .as("w"))
        .select("a, b.avg.distinct over w, b.max over w, b.min over w");

    // User-defined aggregate functions can also be used with the DISTINCT modifier.
    // MyUdagg is a user-defined AggregateFunction; this assumes Orders has users and points fields.
    tableEnv.registerFunction("myUdagg", new MyUdagg());
    Table udaggDistinctResult = orders
        .groupBy("users")
        .select("users, myUdagg.distinct(points) as myDistinctResult");
    
    • DISTINCT can be used with both built-in and user-defined aggregate functions. It is supported in GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation.
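    The following standalone Java sketch makes the semantics of b.sum.distinct concrete. It has no Flink dependency and computes a DISTINCT sum for a single group, so a value that arrives several times contributes to the sum only once. A streaming aggregate may also have to retract rows, for example when a row leaves a bounded OVER window. The sketch therefore keeps a per-value reference count, so a value leaves the sum only when its last copy is retracted. DistinctSumSketch and its methods are hypothetical illustrations, not Flink's internal implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical standalone sketch (no Flink): a DISTINCT sum for one group.
// Each distinct value is added once; a reference count per value makes retraction safe.
class DistinctSumSketch {

    // Hypothetical accumulator: value -> number of copies currently present, plus the running sum
    static final class DistinctSumAcc {
        final Map<Long, Long> counts = new HashMap<>();
        long sum = 0L;
    }

    static DistinctSumAcc createAccumulator() {
        return new DistinctSumAcc();
    }

    // Adds one input value; only the first copy of a value contributes to the sum
    static void accumulate(DistinctSumAcc acc, long value) {
        long before = acc.counts.getOrDefault(value, 0L);
        acc.counts.put(value, before + 1);
        if (before == 0L) {
            acc.sum += value;
        }
    }

    // Removes one previously accumulated copy; the value leaves the sum
    // only when its last copy is retracted
    static void retract(DistinctSumAcc acc, long value) {
        long before = acc.counts.getOrDefault(value, 0L);
        if (before == 0L) {
            return; // nothing to retract (not expected for well-formed input)
        }
        if (before == 1L) {
            acc.counts.remove(value);
            acc.sum -= value;
        } else {
            acc.counts.put(value, before - 1);
        }
    }

    static long getValue(DistinctSumAcc acc) {
        return acc.sum;
    }

    public static void main(String[] args) {
        DistinctSumAcc acc = createAccumulator();
        for (long b : new long[] {3L, 5L, 3L, 7L}) {
            accumulate(acc, b);
        }
        System.out.println(getValue(acc)); // 15 (3 + 5 + 7; the second 3 is ignored)
        retract(acc, 3L);                  // one copy of 3 remains, so the sum is unchanged
        System.out.println(getValue(acc)); // 15
        retract(acc, 3L);                  // last copy of 3 removed
        System.out.println(getValue(acc)); // 12
    }
}
```

    With a plain SUM, the same four inputs would give 18. DISTINCT drops the repeated 3.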

    AggregateFunction

    flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/functions/AggregateFunction.scala

    /**
      * Base class for User-Defined Aggregates.
      *
      * The behavior of an [[AggregateFunction]] can be defined by implementing a series of custom
      * methods. An [[AggregateFunction]] needs at least three methods:
      *  - createAccumulator,
      *  - accumulate, and
      *  - getValue.
      *
      *  There are a few other methods that can be optional to have:
      *  - retract,
      *  - merge, and
      *  - resetAccumulator
      *
      * All these methods must be declared publicly, not static and named exactly as the names
      * mentioned above. The methods createAccumulator and getValue are defined in the
      * [[AggregateFunction]] functions, while other methods are explained below.
      *
      *
      * {{{
      * Processes the input values and update the provided accumulator instance. The method
      * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
      * requires at least one accumulate() method.
      *
      * @param accumulator           the accumulator which contains the current aggregated results
      * @param [user defined inputs] the input value (usually obtained from a new arrived data).
      *
      * def accumulate(accumulator: ACC, [user defined inputs]): Unit
      * }}}
      *
      *
      * {{{
      * Retracts the input values from the accumulator instance. The current design assumes the
      * inputs are the values that have been previously accumulated. The method retract can be
      * overloaded with different custom types and arguments. This function must be implemented for
      * datastream bounded over aggregate.
      *
      * @param accumulator           the accumulator which contains the current aggregated results
      * @param [user defined inputs] the input value (usually obtained from a new arrived data).
      *
      * def retract(accumulator: ACC, [user defined inputs]): Unit
      * }}}
      *
      *
      * {{{
      * Merges a group of accumulator instances into one accumulator instance. This function must be
      * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
      *
      * @param accumulator  the accumulator which will keep the merged aggregate results. It should
      *                     be noted that the accumulator may contain the previous aggregated
      *                     results. Therefore user should not replace or clean this instance in the
      *                     custom merge method.
      * @param its          an [[java.lang.Iterable]] pointed to a group of accumulators that will be
      *                     merged.
      *
      * def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit
      * }}}
      *
      *
      * {{{
      * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
      * dataset grouping aggregate.
      *
      * @param accumulator  the accumulator which needs to be reset
      *
      * def resetAccumulator(accumulator: ACC): Unit
      * }}}
      *
      *
      * @tparam T   the type of the aggregation result
      * @tparam ACC the type of the aggregation accumulator. The accumulator is used to keep the
      *             aggregated values which are needed to compute an aggregation result.
      *             AggregateFunction represents its state using accumulator, thereby the state of the
      *             AggregateFunction must be put into the accumulator.
      */
    abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
      /**
        * Creates and init the Accumulator for this [[AggregateFunction]].
        *
        * @return the accumulator with the initial value
        */
      def createAccumulator(): ACC
    
      /**
        * Called every time when an aggregation result should be materialized.
        * The returned value could be either an early and incomplete result
        * (periodically emitted as data arrive) or the final result of the
        * aggregation.
        *
        * @param accumulator the accumulator which contains the current
        *                    aggregated results
        * @return the aggregation result
        */
      def getValue(accumulator: ACC): T
    
      /**
        * Returns true if this AggregateFunction can only be applied in an OVER window.
        *
        * @return true if the AggregateFunction requires an OVER window, false otherwise.
        */
      def requiresOver: Boolean = false
    
      /**
        * Returns the TypeInformation of the AggregateFunction's result.
        *
        * @return The TypeInformation of the AggregateFunction's result or null if the result type
        *         should be automatically inferred.
        */
      def getResultType: TypeInformation[T] = null
    
      /**
        * Returns the TypeInformation of the AggregateFunction's accumulator.
        *
        * @return The TypeInformation of the AggregateFunction's accumulator or null if the
        *         accumulator type should be automatically inferred.
        */
      def getAccumulatorType: TypeInformation[ACC] = null
    }
    
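    • AggregateFunction extends UserDefinedFunction and has two type parameters: T, the type of the aggregation result, and ACC, the type of the accumulator. It defines createAccumulator, getValue, requiresOver, getResultType and getAccumulatorType. Subclasses must implement createAccumulator and getValue, which are abstract; the other three have default implementations.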
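    • The accumulate method is not declared in AggregateFunction, but a subclass must define at least one (it may be overloaded). It takes the accumulator (ACC) followed by the user-defined input values and returns void (Unit in Scala). The retract, merge and resetAccumulator methods are optional, and a subclass defines them as needed. All of these methods must be public, non-static and named exactly as listed.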
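    • retract is required for DataStream bounded OVER aggregates. It takes the accumulator (ACC) followed by the same user-defined inputs as accumulate, and returns void. merge is required for DataStream session window grouping aggregates and DataSet grouping aggregates. It takes the target accumulator (ACC) and a java.lang.Iterable<ACC> of accumulators to merge into it, and returns void. resetAccumulator is required for DataSet grouping aggregates. It takes the accumulator (ACC) and returns void.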
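    The contract described above can be illustrated with a standalone Java sketch of an average aggregate that implements every method. These are the required createAccumulator, accumulate and getValue, plus the optional retract, merge and resetAccumulator. The sketch deliberately does not depend on Flink, so it does not extend AggregateFunction. In a real UDAGG the class would extend AggregateFunction<Double, AvgAcc>, and Flink would invoke these methods itself. The names AvgContractSketch and AvgAcc are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch (no Flink) mirroring the AggregateFunction method contract
// for an average. A real Flink UDAGG would extend AggregateFunction<Double, AvgAcc>.
class AvgContractSketch {

    // ACC: all state needed to compute the result lives in the accumulator
    public static class AvgAcc {
        public long sum = 0L;
        public long count = 0L;
    }

    // Required: create an empty accumulator
    public AvgAcc createAccumulator() {
        return new AvgAcc();
    }

    // Required (at least one overload): fold one input value into the accumulator
    public void accumulate(AvgAcc acc, long value) {
        acc.sum += value;
        acc.count += 1;
    }

    // Optional; needed for DataStream bounded OVER aggregates: undo an earlier accumulate
    public void retract(AvgAcc acc, long value) {
        acc.sum -= value;
        acc.count -= 1;
    }

    // Optional; needed for session windows and DataSet grouping. The Iterable holds
    // accumulators (ACC), and acc may already contain partial results, so add into it.
    public void merge(AvgAcc acc, Iterable<AvgAcc> others) {
        for (AvgAcc other : others) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }

    // Optional; needed for DataSet grouping aggregates: clear the accumulator for reuse
    public void resetAccumulator(AvgAcc acc) {
        acc.sum = 0L;
        acc.count = 0L;
    }

    // Required: T is Double here; returns null when nothing has been accumulated
    public Double getValue(AvgAcc acc) {
        return acc.count == 0 ? null : (double) acc.sum / acc.count;
    }

    public static void main(String[] args) {
        AvgContractSketch f = new AvgContractSketch();
        AvgAcc a = f.createAccumulator();
        f.accumulate(a, 2L);
        f.accumulate(a, 4L);
        AvgAcc b = f.createAccumulator();
        f.accumulate(b, 6L);
        List<AvgAcc> others = Arrays.asList(b);
        f.merge(a, others);                 // a now covers 2, 4, 6
        System.out.println(f.getValue(a));  // 4.0
        f.retract(a, 6L);                   // back to 2, 4
        System.out.println(f.getValue(a));  // 3.0
        f.resetAccumulator(a);
        System.out.println(f.getValue(a));  // null
    }
}
```

    merge receives an Iterable of accumulators rather than input values. It must add into the accumulator it is given instead of replacing it, because that accumulator may already hold earlier results.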

    Summary

    • Distinct Aggregation in the Table API works with both built-in and user-defined aggregate functions. It is supported in GroupBy Aggregation, GroupBy Window Aggregation and Over Window Aggregation.
    • AggregateFunction extends UserDefinedFunction with type parameters T (result) and ACC (accumulator). Subclasses must implement createAccumulator and getValue, while requiresOver, getResultType and getAccumulatorType have defaults.
    • Subclasses must also define at least one accumulate(ACC, inputs...) method returning void. retract, merge and resetAccumulator are optional in general, but some operations require them. retract(ACC, inputs...) is required for DataStream bounded OVER aggregates. merge(ACC, java.lang.Iterable<ACC>) is required for DataStream session window grouping aggregates and DataSet grouping aggregates. resetAccumulator(ACC) is required for DataSet grouping aggregates. All three return void.


          Source: https://www.haomeiwen.com/subject/pgxajqtx.html