美文网首页
如何在Calcite中注册函数

如何在Calcite中注册函数

作者: ni_d58f | 来源:发表于2019-05-12 12:37 被阅读0次

    很多同学在使用Calcite的过程中需要自定义函数, 现在讲讲如何定自义函数

    1. Calcite 内置函数和对应的流程

    Calcite中内置的函数主要在SqlStdOperatorTable中, 包括常见的算术运算符、时间函数等。现在就以一个列子来说明在SqlStdOperatorTable 中添加函数以达到注册函数的功能

    1. SqlStdOperatorTable.java 中添加对应函数
    public static final SqlFunction TIMESTR2LONG = new SqlFunction(
              new SqlIdentifier("TIMESTR2LONG", SqlParserPos.ZERO),
              //返回值为Long, 可以为NULL
              ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.BIGINT), SqlTypeTransforms.TO_NULLABLE),
              //输入类型推测为Varchar, 即java中的String
              InferTypes.VARCHAR_1024,
             //类型检查,如果类型不是String, 报错
              OperandTypes.family(SqlTypeFamily.STRING),
              Lists.newArrayList(new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT).createSqlType(SqlTypeName.VARCHAR)),
              SqlFunctionCategory.USER_DEFINED_FUNCTION);
    
    1. 在自已的代码中将Parser的OperatorTable 设置成上述 SqlStdOperatorTable
    public class FunctionTestOne {
        public static final SqlTypeFactoryImpl TYPE_FACTORY = new SqlTypeFactoryImpl(
                RelDataTypeSystem.DEFAULT);
        public static final RelDataTypeSystem TYPE_SYSTEM = RelDataTypeSystem.DEFAULT;
    
        public static void main(String[] args) {
            CalciteSchema rootSchema = CalciteSchema
                    .createRootSchema(false, false);
    
            //添加表Test
            rootSchema.add("test", new AbstractTable() {
                @Override
                public RelDataType getRowType(RelDataTypeFactory typeFactory) {
                    RelDataTypeFactory.Builder builder = new RelDataTypeFactory
                            .Builder(TYPE_FACTORY);
                    //列id, 类型int
                    builder.add("id", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.INTEGER));
                    //列name, 类型为varchar
                    builder.add("name", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.VARCHAR));
                    builder.add("time_str", new BasicSqlType(TYPE_SYSTEM, SqlTypeName.VARCHAR));
                    return builder.build();
                }
            });
    
            SqlParser.ConfigBuilder builder = SqlParser.configBuilder();
            //以下需要设置成大写并且忽略大小写
            builder.setQuotedCasing(Casing.TO_UPPER);
            builder.setUnquotedCasing(Casing.TO_UPPER);
            builder.setCaseSensitive(false);
    
            final FrameworkConfig config = Frameworks.newConfigBuilder()
                    .defaultSchema(rootSchema.plus())
                    .parserConfig(builder.build())
                    //注意用你自已的SqlStdOperatorTable, 此处之所以同名
                    //目的是覆盖calcite中SqlStdOperatorTable
                    .operatorTable(SqlStdOperatorTable.instance())
                    .build();
    
            Planner planner = Frameworks.getPlanner(config);
    
            //now start to parser
    
            try {
                SqlNode originSqlNode = planner.parse("select name, timestr2long(time_str) from test where id < 5");
                SqlNode sqlNode = planner.validate(originSqlNode);
                RelRoot root = planner.rel(sqlNode);
                System.out.println(RelOptUtil.toString(root.rel, ALL_ATTRIBUTES));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
    

    稍微解释一下上述代码中主要步骤

    1. 创建rootSchema, 你可以把其理解为Linux中root目录, 当往rootSchema添加table是相当于添加一个普通文件,当往rootSchema添加schema时相当于添加一个目录, 相应的rootSchema就是整个数据库的根,可以往根数据库中添加函数、数据库、表等,添加的子数据库又可以把函数、数据库、表放置其中,不断递归

    2. 然后往rootSchema添加表后,然后设置Parser和OperatorTable 后生成Planner, 至于中间是怎么用的,后面会详细说明

    3. 最后进行Parser/Validate并转化成RelNode

    现在简要说明一下函数注册主要过程

    Parser阶段,在Parser.jj文件中 有以下内容:

    LOOKAHEAD( [<SPECIFIC>] FunctionName() <LPAREN>)
    e = NamedFunctionCall()
    
    // NamedFunctionCall的主要内容如下
    ...
    SqlNode NamedFunctionCall() :
        qualifiedName = FunctionName() 
        createCall(qualifiedName, s.end(this), funcType, quantifier, args);
    ...
    

    FunctionName() 主要是获取函数名,如例中的函数名 timestr2long或者是Calcite内置的一些固定的函数名,如ABS, AVG等
    createCall()是生成SqlCall(函数的主要形式), 而createCall()最终会调用

     protected SqlCall createCall(
          SqlIdentifier funName,
          SqlParserPos pos,
          SqlFunctionCategory funcType,
          SqlLiteral functionQualifier,
          SqlNode[] operands) {
        SqlOperator fun = null;
    
        // First, try a half-hearted resolution as a builtin function.
        // If we find one, use it; this will guarantee that we
        // preserve the correct syntax (i.e. don't quote builtin function
        /// name when regenerating SQL).
        if (funName.isSimple()) {
          final List<SqlOperator> list = new ArrayList<>();
          //这里opTab的值为SqlStdOperatorTable.instance();
          opTab.lookupOperatorOverloads(funName, funcType, SqlSyntax.FUNCTION, list);
          if (list.size() == 1) {
            fun = list.get(0);
          }
        }
    
        // Otherwise, just create a placeholder function.  Later, during
        // validation, it will be resolved into a real function reference.
        if (fun == null) {
          fun = new SqlUnresolvedFunction(funName, null, null, null, null,
              funcType);
        }
    
        return fun.createCall(functionQualifier, pos, operands);
      }
    

    经过以上步骤可以发现注册过程流程为:

    1. 在SqlStdOperatorTable按照格式注册自已的函数名
    2. 在Parser过程中,会根据函数名自动在SqlStdOperatorTable查找对应函数
    3. 如果没有找到,会自动将函数解析成SqlUnresolvedFunction

    那问题来了,SqlUnresolvedFunction什么时候会解析成具体的函数?答案是: Validate阶段, 在Validate阶段,Calcite 会针对函数做以下工作

    1. 根据函数名在所有函数注册表中查找函数(之所以说是所有函数注册表是因为Calcite还可以在其它地方注册函数, 后面的ChainedSqlOperatorTable, ListSqlOperatorTable等)查找函数
    2. 验证函数的入参:包话参数类型、个数、函数类型等
    3. 递归验证每个入参是否合法,具体过程后续会详细说明

    其它方式注册函数

    上述注册函数略微tricky, 需要要本地搭建一个org.apache.calcite.sql.fun包并且从calcite-core中复制SqlStdOperatorTable.java的内容以覆盖内置的SqlStdOperatorTable,最后添加自已的函数,通过这种方式实现并不那么优雅,其实还有其它相对优雅的方式进行函数注册,主要有以下两种方式

    1. 修改Parser.jj 注册

    假如我要实现以下两个方法

        public static Integer func1(String s) {
            return s == null ? null : Integer.valueOf(s);
        }
    
        public static String func2(Integer i) {
            return i == null ? null : String.valueOf(i);
        }
    

    要在Calcite中注册以上两个函数,只需要在parser.jj中添加以下内容, 见代码

    /*
    *   Create User-defined function
    */
    SqlNode extraFunction() :
    {
        SqlNode node;
        SqlNode e;
        List<SqlNode> args = null;
        final Span s;
    }
    {   (
            (
                <FUNC1> { s = span(); }
                <LPAREN>
                e = Expression(ExprContext.ACCEPT_NON_QUERY)
                {
                    startList(e);
                }
                <LPAREN>
                {
                    node = FunctionUtil.FUNC1.createCall(s.end(this), args);
                }
            )
            |
            (
                <FUNC2> { s = span(); }
                <LPAREN>
                e = Expression(ExprContext.ACCEPT_NON_QUERY)
                {
                    startList(e);
                }
                <LPAREN>
                {
                    node = FunctionUtil.FUNC2.createCall(s.end(this), args);
                }
    
            )
        )
        {
            return node;
        }
    }
    
    
    <DEFAULT, DQID, BTID> TOKEN :
    ...
    |   < FUNCTION: "FUNCTION" >
    |   < FUNC1: "FUNC1"> //添加的内容
    |   < FUNC2: "FUNC2"> //添加的内容
    ...
    

    FunctionUtil 内容如下, 见代码

    public class FunctionUtil {
    
        public static final SqlFunction FUNC1 = new SqlFunction(
                new SqlIdentifier("FUNC1", SqlParserPos.ZERO),
                ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.INTEGER), SqlTypeTransforms.TO_NULLABLE),
                InferTypes.VARCHAR_1024,
                OperandTypes.family(SqlTypeFamily.STRING),
                Lists.newArrayList(TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR)),
                SqlFunctionCategory.USER_DEFINED_FUNCTION);
    
        public static final SqlFunction FUNC2 = new SqlFunction(
                new SqlIdentifier("FUNC2", SqlParserPos.ZERO),
                ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.VARCHAR), SqlTypeTransforms.TO_NULLABLE),
                InferTypes.FIRST_KNOWN,
                OperandTypes.family(SqlTypeFamily.INTEGER),
                Lists.newArrayList(TYPE_FACTORY.createSqlType(SqlTypeName.INTEGER)),
                SqlFunctionCategory.USER_DEFINED_FUNCTION);
    }
    

    最后的测试代码如下,见代码

                //now test func
                //将创建的函数放入SqlOperatorTable()中
                ListSqlOperatorTable listSqlOperatorTable = new ListSqlOperatorTable();
                listSqlOperatorTable.add(FUNC1);
                listSqlOperatorTable.add(FUNC2);
    
                final FrameworkConfig funcConfig = Frameworks.newConfigBuilder()
                        .defaultSchema(rootSchema.plus())
                        .parserConfig(builder.build())
                        //添加一个专们用于添加函数的 listSqlOperatorTable
                        .operatorTable(ChainedSqlOperatorTable.of(listSqlOperatorTable,
                                SqlStdOperatorTable.instance()))
                        .build();
    
                Planner planner2 = Frameworks.getPlanner(funcConfig);
                SqlNode func1SqlNodeOrg = planner2.parse("select func1(name) from test where id > 4");
                SqlNode func1SqlNode = planner2.validate(func1SqlNodeOrg);
                RelRoot func1Root = planner2.rel(func1SqlNode);
                System.out.println("-------- func1 test -------");
                System.out.println(RelOptUtil.toString(func1Root.rel, ALL_ATTRIBUTES));
    
                Planner planner3 = Frameworks.getPlanner(funcConfig);
                SqlNode func2SqlNodeOrg = planner3.parse("select func2(id) from test where id > 4");
                SqlNode func2SqlNode = planner3.validate(func2SqlNodeOrg);
                RelRoot func2Root = planner3.rel(func2SqlNode);
                System.out.println("-------- func2 test -------");
                System.out.println(RelOptUtil.toString(func2Root.rel, ALL_ATTRIBUTES));
    

    2. Schema 注册

    细心的读者可以发现,以上两种方式都要对calcite做侵入式修改。内置方法需要覆盖SqlStdOpeartorTable, 而修改Parser.jj则需要自已书写Parser逻辑,并在Validate阶段注册函数。考虑到一下场景,需要类似于Hive 注册UDF那样,动态的注册函数,上述两种方式是无法实现的,那如何实现动态注册函数呢? 用schema 注册, 下面以一个例子说明, 代码在

     //主要代码
     public static RelRoot sqlToRelNode(String sql) {
    
            try {
                SchemaPlus plus = ROOT_SCHEMA.plus();
                plus.add("FUNC1", ScalarFunctionImpl.create(
                        FunctionUtil.class, "func1"));
    
                plus.add("FUNC2", ScalarFunctionImpl.create(
                        FunctionUtil.class, "func2"));
    
    
                SqlParser parser = SqlParser.create(sql, config.getParserConfig());
                SqlNode sqlNode = parser.parseStmt();
    
                //这里需要注意大小写问题,否则表会无法找到
                Properties properties = new Properties();
                properties.setProperty(CalciteConnectionProperty.CASE_SENSITIVE.camelName(),
                        String.valueOf(config.getParserConfig().caseSensitive()));
    
                CalciteCatalogReader calciteCatalogReader =  new CalciteCatalogReader(
                        CalciteSchema.from(rootSchema(plus)),
                        CalciteSchema.from(config.getDefaultSchema()).path(null),
                        TYPE_FACTORY,
                        new CalciteConnectionConfigImpl(properties));
    
                //to supported user' define function
                SqlOperatorTable sqlOperatorTable = ChainedSqlOperatorTable
                        .of(config.getOperatorTable(), calciteCatalogReader);
    
                TestSqlValidatorImpl validator = new TestSqlValidatorImpl(
                        sqlOperatorTable,
                        calciteCatalogReader,
                        TYPE_FACTORY,
                        SqlConformanceEnum.DEFAULT);
    
                //try to union trait set
                //addRelTraitDef for is HepPlanner has not effect in fact
                VolcanoPlanner volcanoPlanner = new VolcanoPlanner();
                SqlNode validateSqlNode = validator.validate(sqlNode);
                final RexBuilder rexBuilder = new RexBuilder(TYPE_FACTORY);
                RelOptCluster cluster = RelOptCluster.create(volcanoPlanner, rexBuilder);
    
                final SqlToRelConverter.Config sqlToRelConverterConfig
                        = SqlToRelConverter.configBuilder()
                        .withConfig(config.getSqlToRelConverterConfig())
                        .withTrimUnusedFields(false)
                        .withConvertTableAccess(false)
                        .build();
    
                final SqlToRelConverter sqlToRelConverter =
                        new SqlToRelConverter(null, validator,
                                calciteCatalogReader, cluster, config.getConvertletTable(),
                                sqlToRelConverterConfig);
    
                RelRoot root =
                        sqlToRelConverter.convertQuery(validateSqlNode, false, true);
                root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true));
                final RelBuilder relBuilder = sqlToRelConverterConfig
                        .getRelBuilderFactory().create(cluster, null);
    
                //change trait set of TableScan
                return root.withRel(
                        RelDecorrelator.decorrelateQuery(root.rel, relBuilder));
    
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }
    

    Schema 注册相对于前两者来说过程复杂一点,相好处是可以定制化, 主要有以下特点

    1. 随时注册 随时使用
    2. 引入了schema空间,同一个function可以注册到不同的schema并可以隔离

    采用Schema 注册的主要步骤在CalciteCatalogReader读取schema, 限于篇幅,我会专门用一个篇文章详细说明CalciteCatalogReader

    3. 总结

    以上为三种方式在Calcite注册函数,那么这三种有什么区别

    1. 从灵活度来说,前两种不如通过schema注册,而且通过schema注册也不需要去修改calcite核心代码,适合于初学者使用。更重要的是通过schema注册可以实现函数隔离,可以实现不同数据库级别的函数之持。
    2. 从实现容易度来说, 第一种更为简单,只需要简单修改一个内置的表即可

    那么对于一个项目来说 采用哪里方式更好?我的建议是

    1. 如果需要动态注册或者schema级别函数隔离,建议采用第三种
    2. 如果函数相对固定而且函数数量较多,建议采用第一种
    3. 如果需要更加细致定制函数且函数数量不多,可以采用第二种

    相关文章

      网友评论

          本文标题:如何在Calcite中注册函数

          本文链接:https://www.haomeiwen.com/subject/yhitaqtx.html