美文网首页Flink学习指南Flinkspark
Flink 源码之 SQL 内置function定义方式

Flink 源码之 SQL 内置function定义方式

作者: AlienPaul | 来源:发表于2021-11-12 16:43 被阅读0次

    Flink源码分析系列文档目录

    请点击:Flink 源码分析系列文档目录

    前言

    近期使用Flink SQL内置函数的时候遇到了点问题,函数的返回结果和想象中的不一致。于是阅读了Flink内置函数的调用流程源代码。下面把这些内容分享给大家。

    BuiltInFunctionDefinitions

    首先我们从函数定义部分开始。函数定义在解析AST语法树的时候会用到。
    BuiltInFunctionDefinitions中定义了很多内置的function。我们以ROUND四舍五入方法为例,分析SQL函数从SQL语法树中定义到转换为Java可执行代码的整个执行流程。

    下面给出ROUND方法定义的过程:

    public static final BuiltInFunctionDefinition ROUND =
            BuiltInFunctionDefinition.newBuilder()
                    .name("round") // 方法名称为round
                    .kind(SCALAR) // 类型为标量函数(只有一个输出)
                    .inputTypeStrategy( // 参数类型,参数可以是一个数字类型,或者一个数字类型和一个整数类型
                            or(
                                    sequence(logical(LogicalTypeFamily.NUMERIC)),
                                    sequence(
                                            logical(LogicalTypeFamily.NUMERIC),
                                            logical(LogicalTypeRoot.INTEGER))))
                    .outputTypeStrategy(nullableIfArgs(SpecificTypeStrategies.ROUND)) // 输出类型推断策略(flink table中使用)
                    .build();
    

    DirectConvertRule

    只有函数的定义,我们怎么知道它是和那个函数关联起来的呢?

    DirectConvertRule正是用来保存这个关联关系,它定义了上面BuiltInFunctionDefinitionsFlinkSqlOperatorTable中function的转换关系(一对一关系)。

    例如ROUND函数的对应关系:。

    DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.ROUND, FlinkSqlOperatorTable.ROUND);
    

    BuiltInFunctionDefinitions为上面所述的SQL方法的定义,FlinkSqlOperatorTable保存了Flink的内置SQL函数。DEFINITION_OPERATOR_MAP维护了方法的定义和SqlFunction的对应关系。

    通过这一行Flink可以得知将SQL中的函数翻译为Flink哪一个内置的函数执行。

    FlinkSqlOperatorTable

    在这个类中定义了一系列内置的SQL function。FlinkSqlOperatorTable中创建了一系列static的SQLFunction对象。

    我们以ROUND方法为例:

    public static final SqlFunction ROUND =
            new SqlFunction(
                    "ROUND",
                    SqlKind.OTHER_FUNCTION,
                    FlinkReturnTypes.ROUND_FUNCTION_NULLABLE,
                    null,
                    OperandTypes.or(OperandTypes.NUMERIC_INTEGER, OperandTypes.NUMERIC),
                    SqlFunctionCategory.NUMERIC);
    

    猛一看SqlFunction的定义和BuiltInFunctionDefinitions非常相似,但是使用的位置不同。BuiltInFunctionDefinitions用于SQL解析的时候,FlinkSqlOperatorTable在SQL翻译为可执行代码的时候使用。

    我们查看下SqlFunction的构造函数,代码如下:

    public SqlFunction(
        String name,
        SqlKind kind,
        @Nullable SqlReturnTypeInference returnTypeInference,
        @Nullable SqlOperandTypeInference operandTypeInference,
        @Nullable SqlOperandTypeChecker operandTypeChecker,
        SqlFunctionCategory category) {
        // We leave sqlIdentifier as null to indicate
        // that this is a built-in.
        this(name, null, kind, returnTypeInference, operandTypeInference,
             operandTypeChecker, category);
    
        assert !((category == SqlFunctionCategory.USER_DEFINED_CONSTRUCTOR)
                 && (returnTypeInference == null));
    }
    

    参数的含义为:

    • 函数名称
    • 函数细分类型
    • 返回值推断策略。包含数据转换规则和数据类型转换规则
    • 操作数类型推断策略。根据callBinding, returnType, operandTypes来推断操作数的类型
    • SQL函数的归类,比SQLKind的分类要粗

    FlinkReturnTypes

    定义返回类型推断方式。有些特殊的函数,比如说ROUND四舍五入,他的返回类型可能为BigDecimal(如果传入数据类型为BigDecimal的话)。BigDecimal是一个复合类型,它有两个参数precision(精度)和scale(小数点后面位数)。这两个参数是需要根据ROUND函数传入的数据和参数实时计算的。因此需要一个专门定义一段返回值类型推断逻辑。

    /** ROUND(num [,len]) type inference. */
    public static final SqlReturnTypeInference ROUND_FUNCTION =
        new SqlReturnTypeInference() {
            @Override
            public RelDataType inferReturnType(SqlOperatorBinding opBinding) {
                // 获取操作数的类型
                final RelDataType numType = opBinding.getOperandType(0);
                // 如果操作数的类型不是BigDecimal,无需推断,直接返回原类型
                if (numType.getSqlTypeName() != SqlTypeName.DECIMAL) {
                    return numType;
                }
                // lenVal为需要四舍五入到第几位
                final BigDecimal lenVal;
                // 获取参数长度,ROUND函数两个参数,第一个为操作数,第二个为四舍五入保留小数位数
                if (opBinding.getOperandCount() == 1) {
                    // 如果ROUND没指明四舍五入到第几位,则只保留整数部分
                    lenVal = BigDecimal.ZERO;
                } else if (opBinding.getOperandCount() == 2) {
                    // 获取保留小数点位数
                    lenVal = getArg1Literal(opBinding); // may return null
                } else {
                    throw new AssertionError();
                }
                if (lenVal == null) {
                    return numType;
                }
                // ROUND( decimal(p,s), r )
                // 分别获取原操作数经度,小数点后位数和四舍五入保留位数
                final int p = numType.getPrecision();
                final int s = numType.getScale();
                final int r = lenVal.intValueExact();
                // 计算出新的Decimal类型,包含precision和scale
                DecimalType dt = LogicalTypeMerging.findRoundDecimalType(p, s, r);
                return opBinding
                        .getTypeFactory()
                        .createSqlType(SqlTypeName.DECIMAL, dt.getPrecision(), dt.getScale());
            }
    
            private BigDecimal getArg1Literal(SqlOperatorBinding opBinding) {
                try {
                    return opBinding.getOperandLiteralValue(1, BigDecimal.class);
                } catch (Throwable e) {
                    return null;
                }
            }
        };
    

    FunctionGenerator

    FunctionGenerator负责代码生成,即解析SQL的时候,将SQL中的方法调用翻译为Java/Scala代码中的真实可执行的处理逻辑。它通过addSqlFunctionMethod方法告诉SQL解析器,如何将某个SQL function翻译为对应的处理逻辑。

    addSqlFunctionMethod(
        ROUND,
        Seq(DECIMAL, INTEGER),
        BuiltInMethods.ROUND_DEC)
    

    第一个参数ROUND为前面定义的SqlFunction。第二个参数为SQL中ROUND函数调用的两个参数类型,第三个参数为ROUND方法的真正执行逻辑,该逻辑是Method类型,通过Java反射来获取。

    BuiltInMethods

    定义如何去查找实现方法。需要指定方法所在class,方法名和参数列表。比如说上面例子中的ROUND_DEC方法,它负责四舍五入BigDecimal类型数据。通过Types.lookupMethod使用Java反射方式找到Method对象并返回。

    val ROUND_DEC = Types.lookupMethod(classOf[SqlFunctionUtils], "sround",
        classOf[DecimalData], classOf[Int])
    

    通过代码不难理解,要查找的ROUND方法位于SqlFunctionUtils类中,名称为sround,参数列表的两个参数分别为(DecimalData, Int)类型。

    我们去SqlFunctionUtils类,找到复合条件的那个方法:

    public static BigDecimal sround(BigDecimal b0, int b1) {
        return b0.movePointRight(b1).setScale(0, RoundingMode.HALF_UP).movePointLeft(b1);
    }
    

    在上面的代码中,我们可以看到Decimal类型的四舍五入操作具体是怎么操作的。

    本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

    相关文章

      网友评论

        本文标题:Flink 源码之 SQL 内置function定义方式

        本文链接:https://www.haomeiwen.com/subject/radozltx.html