美文网首页Flink精选学习
【Flink SQL】如何利用 Calcite 扩展 Flink

【Flink SQL】如何利用 Calcite 扩展 Flink

作者: 熊本极客 | 来源:发表于2022-08-11 23:13 被阅读0次

    1.Calcite 如何实现 sql 语法的解析

    Calcite 使用 javacc 作为语法解析器。如下所示,freemarker 将配置文件 config.fmpp 即指定文件为 codegen/data/Parser.tdd、附加模板文件 codegen/includes/parserImpls.ftl、模板文件 codegen/templates/Parser.jj。上述文件输入 FMPP 后, 会组合生成一个可用的 Parser.jj 文件,即Calcite 的 SQL 解析器语法规则文件。Parser.jj 文件输入 javacc 会生成一个继承自 SqlAbstractParserImplSqlParserImpl 类,即 Calcite 中真正负责解析 SQL 语句并生成 SqlNode 树的类。

    codegen
    ├── config.fmpp
    ├── default_config.fmpp
    ├── includes
    │   ├── compoundIdentifier.ftl
    │   └── parserImpls.ftl
    └── templates
        └── Parser.jj
    
    calcite扩展sql语法.JPG

    扩展内容JavaCC 语法解析并生成抽象语法树

    2.如何扩展 Flink Function SQL

    如何扩展语法 CREAT FUNCTION ?

    CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
      [IF NOT EXISTS] [[catalog_name.]db_name.]function_name
      AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
    

    在指定 catalog.database 中创建 function ,需指定一个 identifier ,可指定 language。 若 catalog 中,已经有同名的函数注册了,则无法注册。

    ① 如果 language 是 JAVA 或者 SCALA ,则 identifier 是 UDF 实现类的全限定名。关于 JAVA/SCALA UDF 的实现,请参考 自定义函数
    ② 如果 language 是 PYTHON,则 identifier 是 UDF 对象的全限定名,例如 pyflink.table.tests.test_udf.add。关于 PYTHON UDF 的实现,请参考 Python UDFs
    ③ 如果 language 是 PYTHON,而当前程序是 Java/Scala 程序或者纯 SQL 程序,则需要配置 Python 相关的依赖

    步骤 1:添加 codegen 文件夹到 src/main 目录,如下所示。

    codegen
    ├── config.fmpp
    ├── default_config.fmpp
    ├── includes
    │   ├── compoundIdentifier.ftl
    │   └── parserImpls.ftl
    └── templates
        └── Parser.jj
    

    步骤 2:扩展语法 CREAT FUNCTION
    ① 在 codegen/includes/parserImpls.ftl 定义 create function 的语法规则,如下所示

    SqlCreate SqlCreateFunction(Span s, boolean replace, boolean isTemporary) :
    {
        SqlIdentifier functionIdentifier = null;
        SqlCharStringLiteral functionClassName = null;
        String functionLanguage = null;
        boolean ifNotExists = false;
        boolean isSystemFunction = false;
    }
    {
        (
            <SYSTEM> <FUNCTION>
            ifNotExists = IfNotExistsOpt()
            functionIdentifier = SimpleIdentifier()
            {  isSystemFunction = true; }
        |
            <FUNCTION>
            ifNotExists = IfNotExistsOpt()
            functionIdentifier = CompoundIdentifier()
        )
    
        <AS> <QUOTED_STRING> {
            String p = SqlParserUtil.parseString(token.image);
            functionClassName = SqlLiteral.createCharString(p, getPos());
        }
        [<LANGUAGE>
            (
                <JAVA>  { functionLanguage = "JAVA"; }
            |
                <SCALA> { functionLanguage = "SCALA"; }
            |
                <SQL>   { functionLanguage = "SQL"; }
            |
                <PYTHON>   { functionLanguage = "PYTHON"; }
            )
        ]
        {
            return new SqlCreateFunction(s.pos(), functionIdentifier, functionClassName, functionLanguage,
                    ifNotExists, isTemporary, isSystemFunction);
        }
    }
    

    ② 扩展 SqlCreate

    规则匹配成功返回一个 SqlCreateFunction 节点,作为解析树中的 SqlNode

    public class SqlCreateFunction extends SqlCreate {
    
        public static final SqlSpecialOperator OPERATOR =
                new SqlSpecialOperator("CREATE FUNCTION", SqlKind.CREATE_FUNCTION);
    
        private final SqlIdentifier functionIdentifier;
    
        private final SqlCharStringLiteral functionClassName;
    
        private final String functionLanguage;
    
        private final boolean isTemporary;
    
        private final boolean isSystemFunction;
    
        public SqlCreateFunction(
                SqlParserPos pos,
                SqlIdentifier functionIdentifier,
                SqlCharStringLiteral functionClassName,
                String functionLanguage,
                boolean ifNotExists,
                boolean isTemporary,
                boolean isSystemFunction) {
            super(OPERATOR, pos, false, ifNotExists);
            this.functionIdentifier = requireNonNull(functionIdentifier);
            this.functionClassName = requireNonNull(functionClassName);
            this.isSystemFunction = isSystemFunction;
            this.isTemporary = isTemporary;
            this.functionLanguage = functionLanguage;
        }
    
        @Override
        public SqlOperator getOperator() {
            return OPERATOR;
        }
    
        @Nonnull
        @Override
        public List<SqlNode> getOperandList() {
            return ImmutableNullableList.of(functionIdentifier, functionClassName);
        }
    
        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
            writer.keyword("CREATE");
            if (isTemporary) {
                writer.keyword("TEMPORARY");
            }
            if (isSystemFunction) {
                writer.keyword("SYSTEM");
            }
            writer.keyword("FUNCTION");
            if (ifNotExists) {
                writer.keyword("IF NOT EXISTS");
            }
            functionIdentifier.unparse(writer, leftPrec, rightPrec);
            writer.keyword("AS");
            functionClassName.unparse(writer, leftPrec, rightPrec);
            if (functionLanguage != null) {
                writer.keyword("LANGUAGE");
                writer.keyword(functionLanguage);
            }
        }
       //...省略
    }
    

    ③ 将新增的语法规则,添加到配置文件 codegen/data/Parser.tdd
    imports 增加 SqlCreateFunction 类,statementParserMethods 增加定义的规则方法

      package: "org.apache.flink.sql.parser.impl",
      class: "FlinkSqlParserImpl",
    
     imports: [
        "org.apache.flink.sql.parser.ddl.SqlCreateFunction"
     ]
    
      statementParserMethods: [
        "SqlCreateFunction()"
      ]
    
      implementationFiles: [
        "parserImpls.ftl"
      ]
    

    mvn clean compile 编译后,自动生成 FlinkSqlParserImpl.java

    相关文章

      网友评论

        本文标题:【Flink SQL】如何利用 Calcite 扩展 Flink

        本文链接:https://www.haomeiwen.com/subject/iekjwrtx.html