UDF是什么?
UDF是用户自定义函数(User-Defined Function)的缩写。顾名思义,UDF是一个函数,它是SQL开发中非常重要的特性,可以帮助我们扩展SQL的表达能力。
Flink SQL默认提供了一些内置函数,如COALESCE、IF、TO_DATE等,但内置函数可能无法满足所有需求。比如时间戳转换、从JSON字符串中提取字段值、维度关联等复杂功能,就需要借助UDF来完成。
Flink中UDF总共有几类?
- UDF(Scalar Function,标量函数)
- UDTF(Table Function,表函数)
- UDAF(Aggregate Function,聚合函数)
- UDTAF(Table Aggregate Function,表聚合函数)
UDF如何使用
UDF需要先注册后使用:在TableEnvironment中调用registerFunction()方法完成UDF的注册。注册完成后,该函数就会登记到TableEnvironment中,后续在SQL中使用该函数时,Table API和SQL parser就能够识别并解析它。
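下面给出一个最小化的"先注册、后使用"示意(其中MyUpper是假设的函数名,env、MyTable也是假设存在的,仅用于演示注册流程;各类UDF的具体写法见后文):
// 一个假设的标量函数:把字符串转成大写
public class MyUpper extends ScalarFunction {
    public String eval(String s) {
        return s == null ? null : s.toUpperCase();
    }
}

// 注册后,在 Table API 和 SQL 中即可按名称调用
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.registerFunction("myUpper", new MyUpper());
tableEnv.sqlQuery("SELECT myUpper(name) FROM MyTable");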
Scalar Functions(标量函数)
标量函数用于对传递给它的一个或者多个参数值进行处理和计算,并返回一个单一的值。
定义标量函数需要继承org.apache.flink.table.functions包下的ScalarFunction类,并实现一到多个求值函数(evaluation method)。标量函数的行为取决于求值函数:一个求值函数必须声明为public,且命名为eval。求值函数可以自定义参数类型和返回值类型,还支持重载(即可以实现多个eval函数)。
public class HashCode extends ScalarFunction {
    private int factor = 12;

    public HashCode(int factor) {
        this.factor = factor;
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));
// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
求值函数的返回值类型默认由Flink的类型提取器推断。这对于基本类型及简单的POJO是够用的,但对于更复杂的类型,如自定义类型或组合类型,可能会提取错误。在这些情况下,可以手动覆盖ScalarFunction#getResultType()方法来定义结果类型的TypeInformation。
接下来展示一个更高级的例子:求值函数接收内部的timestamp表示(long值),并同样以long值返回内部的timestamp表示。通过覆盖ScalarFunction#getResultType(),我们定义返回的long值将被代码生成器解释成Types.TIMESTAMP类型。
public static class TimestampModifier extends ScalarFunction {
    public long eval(long t) {
        return t % 1000;
    }

    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.SQL_TIMESTAMP;
    }
}
Table Functions(表函数)
Table Function接受任意数量的输入值,输出一张表(一行或多行,每行有一列或多列)。
与用户自定义的标量函数类似,用户自定义的Table Function使用0个、1个或多个标量值作为输入参数;但与标量函数不同的是,它可以返回任意数量的行,而不是一个单一的值,返回的行可以包含一个或多个列。
为了定义Table Function,需要继承org.apache.flink.table.functions包下的TableFunction基类,然后实现(一个或多个)求值函数。Table Function的行为由它的求值函数决定:求值函数必须声明为public且命名为eval;Table Function可以通过实现多个eval函数来实现重载;求值函数的参数类型决定了Table Function所有可用的合法参数;求值函数还支持变长参数,比如eval(String... strs)。返回的表的类型由TableFunction定义的泛型类型决定。求值函数通过调用protected的collect(T)方法来发出输出行。
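下面是一个使用变长参数的求值函数示意(ExplodeWords为假设的函数名,仅用于说明eval(String... strs)的写法):
// 泛型 String 决定了返回表只有一列字符串
public class ExplodeWords extends TableFunction<String> {
    public void eval(String... words) {
        for (String word : words) {
            // 每个输入参数作为单独的一行发出
            collect(word);
        }
    }
}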
在Table API中,Table Function主要配合.joinLateral或.leftOuterJoinLateral使用。joinLateral算子(cross join)将外表(算子左边的表)的每一行与Table Function(算子右边的函数)产生的所有行进行连接;leftOuterJoinLateral算子同样将外表的每一行与Table Function产生的所有行进行连接,但如果Table Function返回空表,则保留外部表的行。在SQL中,使用LATERAL TABLE(<TableFunction>)需要配合CROSS JOIN,或配合LEFT JOIN与ON TRUE的join条件组合使用。
下面的例子展示了TableFunction如何定义、在TableEnvironment中注册及在查询中调用。注意你可以在注册TableFunction之前通过它的构造函数来进行配置。
// The generic type "Tuple2<String, Integer>" determines the schema of the returned table as (String, Integer).
public class Split extends TableFunction<Tuple2<String, Integer>> {
    private String separator = " ";

    public Split(String separator) {
        this.separator = separator;
    }

    public void eval(String str) {
        for (String s : str.split(separator)) {
            // use collect(...) to emit a row
            collect(new Tuple2<String, Integer>(s, s.length()));
        }
    }
}
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
Table myTable = ... // table schema: [a: String]
// Register the function.
tableEnv.registerFunction("split", new Split("#"));
// Use the table function in the Java Table API. "as" specifies the field names of the table.
myTable.joinLateral("split(a) as (word, length)")
    .select("a, word, length");
myTable.leftOuterJoinLateral("split(a) as (word, length)")
    .select("a, word, length");
// Use the table function in SQL with LATERAL and TABLE keywords.
// CROSS JOIN a table function (equivalent to "join" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable, LATERAL TABLE(split(a)) as T(word, length)");
// LEFT JOIN a table function (equivalent to "leftOuterJoin" in Table API).
tableEnv.sqlQuery("SELECT a, word, length FROM MyTable LEFT JOIN LATERAL TABLE(split(a)) as T(word, length) ON TRUE");
请注意,POJO类型的字段顺序是不确定的,所以你不能通过对Table Function使用AS来重命名POJO的字段。
Table Function的结果类型默认由Flink的类型提取工具确定。这对于基本类型及简单的POJO有效,但对于更复杂的类型,如自定义或复合类型,可能会出错。在这种情况下,可以手动重写TableFunction#getResultType()并返回指定的TypeInformation来确定结果类型。
下面的例子展示了一个返回Row类型、因而需要显式指定类型信息的TableFunction。我们通过重写TableFunction#getResultType()来定义返回的表类型为RowTypeInfo(String, Integer)。
public class CustomTypeSplit extends TableFunction<Row> {
    public void eval(String str) {
        for (String s : str.split(" ")) {
            Row row = new Row(2);
            row.setField(0, s);
            row.setField(1, s.length());
            collect(row);
        }
    }

    @Override
    public TypeInformation<Row> getResultType() {
        return Types.ROW(Types.STRING(), Types.INT());
    }
}
Aggregation Functions(聚合函数)
User-Defined Aggregate Functions(UDAF)输入一张表(一行或多行,每行有一列或多列),输出一个标量值,即将一张表聚合成一个值,与SQL中的GROUP BY聚合操作类似。
(此处原有一张聚合过程示意图)上面的图展示了聚合的例子:假设你有一张包含各种饮料数据的表,表里包含3列属性(id、名称和价格),一共有5行。现在你需要找到表里所有饮料中的最高价格,即执行max()聚合函数:需要检查这5行,最后输出一个数值作为结果。
用户自定义聚合函数通过继承AggregateFunction类来实现。一个AggregateFunction的工作方式如下:首先,它需要一个累加器(accumulator),即一个能保存聚合中间结果的数据结构;一个空的累加器通过调用AggregateFunction的createAccumulator()方法来创建;随后,对每个输入行都调用函数的accumulate()方法来更新累加器;一旦所有行都处理完成,函数的getValue()方法会被调用,用来计算并返回最终结果。
对于每一个AggregateFunction,下面的方法是强制要求实现的:
- createAccumulator()
- accumulate()
- getValue()
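在看完整的加权平均例子之前,下面先给出一个只实现这三个必需方法的最小示意(CountAgg为假设的计数函数,仅用于说明聚合函数的生命周期):
// 累加器:保存聚合的中间结果
public static class CountAccum {
    public long count = 0L;
}

public static class CountAgg extends AggregateFunction<Long, CountAccum> {

    @Override
    public CountAccum createAccumulator() { // 创建空累加器
        return new CountAccum();
    }

    public void accumulate(CountAccum acc, String input) { // 每个输入行调用一次
        acc.count += 1;
    }

    @Override
    public Long getValue(CountAccum acc) { // 所有行处理完后计算最终结果
        return acc.count;
    }
}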
Flink的类型提取器对于复杂数据类型(比如非基本类型或非简单POJO)可能识别失败。因此,和ScalarFunction、TableFunction类似,AggregateFunction也提供了方法用于指定结果类型的TypeInformation(通过AggregateFunction#getResultType())及累加器的类型(通过AggregateFunction#getAccumulatorType())。
除了上面的方法,还有一些可以有选择地实现的约定方法。其中一些方法可以让系统更高效地执行查询,另一些在特定场景下则是必须实现的。例如,当聚合函数要应用在session group window的上下文中时,merge()方法就是强制要求的(当观察到一行数据把两个session窗口"连接"起来时,这两个窗口的累加器需要进行合并)。
AggregateFunction的下列方法是否需要实现取决于使用场景(session窗口的用法见下方示意):
- retract():在有界OVER窗口上的聚合需要实现
- merge():许多批式聚合及session窗口聚合需要实现
- resetAccumulator():许多批式聚合需要实现
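以session窗口为例,下面的查询示意了merge()被用到的场景(假设已有StreamTableEnvironment tEnv,且userScores表带有事件时间属性rowtime;wAvg为后文定义并注册的加权平均函数):
// 乱序到达的行可能把两个 session 窗口"连接"成一个,
// 此时 Flink 会调用 merge() 合并两个窗口的累加器
tEnv.sqlQuery(
    "SELECT user, wAvg(points, level) AS avgPoints " +
    "FROM userScores " +
    "GROUP BY SESSION(rowtime, INTERVAL '10' MINUTE), user");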
AggregateFunction的所有方法都必须声明为public且非static,且命名必须和上面提到的保持一致。createAccumulator、getValue、getResultType及getAccumulatorType这几个方法是在抽象类AggregateFunction中定义的,其他方法则是约定的方法。为了定义一个聚合函数,需要继承基类org.apache.flink.table.functions.AggregateFunction,并实现一个或多个accumulate方法。accumulate方法可以通过不同的参数类型重载,并且支持变长参数。
AggregateFunction所有方法的详细文档如下。
/**
 * Base class for user-defined aggregates and table aggregates.
 *
 * @param <T>   the type of the aggregation result.
 * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
 *              aggregated values which are needed to compute an aggregation result.
 */
public abstract class UserDefinedAggregateFunction<T, ACC> extends UserDefinedFunction {

    /**
     * Creates and init the Accumulator for this (table)aggregate function.
     *
     * @return the accumulator with the initial value
     */
    public ACC createAccumulator(); // MANDATORY

    /**
     * Returns the TypeInformation of the (table)aggregate function's result.
     *
     * @return The TypeInformation of the (table)aggregate function's result or null if the result
     *         type should be automatically inferred.
     */
    public TypeInformation<T> getResultType = null; // PRE-DEFINED

    /**
     * Returns the TypeInformation of the (table)aggregate function's accumulator.
     *
     * @return The TypeInformation of the (table)aggregate function's accumulator or null if the
     *         accumulator type should be automatically inferred.
     */
    public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
}
/**
 * Base class for aggregation functions.
 *
 * @param <T>   the type of the aggregation result
 * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
 *              aggregated values which are needed to compute an aggregation result.
 *              AggregateFunction represents its state using accumulator, thereby the state of the
 *              AggregateFunction must be put into the accumulator.
 */
public abstract class AggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {

    /**
     * Processes the input values and update the provided accumulator instance. The method
     * accumulate can be overloaded with different custom types and arguments. An AggregateFunction
     * requires at least one accumulate() method.
     *
     * @param accumulator           the accumulator which contains the current aggregated results
     * @param [user defined inputs] the input value (usually obtained from a new arrived data).
     */
    public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY

    /**
     * Retracts the input values from the accumulator instance. The current design assumes the
     * inputs are the values that have been previously accumulated. The method retract can be
     * overloaded with different custom types and arguments. This function must be implemented for
     * datastream bounded over aggregate.
     *
     * @param accumulator           the accumulator which contains the current aggregated results
     * @param [user defined inputs] the input value (usually obtained from a new arrived data).
     */
    public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL

    /**
     * Merges a group of accumulator instances into one accumulator instance. This function must be
     * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
     *
     * @param accumulator the accumulator which will keep the merged aggregate results. It should
     *                    be noted that the accumulator may contain the previous aggregated
     *                    results. Therefore user should not replace or clean this instance in the
     *                    custom merge method.
     * @param its         an {@link java.lang.Iterable} pointed to a group of accumulators that will be
     *                    merged.
     */
    public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL

    /**
     * Called every time when an aggregation result should be materialized.
     * The returned value could be either an early and incomplete result
     * (periodically emitted as data arrive) or the final result of the
     * aggregation.
     *
     * @param accumulator the accumulator which contains the current
     *                    aggregated results
     * @return the aggregation result
     */
    public T getValue(ACC accumulator); // MANDATORY

    /**
     * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
     * dataset grouping aggregate.
     *
     * @param accumulator the accumulator which needs to be reset
     */
    public void resetAccumulator(ACC accumulator); // OPTIONAL

    /**
     * Returns true if this AggregateFunction can only be applied in an OVER window.
     *
     * @return true if the AggregateFunction requires an OVER window, false otherwise.
     */
    public Boolean requiresOver = false; // PRE-DEFINED
}
下面的例子展示了如何定义一个计算给定列加权平均值的AggregateFunction、如何在TableEnvironment中注册它,以及如何在查询中使用这个函数。
为了计算加权平均值,累加器需要保存已累积数据的加权和(sum)以及权重的累计值(count)。在我们的例子里,我们定义了一个类WeightedAvgAccum用作累加器。累加器会由Flink的checkpoint机制自动管理,并在失败时自动恢复,从而保证精确一次(exactly-once)的语义。
我们的聚合函数WeightedAvg的accumulate()方法有三个输入:第一个是WeightedAvgAccum累加器,另外两个是用户自定义的输入,即输入值iValue和权重iWeight。尽管retract()、merge()及resetAccumulator()方法在大多数聚合类型中不是强制要求的,我们还是在下面提供了它们的例子。请注意,我们在Java例子中使用了基本类型;而在(官方文档的)Scala例子中还定义了getResultType()和getAccumulatorType()方法,因为Flink的类型提取对于Scala类型效果并不是很好。
/**
 * Accumulator for WeightedAvg.
 */
public static class WeightedAvgAccum {
    public long sum = 0;
    public int count = 0;
}

/**
 * Weighted Average user-defined aggregate function.
 */
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {

    @Override
    public WeightedAvgAccum createAccumulator() {
        return new WeightedAvgAccum();
    }

    @Override
    public Long getValue(WeightedAvgAccum acc) {
        if (acc.count == 0) {
            return null;
        } else {
            return acc.sum / acc.count;
        }
    }

    public void accumulate(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum += iValue * iWeight;
        acc.count += iWeight;
    }

    public void retract(WeightedAvgAccum acc, long iValue, int iWeight) {
        acc.sum -= iValue * iWeight;
        acc.count -= iWeight;
    }

    public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
        Iterator<WeightedAvgAccum> iter = it.iterator();
        while (iter.hasNext()) {
            WeightedAvgAccum a = iter.next();
            acc.count += a.count;
            acc.sum += a.sum;
        }
    }

    public void resetAccumulator(WeightedAvgAccum acc) {
        acc.count = 0;
        acc.sum = 0L;
    }
}
// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("wAvg", new WeightedAvg());
// use function
tEnv.sqlQuery("SELECT user, wAvg(points, level) AS avgPoints FROM userScores GROUP BY user");
Table Aggregation Functions(表聚合函数)
User-Defined Table Aggregate Functions(UDTAGG)输入一张表(一行或多行,每行有一个或多个属性),输出一张结果表,即将一张表聚合成另一张表。
(此处原有一张表聚合过程示意图)上面的图展示了表聚合的例子:假设你有一张包含饮料数据的表,这个表包含id、名称、价格3列,总共有5行。现在你要找到表中所有饮料里最高的2个价格,即执行top2()表聚合:需要依次检查这5行,最后输出一张包含最高2个值的表作为结果。
UDTAGG通过继承TableAggregateFunction类来实现。一个TableAggregateFunction的工作方式如下:首先,它需要一个累加器(accumulator),用于保存聚合的中间结果;一个空的累加器通过调用TableAggregateFunction的createAccumulator()方法来创建;随后,每一行输入都会调用accumulate()方法来更新累加器;一旦所有行都处理完成,emitValue()方法会被调用,用来计算并发出最终结果。
下面的方法是每一个TableAggregateFunction都需要实现的:
- createAccumulator()
- accumulate()
Flink的类型提取器对于复杂数据类型(比如非基本类型或非简单POJO)可能识别失败。类似ScalarFunction和TableFunction,TableAggregateFunction也提供了方法来指定结果的TypeInformation(通过TableAggregateFunction#getResultType())及累加器的类型(通过TableAggregateFunction#getAccumulatorType())。
除了以上方法,还有一些未在父类定义、可以选择性实现的约定方法。有些方法可以让系统更高效地执行查询,还有一些在特定场景下是强制要求实现的。例如,当聚合函数要应用在session group window的上下文中时,merge()方法必须要实现(当观察到一行数据把两个session窗口"连接"起来时,这两个窗口的累加器需要进行合并)。
下面是TableAggregateFunction在特定场景下强制要求实现的方法:
- retract():在有界OVER窗口上的聚合需要实现
- merge():许多批式聚合及session窗口聚合需要实现
- resetAccumulator():许多批式聚合需要实现
- emitValue():批式聚合及窗口聚合需要实现
下面是TableAggregateFunction用于改善流式任务性能的方法:
- emitUpdateWithRetract():用于在回撤(retract)模式下发出已更新的数据
emitValue()方法总是会把累加器中的全部结果发送出来。以TopN为例,emitValue()每次都会把全部top N的数据发出,这对于流式任务来说会产生性能问题。为了改善性能,用户可以实现emitUpdateWithRetract()方法:该方法在回撤模式下增量地输出数据,即一旦发生更新,会先回撤旧的记录,再发送新的更新后的记录。如果这两个方法都在table aggregate function中定义了,会优先使用emitUpdateWithRetract()方法,因为它增量输出结果,较emitValue()更加高效。
TableAggregateFunction的所有方法必须声明为public且非static,且命名需要和上面提及的保持一致。createAccumulator、getResultType和getAccumulatorType方法定义在抽象父类TableAggregateFunction中,其他方法则是约定的方法。为了定义一个table aggregation function,需要继承基类org.apache.flink.table.functions.TableAggregateFunction,并实现一个或多个accumulate方法。accumulate方法可以使用不同的参数类型重载,并支持变长参数。
TableAggregateFunction所有方法的详细文档在下面列出。
/**
 * Base class for user-defined aggregates and table aggregates.
 *
 * @param <T>   the type of the aggregation result.
 * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
 *              aggregated values which are needed to compute an aggregation result.
 */
public abstract class UserDefinedAggregateFunction<T, ACC> extends UserDefinedFunction {

    /**
     * Creates and init the Accumulator for this (table)aggregate function.
     *
     * @return the accumulator with the initial value
     */
    public ACC createAccumulator(); // MANDATORY

    /**
     * Returns the TypeInformation of the (table)aggregate function's result.
     *
     * @return The TypeInformation of the (table)aggregate function's result or null if the result
     *         type should be automatically inferred.
     */
    public TypeInformation<T> getResultType = null; // PRE-DEFINED

    /**
     * Returns the TypeInformation of the (table)aggregate function's accumulator.
     *
     * @return The TypeInformation of the (table)aggregate function's accumulator or null if the
     *         accumulator type should be automatically inferred.
     */
    public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
}
/**
 * Base class for table aggregation functions.
 *
 * @param <T>   the type of the aggregation result
 * @param <ACC> the type of the aggregation accumulator. The accumulator is used to keep the
 *              aggregated values which are needed to compute a table aggregation result.
 *              TableAggregateFunction represents its state using accumulator, thereby the state of
 *              the TableAggregateFunction must be put into the accumulator.
 */
public abstract class TableAggregateFunction<T, ACC> extends UserDefinedAggregateFunction<T, ACC> {

    /**
     * Processes the input values and update the provided accumulator instance. The method
     * accumulate can be overloaded with different custom types and arguments. A TableAggregateFunction
     * requires at least one accumulate() method.
     *
     * @param accumulator           the accumulator which contains the current aggregated results
     * @param [user defined inputs] the input value (usually obtained from a new arrived data).
     */
    public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY

    /**
     * Retracts the input values from the accumulator instance. The current design assumes the
     * inputs are the values that have been previously accumulated. The method retract can be
     * overloaded with different custom types and arguments. This function must be implemented for
     * datastream bounded over aggregate.
     *
     * @param accumulator           the accumulator which contains the current aggregated results
     * @param [user defined inputs] the input value (usually obtained from a new arrived data).
     */
    public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL

    /**
     * Merges a group of accumulator instances into one accumulator instance. This function must be
     * implemented for datastream session window grouping aggregate and dataset grouping aggregate.
     *
     * @param accumulator the accumulator which will keep the merged aggregate results. It should
     *                    be noted that the accumulator may contain the previous aggregated
     *                    results. Therefore user should not replace or clean this instance in the
     *                    custom merge method.
     * @param its         an {@link java.lang.Iterable} pointed to a group of accumulators that will be
     *                    merged.
     */
    public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL

    /**
     * Called every time when an aggregation result should be materialized. The returned value
     * could be either an early and incomplete result (periodically emitted as data arrive) or
     * the final result of the aggregation.
     *
     * @param accumulator the accumulator which contains the current
     *                    aggregated results
     * @param out         the collector used to output data
     */
    public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL

    /**
     * Called every time when an aggregation result should be materialized. The returned value
     * could be either an early and incomplete result (periodically emitted as data arrive) or
     * the final result of the aggregation.
     *
     * Different from emitValue, emitUpdateWithRetract is used to emit values that have been updated.
     * This method outputs data incrementally in retract mode, i.e., once there is an update, we
     * have to retract old records before sending new updated ones. The emitUpdateWithRetract
     * method will be used in preference to the emitValue method if both methods are defined in the
     * table aggregate function, because the method is treated to be more efficient than emitValue
     * as it can output values incrementally.
     *
     * @param accumulator the accumulator which contains the current
     *                    aggregated results
     * @param out         the retractable collector used to output data. Use collect method
     *                    to output(add) records and use retract method to retract(delete)
     *                    records.
     */
    public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> out); // OPTIONAL

    /**
     * Collects a record and forwards it. The collector can output retract messages with the retract
     * method. Note: only use it in {@code emitRetractValueIncrementally}.
     */
    public interface RetractableCollector<T> extends Collector<T> {

        /**
         * Retract a record.
         *
         * @param record The record to retract.
         */
        void retract(T record);
    }
}
下面的例子展示了如何定义一个对给定列计算top 2值的TableAggregateFunction、如何在TableEnvironment中注册它,以及如何在Table API查询中使用该函数(Flink 1.9中TableAggregateFunction只支持在Table API中使用)。
为了计算top 2值,累加器需要存储目前为止累积到的最大的两个值。在我们的例子中,我们定义了Top2Accum类作为累加器。累加器会由Flink的checkpoint机制自动管理,并在失败时自动恢复,从而保证精确一次(exactly-once)的语义。
我们的Top2表聚合函数的accumulate()方法有两个输入:第一个是Top2Accum累加器,另一个是用户自定义的输入,即输入值v。尽管merge()方法对于大多数表聚合类型不是强制要求的,我们还是在下面提供了样例。请注意,我们在Java例子中使用了基本类型;而在(官方文档的)Scala例子中还定义了getResultType()和getAccumulatorType()方法,因为Flink的类型提取对于Scala类型效果并不是很好。
/**
 * Accumulator for Top2.
 */
public class Top2Accum {
    public Integer first;
    public Integer second;
}

/**
 * The top2 user-defined table aggregate function.
 */
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

    @Override
    public Top2Accum createAccumulator() {
        Top2Accum acc = new Top2Accum();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        return acc;
    }

    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
        for (Top2Accum otherAcc : iterable) {
            accumulate(acc, otherAcc.first);
            accumulate(acc, otherAcc.second);
        }
    }

    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
        // emit the value and rank
        if (acc.first != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.first, 1));
        }
        if (acc.second != Integer.MIN_VALUE) {
            out.collect(Tuple2.of(acc.second, 2));
        }
    }
}
// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("top2", new Top2());
// init table
Table tab = ...;
// use function
tab.groupBy("key")
    .flatAggregate("top2(a) as (v, rank)")
    .select("key, v, rank");
下面的例子展示了如何使用emitUpdateWithRetract()方法来只发出发生更新的数据。为了发出更新,在这个例子中累加器同时保留了旧的和新的top 2值。注意:如果topN的N很大,同时保留旧值和新值的做法可能并不高效。一种解决办法是在accumulate方法中只把输入行存进累加器,然后在emitUpdateWithRetract方法中再执行计算。
/**
 * Accumulator for Top2.
 */
public class Top2Accum {
    public Integer first;
    public Integer second;
    public Integer oldFirst;
    public Integer oldSecond;
}

/**
 * The top2 user-defined table aggregate function.
 */
public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

    @Override
    public Top2Accum createAccumulator() {
        Top2Accum acc = new Top2Accum();
        acc.first = Integer.MIN_VALUE;
        acc.second = Integer.MIN_VALUE;
        acc.oldFirst = Integer.MIN_VALUE;
        acc.oldSecond = Integer.MIN_VALUE;
        return acc;
    }

    public void accumulate(Top2Accum acc, Integer v) {
        if (v > acc.first) {
            acc.second = acc.first;
            acc.first = v;
        } else if (v > acc.second) {
            acc.second = v;
        }
    }

    public void emitUpdateWithRetract(Top2Accum acc, RetractableCollector<Tuple2<Integer, Integer>> out) {
        if (!acc.first.equals(acc.oldFirst)) {
            // if there is an update, retract old value then emit new value.
            if (acc.oldFirst != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldFirst, 1));
            }
            out.collect(Tuple2.of(acc.first, 1));
            acc.oldFirst = acc.first;
        }
        if (!acc.second.equals(acc.oldSecond)) {
            // if there is an update, retract old value then emit new value.
            if (acc.oldSecond != Integer.MIN_VALUE) {
                out.retract(Tuple2.of(acc.oldSecond, 2));
            }
            out.collect(Tuple2.of(acc.second, 2));
            acc.oldSecond = acc.second;
        }
    }
}
// register function
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("top2", new Top2());
// init table
Table tab = ...;
// use function
tab.groupBy("key")
    .flatAggregate("top2(a) as (v, rank)")
    .select("key, v, rank");
实现UDF的最佳实践
Table API和SQL的内部代码生成会尽可能多地使用基本类型。如果一个UDF引入过多的对象创建、类型转换、装箱和拆箱,会带来很大的开销。因此,强烈建议声明参数和结果类型时使用基本类型而不是它们的包装类:Types.DATE和Types.TIME可以表示成int,Types.TIMESTAMP可以表示成long。
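例如,下面两种写法功能相同,但使用基本类型的版本可以避免装箱和拆箱的开销(AddOneDay为假设的函数名,仅作示意):
// 推荐:参数和返回值使用基本类型 long(TIMESTAMP 的内部表示)
public static class AddOneDay extends ScalarFunction {
    public long eval(long timestampMillis) {
        return timestampMillis + 24 * 60 * 60 * 1000L;
    }
}

// 不推荐:使用包装类 Long 会引入不必要的装箱/拆箱
public static class AddOneDayBoxed extends ScalarFunction {
    public Long eval(Long timestampMillis) {
        return timestampMillis + 24 * 60 * 60 * 1000L;
    }
}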
将UDF和Runtime进行整合
有时UDF有必要获取全局的runtime信息,或者在实际工作前后做一些准备(setup)和清理(cleanup)工作。UDF可以通过重写open()及close()方法来实现,功能与DataSet或DataStream API中RichFunction的同名方法类似。
open()方法在求值函数第一次调用前被调用一次;close()方法在求值函数最后一次调用之后被调用。
open()方法提供了一个FunctionContext,其中包含UDF执行环境的上下文信息,比如指标组(metric group)、分布式缓存文件或者全局作业参数。
通过调用FunctionContext的相应方法可以获得下面的信息:
| Method | Description |
| --- | --- |
| getMetricGroup() | 并行子任务的指标组 |
| getCachedFile(name) | 分布式缓存文件的本地临时拷贝 |
| getJobParameter(name, defaultValue) | 根据指定键获取的全局作业参数 |
下面的代码例子展示了如何在标量函数中使用FunctionContext获取全局作业参数:
public class HashCode extends ScalarFunction {
    private int factor = 0;

    @Override
    public void open(FunctionContext context) throws Exception {
        // access "hashcode_factor" parameter
        // "12" would be the default value if parameter does not exist
        factor = Integer.valueOf(context.getJobParameter("hashcode_factor", "12"));
    }

    public int eval(String s) {
        return s.hashCode() * factor;
    }
}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// set job parameter
Configuration conf = new Configuration();
conf.setString("hashcode_factor", "31");
env.getConfig().setGlobalJobParameters(conf);
// register the function
tableEnv.registerFunction("hashCode", new HashCode());
// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL
tableEnv.sqlQuery("SELECT string, HASHCODE(string) FROM MyTable");