
FlinkSQL Syntax Extensions

Author: todd5167 | Published 2020-12-05 17:56


    To see how Flink extends Calcite's grammar, use the flink-sql-parser module as a reference and create an empty project for the extension work. First, copy the contents of the codegen folder, adjust the package name and class name of the generated parser implementation class, and delete the entries in imports together with the parse methods Flink introduced. Then, copy over all the javacc-compilation and FreeMarker (fmpp) plugins from the pom.
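
    For reference, the two plugin entries look roughly as follows: FMPP first expands the codegen templates using Parser.tdd, then JavaCC compiles the generated Parser.jj. This is only a sketch modeled on the Calcite/Flink poms; the versions and paths are assumptions and must match your own project layout.

    <plugin>
      <groupId>com.googlecode.fmpp-maven-plugin</groupId>
      <artifactId>fmpp-maven-plugin</artifactId>
      <version>1.0</version>
      <configuration>
        <cfgFile>src/main/codegen/config.fmpp</cfgFile>
        <outputDirectory>${project.build.directory}/generated-sources/fmpp</outputDirectory>
        <templateDirectory>src/main/codegen/templates</templateDirectory>
      </configuration>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals><goal>generate</goal></goals>
        </execution>
      </executions>
    </plugin>
    <plugin>
      <groupId>org.codehaus.mojo</groupId>
      <artifactId>javacc-maven-plugin</artifactId>
      <version>2.4</version>
      <executions>
        <execution>
          <phase>generate-sources</phase>
          <goals><goal>javacc</goal></goals>
          <configuration>
            <sourceDirectory>${project.build.directory}/generated-sources/fmpp</sourceDirectory>
            <includes><include>**/Parser.jj</include></includes>
          </configuration>
        </execution>
      </executions>
    </plugin>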

    DQL

    show tables

    To extend the grammar with show tables, a SqlShowTables node should ultimately be produced whenever a show tables statement is matched. This requires four steps: extend SqlNode with a SqlShowTables class, define the parse rule in parserImpls.ftl, add the TABLES keyword in Parser.tdd (otherwise the rule cannot be recognized), and register the rule method in statementParserMethods.

    Extending SqlNode

    When the rule matches, it returns a SqlShowTables node, which becomes a SqlNode in the parse tree.

    package cn.todd.flink.sql.parser.dql;

    import java.util.Collections;
    import java.util.List;

    import org.apache.calcite.sql.SqlCall;
    import org.apache.calcite.sql.SqlKind;
    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.SqlOperator;
    import org.apache.calcite.sql.SqlSpecialOperator;
    import org.apache.calcite.sql.SqlWriter;
    import org.apache.calcite.sql.parser.SqlParserPos;

    // The SqlNode produced after a "show tables" statement is parsed
    public class SqlShowTables extends SqlCall {
        public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("SHOW TABLES", SqlKind.OTHER);
        public SqlShowTables(SqlParserPos pos) {
            super(pos);
        }
        @Override
        public SqlOperator getOperator() {
            return OPERATOR;
        }
        @Override
        public List<SqlNode> getOperandList() {
            return Collections.emptyList();
        }
        @Override
        public void unparse(
                SqlWriter writer,
                int leftPrec,
                int rightPrec) {
            writer.keyword("SHOW TABLES");
        }
    }
    

    Defining the rule

    Define the rule that parses show tables in parserImpls.ftl; it consists of a method name and the corresponding production. If you do not want to spend much time studying JavaCC systematically, you can learn it in reverse from the methods of the generated SqlParserImpl class.
    Think of SqlShowTables() as a method wrapping the parse rule: it matches the SHOW TABLES keywords in the input and, on a match, returns a SqlShowTables SqlNode.

    /**
    * Parse a "Show Tables" metadata query command.
    * Matches the SHOW TABLES keywords; on a match, returns a SqlShowTables SqlNode
    */
    SqlShowTables SqlShowTables() :
    {
    }
    {
        <SHOW> <TABLES>
        {
            return new SqlShowTables(getPos());
        }
    }
    

    Applying the rule

    The SHOW keyword is already defined in the Parser.jj file, so only the TABLES keyword needs to be added for SHOW TABLES to match correctly. Changes in Parser.tdd:

    1. Add the TABLES keyword.
    2. Add the SqlShowTables class to imports. All SQL parsing is done by the SqlParserImpl class, and SqlShowTables() returns our own SqlShowTables type.
    3. Add the defined rule method to statementParserMethods, making the new rule available to the parser.
    keywords: [
      "TABLES"
    ]
    imports: [
      "cn.todd.flink.sql.parser.dql.SqlShowTables"
    ]
    statementParserMethods: [
      "SqlShowTables()"
    ]
     
    

    Run mvn -X clean compile to generate the parser class SqlParserImpl.java. Focus on the generated SqlShowTables(): once the lexer successfully consumes the two consecutive tokens SHOW and TABLES, a SqlShowTables node is created.

    /**
    * Parse a "Show Tables" metadata query command.
    * Matches the SHOW TABLES keywords; on a match, returns a SqlShowTables SqlNode
    */
      final public SqlShowTables SqlShowTables() throws ParseException {
        jj_consume_token(SHOW);
        jj_consume_token(TABLES);
        {if (true) return new SqlShowTables(getPos());}
        throw new Error("Missing return statement in function");
      }
    
    

    Testing the parse

    protected SqlNode parseStmtAndHandleEx(String sql) {
        final SqlNode sqlNode;
        try {
            sqlNode = getSqlParser(sql).parseStmt();
        } catch (SqlParseException e) {
            throw new RuntimeException("Error while parsing SQL: " + sql, e);
        }
        return sqlNode;
    }
    
    private SqlParser getSqlParser(String sql) {
        return SqlParser.create(sql, SqlParser.configBuilder()
                                // our own SqlParserImpl FACTORY
                                .setParserFactory(SqlParserImpl.FACTORY)
                                .setQuoting(Quoting.BACK_TICK)
                                .setUnquotedCasing(Casing.UNCHANGED)
                                .setQuotedCasing(Casing.UNCHANGED)
                                .build());
    }
    
    
    public class DqlStatementTest extends BaseParser {
    
        @Test
        public void testShowTables() throws SqlParseException {
            String sql = "show tables";
            SqlNode sqlNode = parseStmtAndHandleEx(sql);
            Assert.assertTrue(sqlNode instanceof SqlShowTables);
        }
    }
    

    show current catalog(database)

    Parse show current catalog and show current database.

    Extending SqlNode

    Same as org.apache.flink.sql.parser.dql.SqlShowCurrentCatalog and org.apache.flink.sql.parser.dql.SqlShowCurrentDatabase; a sketch follows.
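
    Following the same pattern as SqlShowTables above, a minimal sketch of the catalog variant (the database variant is identical except for the keyword text; imports as in SqlShowTables):

    package cn.todd.flink.sql.parser.dql;

    public class SqlShowCurrentCatalog extends SqlCall {
        public static final SqlSpecialOperator OPERATOR =
                new SqlSpecialOperator("SHOW CURRENT CATALOG", SqlKind.OTHER);

        public SqlShowCurrentCatalog(SqlParserPos pos) {
            super(pos);
        }

        @Override
        public SqlOperator getOperator() {
            return OPERATOR;
        }

        @Override
        public List<SqlNode> getOperandList() {
            return Collections.emptyList();
        }

        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
            writer.keyword("SHOW CURRENT CATALOG");
        }
    }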

    Defining the rule

    ( <TokenA> ... | <TokenB> ... ) is JavaCC's choice syntax, the equivalent of an if/else: the branch whose token matches next is taken.

    SqlCall SqlShowCurrentCatalogOrDatabase() :
    {
    }
    {
        <SHOW> <CURRENT> ( <CATALOG>
            {
                return new SqlShowCurrentCatalog(getPos());
            }
        | <DATABASE>
            {
                return new SqlShowCurrentDatabase(getPos());
            }
        )
    }
    

    Applying the rule

    Changes in Parser.tdd:

    1. imports: [ "xxx.SqlShowCurrentCatalog", "xxx.SqlShowCurrentDatabase" ]
    2. statementParserMethods: [ "SqlShowCurrentCatalogOrDatabase()" ]

    Run mvn -X clean compile to generate SqlParserImpl.java and look at the concrete parse flow of the SqlShowCurrentCatalogOrDatabase() method.

    /**
    * Parse a "show current catalog" or "show current database" metadata query command.
    *
    */
      final public SqlCall SqlShowCurrentCatalogOrDatabase() throws ParseException {
    // Consume the tokens SHOW and CURRENT in sequence
        jj_consume_token(SHOW);
        jj_consume_token(CURRENT);
    // If the next token is CATALOG, create SqlShowCurrentCatalog; if it is DATABASE, create SqlShowCurrentDatabase
        switch ((jj_ntk==-1)?jj_ntk():jj_ntk) {
        case CATALOG:
          jj_consume_token(CATALOG);
                {if (true) return new SqlShowCurrentCatalog(getPos());}
          break;
        case DATABASE:
          jj_consume_token(DATABASE);
            {if (true) return new SqlShowCurrentDatabase(getPos());}
          break;
        default:
          jj_la1[22] = jj_gen;
          jj_consume_token(-1);
          throw new ParseException();
        }
        throw new Error("Missing return statement in function");
      }
    

    Testing the parse

    @Test
    public void testShowCurrentCatalogOrDatabase() throws SqlParseException {
        String catalogSql = "show current catalog";
        SqlNode catalogSqNode = parseStmtAndHandleEx(catalogSql);
        Assert.assertTrue(catalogSqNode instanceof SqlShowCurrentCatalog);
    
    
        String databaseSql = "show current database";
        SqlNode databaseSqlNode = parseStmtAndHandleEx(databaseSql);
        Assert.assertTrue(databaseSqlNode instanceof SqlShowCurrentDatabase);
    }
    

    describe catalog

    Extending SqlNode

    Same as org.apache.flink.sql.parser.dql.SqlDescribeCatalog; a sketch follows.
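
    A minimal sketch consistent with the grammar rule and the test below (imports omitted; the operator kind and unparse details are assumptions):

    public class SqlDescribeCatalog extends SqlCall {
        public static final SqlSpecialOperator OPERATOR =
                new SqlSpecialOperator("DESCRIBE CATALOG", SqlKind.OTHER);

        private final SqlIdentifier catalogName;

        public SqlDescribeCatalog(SqlParserPos pos, SqlIdentifier catalogName) {
            super(pos);
            this.catalogName = catalogName;
        }

        // The test below reads the catalog name back as a plain string
        public String getCatalogName() {
            return catalogName.getSimple();
        }

        @Override
        public SqlOperator getOperator() {
            return OPERATOR;
        }

        @Override
        public List<SqlNode> getOperandList() {
            return Collections.<SqlNode>singletonList(catalogName);
        }

        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
            writer.keyword("DESCRIBE CATALOG");
            catalogName.unparse(writer, leftPrec, rightPrec);
        }
    }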

    Defining the rule

    The SimpleIdentifier() production is already defined in parser.jj. After DESCRIBE CATALOG it extracts the catalogName, and getPos() marks the parse position.

    /**
    *  Parses describe catalog xx. SimpleIdentifier() is defined in parser.jj
    */
    SqlDescribeCatalog SqlDescribeCatalog() :
    {
        SqlIdentifier catalogName;
        SqlParserPos pos;
    }
    {
        <DESCRIBE> <CATALOG> { pos = getPos();}
        catalogName = SimpleIdentifier()
        {
            return new SqlDescribeCatalog(pos, catalogName);
        }
    }
    

    Applying the rule

    Changes in Parser.tdd (xxx is the package name):

    1. imports: [ "xxx.SqlDescribeCatalog" ]
    2. statementParserMethods: [ "SqlDescribeCatalog()" ]

    Run mvn -X clean compile to generate SqlParserImpl.java and look at the concrete parse flow of the SqlDescribeCatalog() method.

    /**
    *
    *  Parses describe catalog xx. SimpleIdentifier() is defined in parser.jj
    */
      final public SqlDescribeCatalog SqlDescribeCatalog() throws ParseException {
        SqlIdentifier catalogName;
        SqlParserPos pos;
        
        jj_consume_token(DESCRIBE);
        jj_consume_token(CATALOG);
        // After DESCRIBE CATALOG is matched, record pos and parse out the catalogName
        pos = getPos();
        catalogName = SimpleIdentifier();
         {if (true) return new SqlDescribeCatalog(pos, catalogName);}
        throw new Error("Missing return statement in function");
      }
    

    Testing the parse

    @Test
    public void testDescribeCatalog() throws SqlParseException {
            String sql = "describe catalog testCatalog";
            SqlNode sqlNode = parseStmtAndHandleEx(sql);
            Assert.assertTrue(sqlNode instanceof SqlDescribeCatalog);
    
            String catalogName = ((SqlDescribeCatalog) sqlNode).getCatalogName();
            Assert.assertEquals(catalogName,"testCatalog");
        }
    

    DDL

    period for system

    Start with the simplest DDL statement, parsing only the table name, column names, types, properties, and the dimension-table marker field; once this initial skeleton exists, extend it with grammar rules for watermark fields, computed columns, data types, and so on. PERIOD FOR SYSTEM_TIME was Blink's original dimension-table marker, used to distinguish dimension tables from source and result tables. The syntax looks like:

    CREATE TABLE user_info (
        userId BIGINT,
        userName VARCHAR(10),
        userAge BIGINT,
        PERIOD FOR SYSTEM_TIME
    ) WITH (
        'connector' = 'kafka',
        'properties.bootstrap.servers' = 'localhost:9092',
        'topic' = 'tp01',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    );
    

    Extending SqlNode

    1. Modeled on the org.apache.flink.sql.parser.ddl.SqlCreateTable implementation, focusing on the SQL-parsing part and keeping only the most basic attributes.
    2. ExtendedSqlNode#validate is not implemented, since the SqlNode does not need validation for now. Flink validates ExtendedSqlNode instances at execution time; see org.apache.flink.table.planner.calcite.FlinkPlannerImpl#validate.
    3. The unparse method prints the concrete content of the SqlNode; without overriding it, the actual SQL statement is invisible when debugging or calling toString (a sketch follows the class below).

    public class SqlCreateTable extends SqlCreate {
    
        public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE TABLE", SqlKind.CREATE_TABLE);
    
        private final SqlIdentifier tableName;
    
        private final SqlNodeList columnList;
    
        private final SqlNodeList propertyList;
    
        private final boolean sideFlag;
    
        public SqlCreateTable(
                SqlParserPos pos,
                SqlIdentifier tableName,
                SqlNodeList columnList,
                SqlNodeList propertyList,
                boolean sideFlag) {
            super(OPERATOR, pos, false, false);
            this.tableName = requireNonNull(tableName, "Table name is missing");
            this.columnList = requireNonNull(columnList, "Column list should not be null");
            this.propertyList = propertyList;
            this.sideFlag = sideFlag;
        }
    
        @Override
        public SqlOperator getOperator() {
            return OPERATOR;
        }
    
        @Override
        public List<SqlNode> getOperandList() {
            return ImmutableNullableList.of(tableName, columnList, propertyList);
        }
        ...........
        /**
         * Table creation context.
         */
        public static class TableCreationContext {
            public List<SqlNode> columnList = new ArrayList<>();
            public boolean sideFlag;
        }
    }
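
    Point 3 above mentions unparse. A minimal sketch of how the method might look for this class, loosely modeled on the Flink original (writer.sep inside a startList frame is the standard Calcite list-printing idiom; the sideFlag marker is omitted for brevity):

    @Override
    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
        writer.keyword("CREATE TABLE");
        tableName.unparse(writer, leftPrec, rightPrec);
        // (col1 type1, col2 type2, ...)
        SqlWriter.Frame frame = writer.startList("(", ")");
        for (SqlNode column : columnList) {
            writer.sep(",");
            column.unparse(writer, leftPrec, rightPrec);
        }
        writer.endList(frame);
        // WITH ('key' = 'value', ...)
        if (propertyList != null && propertyList.size() > 0) {
            writer.keyword("WITH");
            SqlWriter.Frame withFrame = writer.startList("(", ")");
            for (SqlNode property : propertyList) {
                writer.sep(",");
                property.unparse(writer, leftPrec, rightPrec);
            }
            writer.endList(withFrame);
        }
    }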
    

    Borrowed from Flink: org.apache.flink.sql.parser.ddl.SqlTableColumn represents a parsed column, holding its name and type.
    Borrowed from Flink: org.apache.flink.sql.parser.ddl.SqlTableOption represents a single property entry; a sketch follows.
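
    The shape of SqlTableOption is implied by the TableOption() rule and the test further down; a minimal sketch (imports omitted), essentially the Flink original:

    // Represents one 'key' = 'value' entry inside the WITH (...) clause
    public class SqlTableOption extends SqlCall {
        private static final SqlSpecialOperator OPERATOR =
                new SqlSpecialOperator("TableOption", SqlKind.OTHER);

        private final SqlNode key;
        private final SqlNode value;

        public SqlTableOption(SqlNode key, SqlNode value, SqlParserPos pos) {
            super(pos);
            this.key = key;
            this.value = value;
        }

        public SqlNode getKey() {
            return key;
        }

        public SqlNode getValue() {
            return value;
        }

        @Override
        public SqlOperator getOperator() {
            return OPERATOR;
        }

        @Override
        public List<SqlNode> getOperandList() {
            return ImmutableNullableList.of(key, value);
        }

        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
            key.unparse(writer, leftPrec, rightPrec);
            writer.keyword("=");
            value.unparse(writer, leftPrec, rightPrec);
        }
    }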
    FlinkSqlDataTypeSpec is created to represent data types; it uses Calcite's original types and does not extend the type system in any way.

    public class FlinkSqlDataTypeSpec extends SqlDataTypeSpec {
        public FlinkSqlDataTypeSpec(SqlIdentifier typeName, int precision, int scale,
                                    String charSetName, TimeZone timeZone,
                                    SqlParserPos pos) {
            super(typeName, precision, scale, charSetName, timeZone, pos);
        }
    
        public FlinkSqlDataTypeSpec(SqlIdentifier collectionsTypeName, SqlIdentifier typeName, int precision, int scale,
                                    String charSetName, SqlParserPos pos) {
            super(collectionsTypeName, typeName, precision, scale, charSetName, pos);
        }
    
        public FlinkSqlDataTypeSpec(SqlIdentifier collectionsTypeName, SqlIdentifier typeName, int precision, int scale,
                                    String charSetName, TimeZone timeZone, Boolean nullable,
                                    SqlParserPos pos) {
            super(collectionsTypeName, typeName, precision, scale, charSetName, timeZone, nullable, pos);
        }
    
        public FlinkSqlDataTypeSpec(SqlIdentifier collectionsTypeName, SqlIdentifier typeName,
                                    SqlIdentifier baseTypeName, int precision, int scale, String charSetName,
                                    TimeZone timeZone, Boolean nullable, SqlParserPos pos) {
            super(collectionsTypeName, typeName, baseTypeName, precision, scale, charSetName, timeZone, nullable, pos);
        }
    }
    

    Defining the rule

    Matching is organized around the create table .. with() structure. The TableCreationContext object, defined inside SqlCreateTable, collects the parsed pieces.

    // Method that parses the create table syntax; Calcite requires the parameters (Span s, boolean replace)
    SqlCreate SqlCreateTable(Span s, boolean replace) :
    {   // Declare the variables assigned during parsing
        final SqlParserPos startPos = s.pos();
        SqlIdentifier tableName;
        SqlNodeList columnList = SqlNodeList.EMPTY;
        SqlNodeList propertyList = SqlNodeList.EMPTY;
        SqlParserPos pos = startPos;
        boolean sideFlag = false;
    }
    {
        <TABLE>
        tableName = CompoundIdentifier()
        
        // <LPAREN> is the left parenthesis, <RPAREN> the right parenthesis
        <LPAREN> { pos = getPos(); TableCreationContext ctx = new TableCreationContext();}
        TableColumn(ctx)
        (
           // <COMMA> is the comma; ( ... )* means there can be multiple columns
            <COMMA> TableColumn(ctx)
        )*
        {
            pos = pos.plus(getPos());
            // Bind the parsed columns to the SqlCreate's columnList
            columnList = new SqlNodeList(ctx.columnList, pos);
            sideFlag = ctx.sideFlag;
        }
        <RPAREN>
            
        [
            <WITH>
        // Parse the table's property list
            propertyList = TableProperties()
        ]
    
        {
            return new SqlCreateTable(startPos.plus(getPos()),
                tableName,
                columnList,
                propertyList,
                sideFlag);
        }
    }
    
    /**
    *  Parses a table column.
    *  For the LOOKAHEAD(3) mechanism see https://blog.csdn.net/qingmengwuhen1/article/details/83313303
    * 
    */
    void TableColumn(TableCreationContext context) :
    {
    }
    {
        // Scan up to 3 tokens ahead to pick the correct branch and resolve the choice-point conflict;
        // a column may be an ordinary column or the dimension-table marker (Flink additionally defines watermark rules here)
        (LOOKAHEAD(3)
            TableColumn2(context.columnList)
        |
            context.sideFlag = SideFlag()
        )
    }
    // Parses an ordinary column, e.g. name varchar(10)
    void TableColumn2(List<SqlNode> list) :
    {
        SqlIdentifier name;
        SqlDataTypeSpec type;
    }
    {
        // CompoundIdentifier parses the compound xx.xxx.xxx form
        name = CompoundIdentifier()
        type = FlinkDataType()
        {
            SqlTableColumn tableColumn = new SqlTableColumn(name, type, getPos());
            list.add(tableColumn);
        }
    }
    // Rule that parses the data type
    SqlDataTypeSpec FlinkDataType() :
    {
        final SqlIdentifier typeName;
        int scale = -1;
        int precision = -1;
        String charSetName = null;
        boolean nullable = true;
        final Span s;
    }
    {
        typeName = FlinkTypeName() {
            s = span();
        }
        [ // Optional precision/scale, e.g. (1024) or (10,12)
            <LPAREN>     
                precision = UnsignedIntLiteral()
                [
                    <COMMA>
                    scale = UnsignedIntLiteral()
                ]
            <RPAREN>
        ]
    
        {
            // Return the data type node
            return new FlinkSqlDataTypeSpec(typeName,
            precision,
            scale,
            charSetName,
            null,
            s.end(this));
        }
    }
    
    // Parses the type name, e.g. bigint, varchar
    SqlIdentifier FlinkTypeName() :
    {
        final SqlTypeName sqlTypeName;
        final SqlIdentifier typeName;
        final Span s = Span.of();
    }
    {
        (
            LOOKAHEAD(2)
            // Types used for JDBC and ODBC scalar conversion function
            sqlTypeName = SqlTypeName(s) 
            {
                typeName = new SqlIdentifier(sqlTypeName.name(), s.end(this));
            }
        |
            typeName = CompoundIdentifier() {
                throw new ParseException("UDT in DDL is not supported yet.");
            }
        )
        {
            return typeName;
        }
    }
    
    /**
    * Dimension-table (side table) marker.
    */
    boolean SideFlag() :
    {
        SqlParserPos pos;
        SqlIdentifier columnName;
    }
    {
        <PERIOD> { pos = getPos(); } <FOR> <SYSTEM_TIME> { return true; }
        |
        { return false; }
    }
    
    /** Parses the table properties. */
    SqlNodeList TableProperties():
    {
        SqlNode property;
        final List<SqlNode> proList = new ArrayList<SqlNode>();
        final Span span;
    }
    {
        <LPAREN> { span = span(); }
        [
            // Parse a single key/value property
            property = TableOption()
            {
                proList.add(property);
            }
            (   // There can be multiple properties, separated by commas
                <COMMA> property = TableOption()
                {
                    proList.add(property);
                }
            )*
        ]
        <RPAREN>
        {  return new SqlNodeList(proList, span.end(this)); }
    }
    
    SqlNode TableOption() :
    {
        SqlNode key;
        SqlNode value;
        SqlParserPos pos;
    }
    {
        key = StringLiteral()
        // TODO: what constraints does the SqlParserPos here need to satisfy?
        { pos = getPos(); }
        // <EQ> is the equals sign
        <EQ> value = StringLiteral()
        {
            return new SqlTableOption(key, value, getPos());
        }
    }
    

    Applying the rule

    Introduce the extended classes and rule methods in parser.tdd:

    1. Import the relevant classes via imports:
      imports: [
        "org.apache.calcite.sql.SqlCreate"
        "cn.todd.flink.sql.parser.ddl.SqlCreateTable"
        "cn.todd.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext"
        "cn.todd.flink.sql.parser.ddl.SqlTableColumn"
        "cn.todd.flink.sql.parser.ddl.SqlTableOption"
        "cn.todd.flink.sql.parser.FlinkSqlDataTypeSpec"
      ]
    
    2. Register the create table rule method in createStatementParserMethods:
    # List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
    # Each must accept arguments "(SqlParserPos pos, boolean replace)".  
    createStatementParserMethods: [
      "SqlCreateTable" 
    ]
    

    Run mvn -X clean compile to generate the parser class SqlParserImpl.java and inspect the concrete logic of the generated methods. If compilation fails, fix the content of parserImpls.ftl according to the error message; the message usually points to a specific line of parser.jj, which you can open directly under generated-sources to inspect.

    Testing the parse

    public class DdlStatementTest extends BaseParser {
        private final static Logger LOG = LoggerFactory.getLogger(DdlStatementTest.class);
    
        @Test
        public void testSimpleCreateTable() throws SqlParseException {
            String ddlSql = "CREATE TABLE abc.user_info (\n"
                    + "    userId BIGINT,\n"
                    + "    a.b.userName VARCHAR(10),\n"
                    + "    userAge BIGINT,"
                    + "    PERIOD FOR SYSTEM_TIME"
                    + ") WITH (\n"
                    + "    'connector' = 'kafka',\n"
                    + "    'properties.bootstrap.servers' = 'localhost:9092',\n"
                    + "    'topic' = 'tp01',\n"
                    + "    'format' = 'json',\n"
                    + "    'scan.startup.mode' = 'latest-offset'\n"
                    + ")";
    
            SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
            Assert.assertTrue(ddlSqlNode instanceof SqlCreateTable);
    
            SqlCreateTable createTable = (SqlCreateTable) ddlSqlNode;
            SqlNodeList columns = createTable.getColumnList();
    
            System.out.println(String.format("tableName: %s",createTable.getTableName()));
            for (SqlNode column : columns) {
                SqlTableColumn tableColumn = (SqlTableColumn) column;
                String columnName = tableColumn.getName().toString();
                String typeName = tableColumn.getType().getTypeName().getSimple();
                System.out.println(String.format("columnName: %s, typeName: %s", columnName, typeName));
            }
    
            SqlNodeList properties = createTable.getPropertyList();
            for (SqlNode sqlNode: properties) {
                SqlNode key = ((SqlTableOption) sqlNode).getKey();
                SqlNode value = ((SqlTableOption) sqlNode).getValue();
                System.out.println(String.format("properties: key:%s, value:%s ", key, value));
            }
    
            boolean sideFlag = createTable.isSideFlag();
            System.out.println(String.format("sideFlag: %s", sideFlag));
    
        }
    }
    

    Computed Column

    Extending SqlNode

    SqlTableColumn gains an expression field.

    public class SqlTableColumn extends SqlCall {
        private static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("COLUMN_DECL", SqlKind.COLUMN_DECL);
    
        private SqlIdentifier name;
        private SqlDataTypeSpec type;
        private SqlNode expr;
    
        public SqlTableColumn(SqlIdentifier name,
                              SqlDataTypeSpec type,
                              SqlParserPos pos) {
            super(pos);
            this.name = requireNonNull(name, "Column name should not be null");
            this.type = requireNonNull(type, "Column type should not be null");
        }
    
        public SqlTableColumn(SqlIdentifier name,
                              SqlNode expr,
                              SqlParserPos pos) {
            super(pos);
            this.name = requireNonNull(name, "Column name should not be null");
            this.expr = requireNonNull(expr, "Column expression should not be null");
        }
    
        @Override
        public SqlOperator getOperator() {
            return OPERATOR;
        }
    
        @Override
        public List<SqlNode> getOperandList() {
            return isGenerated() ?
                    ImmutableNullableList.of(name, expr) :
                    ImmutableNullableList.of(name, type);
        }
    
        public boolean isGenerated() {
            return expr != null;
        }
    
        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
            this.name.unparse(writer, leftPrec, rightPrec);
            if (isGenerated()) {
                writer.keyword("AS");
                this.expr.unparse(writer, leftPrec, rightPrec);
            } else {
                writer.print(" ");
                this.type.unparse(writer, leftPrec, rightPrec);
            }
        }
    
        public SqlNode getExpr() {
            return expr;
        }
    
        public SqlIdentifier getName() {
            return name;
        }
    
        public void setName(SqlIdentifier name) {
            this.name = name;
        }
    
        public SqlDataTypeSpec getType() {
            return type;
        }
    
        public void setType(SqlDataTypeSpec type) {
            this.type = type;
        }
    }
    

    Defining the rule

    void TableColumn(TableCreationContext context) :
    {
    }
    {
        (LOOKAHEAD(3)
            TableColumn2(context.columnList)
        |
            ComputedColumn(context)   // newly added branch
        |
            context.sideFlag = SideFlag()
        )
    }
    
    /**
    *  Supports computed columns, e.g. proctime AS proctime()
    */
    void ComputedColumn(TableCreationContext context) :
    {
        SqlIdentifier identifier;
        SqlNode expr;
        SqlParserPos pos;
    }
    {
        identifier = SimpleIdentifier() {pos = getPos();}
        <AS>
        // Parse a non-query row expression
        expr = Expression(ExprContext.ACCEPT_NON_QUERY)
        {
            SqlTableColumn computedColumn = new SqlTableColumn(identifier, expr, getPos());
            context.columnList.add(computedColumn);
        }
    }
    

    Applying the rule

    No changes to parser.tdd are needed: the new branch only uses the AS keyword and the Expression() production, both already defined in Parser.jj, and SqlTableColumn is already imported.

    Testing the parse

        @Test
        public void testCreateTableContainsExpr() throws SqlParseException {
            String ddlSql = "CREATE TABLE user_info (\n"
                    + "    userId BIGINT,\n"
                    + "    userName VARCHAR(10),\n"
                    + "    proctime AS xxxx() \n"
                    + ") WITH (\n"
                    + "    'connector' = 'kafka',\n"
                    + "    'properties.bootstrap.servers' = 'localhost:9092',\n"
                    + "    'topic' = 'tp01',\n"
                    + "    'format' = 'json',\n"
                    + "    'scan.startup.mode' = 'latest-offset'\n"
                    + ")";
    
            SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
            Assert.assertTrue(ddlSqlNode instanceof SqlCreateTable);
    
            SqlCreateTable createTable = (SqlCreateTable) ddlSqlNode;
            SqlNodeList columns = createTable.getColumnList();
    
            System.out.println(String.format("tableName: %s",createTable.getTableName()));
            for (SqlNode column : columns) {
                SqlTableColumn tableColumn = (SqlTableColumn) column;
                if (!tableColumn.isGenerated()){
                    String columnName = tableColumn.getName().toString();
                    String typeName = tableColumn.getType().getTypeName().getSimple();
                    System.out.println(String.format("columnName: %s, typeName: %s", columnName, typeName));
                } else {
                    String name = tableColumn.getName().toString();
                    SqlNode expr = tableColumn.getExpr();
                    System.out.println(String.format("name: %s, expr: %s", name, expr));
    
                }
    
            }
        }
    

    watermark for xx as ..

    Extending SqlNode

    Add a SqlWatermark class, same as org.apache.flink.sql.parser.ddl.SqlWatermark; a sketch follows.
    Add a SqlWatermark field to SqlCreateTable.
    Add a SqlWatermark field to TableCreationContext.
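
    A minimal sketch of SqlWatermark, consistent with the Watermark() rule and the test below (imports omitted):

    public class SqlWatermark extends SqlCall {
        private static final SqlSpecialOperator OPERATOR =
                new SqlSpecialOperator("WATERMARK", SqlKind.OTHER);

        private final SqlIdentifier eventTimeColumnName;
        private final SqlNode watermarkStrategy;

        public SqlWatermark(SqlParserPos pos, SqlIdentifier eventTimeColumnName, SqlNode watermarkStrategy) {
            super(pos);
            this.eventTimeColumnName = eventTimeColumnName;
            this.watermarkStrategy = watermarkStrategy;
        }

        public SqlIdentifier getEventTimeColumnName() {
            return eventTimeColumnName;
        }

        public SqlNode getWatermarkStrategy() {
            return watermarkStrategy;
        }

        @Override
        public SqlOperator getOperator() {
            return OPERATOR;
        }

        @Override
        public List<SqlNode> getOperandList() {
            return ImmutableNullableList.of(eventTimeColumnName, watermarkStrategy);
        }

        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
            writer.keyword("WATERMARK FOR");
            eventTimeColumnName.unparse(writer, leftPrec, rightPrec);
            writer.keyword("AS");
            watermarkStrategy.unparse(writer, leftPrec, rightPrec);
        }
    }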

    Defining the rule

    /**
    *  Parses a table column
    */
    void TableColumn(TableCreationContext context) :
    {
    }
    {
        (LOOKAHEAD(3)
            TableColumn2(context.columnList)
        |
            ComputedColumn(context)
        |
            Watermark(context)  // hook in parsing of the watermark declaration
        |
            context.sideFlag = SideFlag()
        )
    }
    
    void Watermark(TableCreationContext context) :
    {
        SqlIdentifier eventTimeColumnName;
        SqlParserPos pos;
        SqlNode watermarkStrategy;
    }
    {
        <WATERMARK> {pos = getPos();} <FOR>
        eventTimeColumnName = CompoundIdentifier()
        <AS>
        // non-query expression
        watermarkStrategy = Expression(ExprContext.ACCEPT_NON_QUERY)
        {
            if (context.watermark != null) {
                throw new RuntimeException("Multiple WATERMARK statements is not supported yet.");
            } else {
                context.watermark = new SqlWatermark(pos, eventTimeColumnName, watermarkStrategy);
            }
        }
    }
    

    Applying the rule

    1. imports: "cn.todd.flink.sql.parser.ddl.SqlWatermark"
    2. keywords: "WATERMARK"

    Testing the parse

    
        @Test
        public void testCreateTableWaterMark() throws SqlParseException {
            String ddlSql = "CREATE TABLE user_info (\n"
                    + "    userId BIGINT,\n"
                    + "    userName VARCHAR(10),\n"
                    + "    ts timestamp(10),\n"
                    + "    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n"
                    + ") WITH (\n"
                    + "    'connector' = 'kafka',\n"
                    + "    'properties.bootstrap.servers' = 'localhost:9092',\n"
                    + "    'topic' = 'tp01',\n"
                    + "    'format' = 'json',\n"
                    + "    'scan.startup.mode' = 'latest-offset'\n"
                    + ")";
    
            SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
            Assert.assertTrue(ddlSqlNode instanceof SqlCreateTable);
    
            SqlCreateTable createTable = (SqlCreateTable) ddlSqlNode;
            SqlNodeList columns = createTable.getColumnList();
    
            System.out.println(String.format("tableName: %s",createTable.getTableName()));
            for (SqlNode column : columns) {
                SqlTableColumn tableColumn = (SqlTableColumn) column;
                if (!tableColumn.isGenerated()){
                    String columnName = tableColumn.getName().toString();
                    String typeName = tableColumn.getType().getTypeName().getSimple();
                    System.out.println(String.format("columnName: %s, typeName: %s", columnName, typeName));
                } else {
                    String name = tableColumn.getName().toString();
                    SqlNode expr = tableColumn.getExpr();
                    System.out.println(String.format("name: %s, expr: %s", name, expr));
                }
            }
    
            if (createTable.getWatermark() !=null ) {
                SqlWatermark watermark = createTable.getWatermark();
                String eventTimeColumnName = watermark.getEventTimeColumnName().toString();
                String watermarkStrategy = watermark.getWatermarkStrategy().toString();
                System.out.println(String.format("eventTimeColumnName: %s, watermarkStrategy: %s", eventTimeColumnName, watermarkStrategy));
    
            }
        }
    

    create view

    Extending SqlNode

    Same as org.apache.flink.sql.parser.ddl.SqlCreateView; a sketch follows.
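
    A minimal sketch consistent with the SqlCreateView() rule below (imports omitted; the unparse details are assumptions):

    public class SqlCreateView extends SqlCreate {
        public static final SqlSpecialOperator OPERATOR =
                new SqlSpecialOperator("CREATE VIEW", SqlKind.CREATE_VIEW);

        private final SqlIdentifier viewName;
        private final SqlNodeList fieldList;
        private final SqlNode query;

        public SqlCreateView(
                SqlParserPos pos,
                SqlIdentifier viewName,
                SqlNodeList fieldList,
                SqlNode query,
                boolean replace,
                boolean ifNotExists) {
            super(OPERATOR, pos, replace, ifNotExists);
            this.viewName = viewName;
            this.fieldList = fieldList;
            this.query = query;
        }

        @Override
        public SqlOperator getOperator() {
            return OPERATOR;
        }

        @Override
        public List<SqlNode> getOperandList() {
            return ImmutableNullableList.of(viewName, fieldList, query);
        }

        @Override
        public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
            writer.keyword("CREATE VIEW");
            viewName.unparse(writer, leftPrec, rightPrec);
            if (fieldList.size() > 0) {
                fieldList.unparse(writer, 1, rightPrec);
            }
            writer.keyword("AS");
            query.unparse(writer, leftPrec, rightPrec);
        }
    }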

    Defining the rule

    /**
    * Parses an "IF EXISTS" option; default is false.
    */
    boolean IfExistsOpt() :
    {
    }
    {
        (
            LOOKAHEAD(2)
            <IF> <EXISTS> { return true; }
        |
           { return false; }
        )
    }
    
    /**
    * Parses an "IF NOT EXISTS" option; default is false.
    */
    boolean IfNotExistsOpt() :
    {
    }
    {
        (
            LOOKAHEAD(3)
            <IF> <NOT> <EXISTS> { return true; }
        |
            { return false; }
        )
    }
    
    /**
    *   Parses: create view xxx as select ... or create view xxx (...) as ...
    */
    SqlCreate SqlCreateView(Span s, boolean replace) : {
        SqlIdentifier viewName;
        SqlNode query=null;
        SqlNodeList fieldList = SqlNodeList.EMPTY;
        boolean ifNotExists = false;
    }
    {
        <VIEW>
        viewName = CompoundIdentifier()
        ifNotExists = IfNotExistsOpt()
        [
            //  Parenthesized list of simple identifiers, e.g. (a,b,c); defined in parser.jj
            fieldList = ParenthesizedSimpleIdentifierList()
        ]
        [
            <AS>
            // parse the query expression
            query = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
        ]
        {
            return new SqlCreateView(s.pos(), viewName, fieldList, query, replace, ifNotExists);
        }
    }
    
    

    Applying the rule

    1. imports: "cn.todd.flink.sql.parser.ddl.SqlCreateView"
    2. createStatementParserMethods: "SqlCreateView"

    Testing the parse

    @Test
    public void testCreateView() throws SqlParseException {
        String ddlSql = "CREATE view user_info AS Select * from infos";
        String ddlSql2 = "CREATE view user_info (userId, userName) AS Select * from infos";
        System.out.println(parseStmtAndHandleEx(ddlSql));
        System.out.println(parseStmtAndHandleEx(ddlSql2));
    }
    
        @Test
        public void testCreateViewAsSelect() throws SqlParseException {
            String ddlSql = "create view user_info as select * from infos";
            SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
            Assert.assertTrue(ddlSqlNode instanceof SqlCreateView);
            System.out.println(ddlSqlNode);
        }
    

    create function

    Extending SqlNode

    Same as org.apache.flink.sql.parser.ddl.SqlCreateFunction; a sketch follows.
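
    A minimal sketch consistent with the SqlCreateFunction() rule and the tests below (imports omitted; unparse elided for brevity, so the default SqlCall unparse via the operator applies):

    public class SqlCreateFunction extends SqlCreate {
        public static final SqlSpecialOperator OPERATOR =
                new SqlSpecialOperator("CREATE FUNCTION", SqlKind.CREATE_FUNCTION);

        private final SqlIdentifier functionIdentifier;
        private final SqlCharStringLiteral functionClassName;
        private final String functionLanguage;

        public SqlCreateFunction(
                SqlParserPos pos,
                SqlIdentifier functionIdentifier,
                SqlCharStringLiteral functionClassName,
                String functionLanguage,
                boolean ifNotExists) {
            super(OPERATOR, pos, false, ifNotExists);
            this.functionIdentifier = functionIdentifier;
            this.functionClassName = functionClassName;
            this.functionLanguage = functionLanguage;
        }

        // The tests below read the identifier back as its name segments
        public String[] getFunctionIdentifier() {
            return functionIdentifier.names.toArray(new String[0]);
        }

        public SqlCharStringLiteral getFunctionClassName() {
            return functionClassName;
        }

        public String getFunctionLanguage() {
            return functionLanguage;
        }

        @Override
        public SqlOperator getOperator() {
            return OPERATOR;
        }

        @Override
        public List<SqlNode> getOperandList() {
            return ImmutableNullableList.of(functionIdentifier, functionClassName);
        }
    }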

    Defining the rule

    /**
    *   Parses: create function name as 'xxx.ccc.cc' LANGUAGE java
    */
    SqlCreate SqlCreateFunction(Span s, boolean replace) :
    {
        SqlIdentifier functionIdentifier = null;
        SqlCharStringLiteral functionClassName = null;
        String functionLanguage = null;
        boolean ifNotExists = false;
    }
    {
        <FUNCTION>
            ifNotExists = IfNotExistsOpt()
            functionIdentifier = CompoundIdentifier()
        <AS> <QUOTED_STRING>
        {
        //  Extract the content of the quoted string
            String p = SqlParserUtil.parseString(token.image);
            functionClassName = SqlLiteral.createCharString(p, getPos());
        }
        [<LANGUAGE>
            (
                <JAVA>  { functionLanguage = "JAVA"; }
            |
                <SCALA> { functionLanguage = "SCALA"; }
            |
                <SQL>   { functionLanguage = "SQL"; }
            |
                <PYTHON> { functionLanguage = "PYTHON"; }
            )
        ]
        {
           return new SqlCreateFunction(s.pos(), functionIdentifier, functionClassName, functionLanguage, ifNotExists);
        }
    }
    

    Applying the rule

    1. imports: "cn.todd.flink.sql.parser.ddl.SqlCreateFunction"
    2. createStatementParserMethods: "SqlCreateFunction"
    3. keywords: "PYTHON", "SCALA"

    Testing the parse

    public class CreateFunctionTest extends BaseParser {
    
    
        @Test
        public void testCreateFunction() throws SqlParseException {
            String ddlSql = "create function str2Timestamp as 'aaa.b.c'";
            SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
    
            Assert.assertTrue(ddlSqlNode instanceof SqlCreateFunction);
            SqlCreateFunction createFunction = (SqlCreateFunction) ddlSqlNode;
            String functionClassName = createFunction.getFunctionClassName().toString();
            String functionIdentifier = createFunction.getFunctionIdentifier()[0];
    
            System.out.println(String.format("functionIdentifier:%s , functionClassName: %s,",functionIdentifier, functionClassName));
        }
    
        @Test
        public void testCreateFunctionWithLanguage() throws SqlParseException {
            String ddlSql = "create function str2Timestamp as 'aaa.b.c' language java";
            SqlNode ddlSqlNode = parseStmtAndHandleEx(ddlSql);
    
            Assert.assertTrue(ddlSqlNode instanceof SqlCreateFunction);
            SqlCreateFunction createFunction = (SqlCreateFunction) ddlSqlNode;
            String functionClassName = createFunction.getFunctionClassName().toString();
            String functionIdentifier = createFunction.getFunctionIdentifier()[0];
            String functionLanguage = createFunction.getFunctionLanguage();
    
            System.out.println(String.format("functionIdentifier:%s , functionClassName: %s,functionLanguage: %s",
                    functionIdentifier, functionClassName,functionLanguage));
        }
    }
    

    DML

    insert into..select

    Extending SqlNode

    To support Hive syntax, FlinkSQL introduced partition-related attributes and extended SqlInsert accordingly. Here, RichSqlInsert simply extends SqlInsert directly; a minimal sketch follows.
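
    Since no partition support is added here, the class only needs to forward to the Calcite SqlInsert constructor, matching the call in the rule below:

    package cn.todd.flink.sql.parser.dml;

    import org.apache.calcite.sql.SqlInsert;
    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.SqlNodeList;
    import org.apache.calcite.sql.parser.SqlParserPos;

    public class RichSqlInsert extends SqlInsert {
        public RichSqlInsert(
                SqlParserPos pos,
                SqlNodeList keywords,
                SqlNode targetTable,
                SqlNode source,
                SqlNodeList columnList) {
            super(pos, keywords, targetTable, source, columnList);
        }
    }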

    Defining the rule

    The rule is Calcite's own, used as-is without any extension.

    /**
    * Parses an INSERT statement. Copied from parser.jj.
    */
    SqlNode RichSqlInsert() :
    {
        final List<SqlLiteral> keywords = new ArrayList<SqlLiteral>();
        final SqlNodeList keywordList;
        SqlNode table;
        SqlNodeList extendList = null;
        SqlNode source;
        SqlNodeList columnList = null;
        final Span s;
    }
    {
        (
        <INSERT>
        |
        <UPSERT> { keywords.add(SqlInsertKeyword.UPSERT.symbol(getPos())); }
        )
        { s = span(); }
        SqlInsertKeywords(keywords) {
            keywordList = new SqlNodeList(keywords, s.addAll(keywords).pos());
        }
        <INTO> table = CompoundIdentifier()
        [
            LOOKAHEAD(5)
            [ <EXTEND> ]
            extendList = ExtendList() {
                table = extend(table, extendList);
            }
        ]
    
        [
            LOOKAHEAD(2)
            { final Pair<SqlNodeList, SqlNodeList> p; }
            p = ParenthesizedCompoundIdentifierList() {
                if (p.right.size() > 0) {
                    table = extend(table, p.right);
                }
                if (p.left.size() > 0) {
                    columnList = p.left;
                }
            }
        ]
            source = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY) {
            return new RichSqlInsert(s.end(source), keywordList, table, source, columnList);
        }
    }
    

    Applying the rule

    1. statementParserMethods: [ "RichSqlInsert()" ]
    2. imports: [ "cn.todd.flink.sql.parser.dml.RichSqlInsert" ]

    Testing the parse

    public class InsertStatementTest extends BaseParser {
        @Test
        public void testSimpleInsertStat() throws SqlParseException {
            String insertSql = "insert into  sinkTable  select name ,age from sourceTable";
            SqlNode insertSqlNode = parseStmtAndHandleEx(insertSql);
            Assert.assertTrue(insertSqlNode instanceof RichSqlInsert);
    
            SqlNode targetTable = ((SqlInsert) insertSqlNode).getTargetTable();
            SqlNode source = ((SqlInsert) insertSqlNode).getSource();
    
            System.out.println(String.format("targetTable: %s, \nsource: %s", targetTable, source));
        }
    }
    
    

    Summary

    1. Extending grammar rules: Parser.jj already covers parsing of standard SQL. To extend it, modify the original rules, searching Parser.jj for the productions you need. For example, Flink extended the Parser.jj#SqlInsert production into the RichSqlInsert rule to support parsing PARTITION clauses.
    2. Understanding a rule: when an imported rule's meaning is unclear, read the generated parser implementation class and find the corresponding method.
    3. Debugging rules: rules introduced from parserImpls.ftl may fail to compile; open the generated parser implementation (or the generated parser.jj) and go to the reported line to investigate.
