Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
前言
近期使用Flink SQL内置函数的时候遇到了点问题,函数的返回结果和想象中的不一致。于是阅读了Flink内置函数的调用流程源代码。下面把这些内容分享给大家。
BuiltInFunctionDefinitions
首先我们从函数定义部分开始。函数定义在解析AST语法树的时候会用到。
BuiltInFunctionDefinitions
中定义了很多内置的function。我们以ROUND四舍五入方法为例,分析SQL函数从SQL语法树中定义到转换为Java可执行代码的整个执行流程。
下面给出ROUND方法定义的过程:
public static final BuiltInFunctionDefinition ROUND =
BuiltInFunctionDefinition.newBuilder()
.name("round") // 方法名称为round
.kind(SCALAR) // 类型为标量函数(只有一个输出)
.inputTypeStrategy( // 参数类型,参数可以是一个数字类型,或者一个数字类型和一个整数类型
or(
sequence(logical(LogicalTypeFamily.NUMERIC)),
sequence(
logical(LogicalTypeFamily.NUMERIC),
logical(LogicalTypeRoot.INTEGER))))
.outputTypeStrategy(nullableIfArgs(SpecificTypeStrategies.ROUND)) // 输出类型推断策略(flink table中使用)
.build();
DirectConvertRule
只有函数的定义,我们怎么知道它是和那个函数关联起来的呢?
DirectConvertRule
正是用来保存这个关联关系,它定义了上面BuiltInFunctionDefinitions
和FlinkSqlOperatorTable
中function的转换关系(一对一关系)。
例如ROUND函数的对应关系:。
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.ROUND, FlinkSqlOperatorTable.ROUND);
BuiltInFunctionDefinitions
为上面所述的SQL方法的定义,FlinkSqlOperatorTable
保存了Flink的内置SQL函数。DEFINITION_OPERATOR_MAP
维护了方法的定义和SqlFunction的对应关系。
通过这一行Flink可以得知将SQL中的函数翻译为Flink哪一个内置的函数执行。
FlinkSqlOperatorTable
在这个类中定义了一系列内置的SQL function。FlinkSqlOperatorTable
中创建了一系列static的SQLFunction
对象。
我们以ROUND方法为例:
public static final SqlFunction ROUND =
new SqlFunction(
"ROUND",
SqlKind.OTHER_FUNCTION,
FlinkReturnTypes.ROUND_FUNCTION_NULLABLE,
null,
OperandTypes.or(OperandTypes.NUMERIC_INTEGER, OperandTypes.NUMERIC),
SqlFunctionCategory.NUMERIC);
猛一看SqlFunction的定义和
BuiltInFunctionDefinitions
非常相似,但是使用的位置不同。BuiltInFunctionDefinitions
用于SQL解析的时候,FlinkSqlOperatorTable
在SQL翻译为可执行代码的时候使用。
我们查看下SqlFunction
的构造函数,代码如下:
public SqlFunction(
String name,
SqlKind kind,
@Nullable SqlReturnTypeInference returnTypeInference,
@Nullable SqlOperandTypeInference operandTypeInference,
@Nullable SqlOperandTypeChecker operandTypeChecker,
SqlFunctionCategory category) {
// We leave sqlIdentifier as null to indicate
// that this is a built-in.
this(name, null, kind, returnTypeInference, operandTypeInference,
operandTypeChecker, category);
assert !((category == SqlFunctionCategory.USER_DEFINED_CONSTRUCTOR)
&& (returnTypeInference == null));
}
参数的含义为:
- 函数名称
- 函数细分类型
- 返回值推断策略。包含数据转换规则和数据类型转换规则
- 操作数类型推断策略。根据callBinding, returnType, operandTypes来推断操作数的类型
- SQL函数的归类,比SQLKind的分类要粗
FlinkReturnTypes
定义返回类型推断方式。有些特殊的函数,比如说ROUND四舍五入,他的返回类型可能为BigDecimal(如果传入数据类型为BigDecimal的话)。BigDecimal是一个复合类型,它有两个参数precision(精度)和scale(小数点后面位数)。这两个参数是需要根据ROUND函数传入的数据和参数实时计算的。因此需要一个专门定义一段返回值类型推断逻辑。
/** ROUND(num [,len]) type inference. */
public static final SqlReturnTypeInference ROUND_FUNCTION =
new SqlReturnTypeInference() {
@Override
public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
// 获取操作数的类型
final RelDataType numType = opBinding.getOperandType(0);
// 如果操作数的类型不是BigDecimal,无需推断,直接返回原类型
if (numType.getSqlTypeName() != SqlTypeName.DECIMAL) {
return numType;
}
// lenVal为需要四舍五入到第几位
final BigDecimal lenVal;
// 获取参数长度,ROUND函数两个参数,第一个为操作数,第二个为四舍五入保留小数位数
if (opBinding.getOperandCount() == 1) {
// 如果ROUND没指明四舍五入到第几位,则只保留整数部分
lenVal = BigDecimal.ZERO;
} else if (opBinding.getOperandCount() == 2) {
// 获取保留小数点位数
lenVal = getArg1Literal(opBinding); // may return null
} else {
throw new AssertionError();
}
if (lenVal == null) {
return numType;
}
// ROUND( decimal(p,s), r )
// 分别获取原操作数经度,小数点后位数和四舍五入保留位数
final int p = numType.getPrecision();
final int s = numType.getScale();
final int r = lenVal.intValueExact();
// 计算出新的Decimal类型,包含precision和scale
DecimalType dt = LogicalTypeMerging.findRoundDecimalType(p, s, r);
return opBinding
.getTypeFactory()
.createSqlType(SqlTypeName.DECIMAL, dt.getPrecision(), dt.getScale());
}
private BigDecimal getArg1Literal(SqlOperatorBinding opBinding) {
try {
return opBinding.getOperandLiteralValue(1, BigDecimal.class);
} catch (Throwable e) {
return null;
}
}
};
FunctionGenerator
FunctionGenerator
负责代码生成,即解析SQL的时候,将SQL中的方法调用翻译为Java/Scala代码中的真实可执行的处理逻辑。它通过addSqlFunctionMethod
方法告诉SQL解析器,如何将某个SQL function翻译为对应的处理逻辑。
addSqlFunctionMethod(
ROUND,
Seq(DECIMAL, INTEGER),
BuiltInMethods.ROUND_DEC)
第一个参数ROUND为前面定义的SqlFunction
。第二个参数为SQL中ROUND函数调用的两个参数类型,第三个参数为ROUND方法的真正执行逻辑,该逻辑是Method类型,通过Java反射来获取。
BuiltInMethods
定义如何去查找实现方法。需要指定方法所在class,方法名和参数列表。比如说上面例子中的ROUND_DEC方法,它负责四舍五入BigDecimal类型数据。通过Types.lookupMethod
使用Java反射方式找到Method对象并返回。
val ROUND_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "sround",
classOf[DecimalData], classOf[Int])
通过代码不难理解,要查找的ROUND方法位于SqlFunctionUtils
类中,名称为sround,参数列表的两个参数分别为(DecimalData, Int)类型。
我们去SqlFunctionUtils
类,找到复合条件的那个方法:
public static BigDecimal sround(BigDecimal b0, int b1) {
return b0.movePointRight(b1).setScale(0, RoundingMode.HALF_UP).movePointLeft(b1);
}
在上面的代码中,我们可以看到Decimal类型的四舍五入操作具体是怎么操作的。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。
网友评论