美文网首页首页投稿(暂停使用,暂停投稿)
如何使用Calcite实现一个简单的数据库

如何使用Calcite实现一个简单的数据库

作者: 张海雷_7ff7 | 来源:发表于2017-12-06 14:32 被阅读0次

      说道Calcite你可能有些陌生, 但提及Hive、Kylin、Apache Drill、Flink等一定不会陌生,这些都是在我们日常工作中经常用到的,如上这些都是基于Calcite实现查询引擎,还有Druid和Storm也是使用它来实现SQL功能。按照官方的说法,Calcite是动态数据管理框架,这个解释理解起来有点抽象,通俗一点讲,要使用Calcite实现数据库,只需要关注存储引擎以及元数据管理,其他都交给Ca
    lcite。可能这个说法有些不严谨,文档中还提到了Calcite不提供处理数据的算法,但Calcite-core和Calite-linq都提供了一些算子的实现对于一个简单的数据库足够了。

      对于Calcite的详细介绍推荐大家看一遍文章,本文主要介绍Calcite如何使用,例如已经有一种数据格式的文件存储,如何利用Calcite快速实现SQL查询。我看过kylin、druid的Calcite应用,也是各不相同,这大概也是Calcite的魅力吧。

      Calcite文档有一个指南,介绍使用CSV File作为数据存储格式实现SQL查询,掌握了以后我们可以照猫画虎造出一个其他数据格式的数据库,或者对学习kylin、druid的源码有帮忙,概括地讲,在这个例子使用以下技巧:

    1. 自定义Schema
    2. 自定义Table
    3. 决定Table的字段类型
    4. 使用ScannableTable实现简单的全表扫描
    5. 更高级一点的技巧,使用Filterable Table实现谓词下推
    6. 更酷一点的技巧,基于TranslateTable使用Rule实现逻辑表达式的转换

      前四点是构建一个简单的,采用全表扫描的方式实现查询。5和6属于进阶内容,在案例中,使用Rule转换的方式实现了Project下推,和5实现的谓词下推是常用的SQL优化方式。下面由浅到深介绍这几项技巧。先来看前四项,完成一个简单的只能全表扫描的数据库。

      首先在GitHub上下载Calcite的源码,看calcite-example-csv工程,在src/test/CSVTest中有各种场景的测试用例,例如

    • testFilterableWhere是测试谓词下推
    • testPushDownProject是Project下推
    • testSelect是最简单的全表扫描

      可以先跑一下测试用例感受一下Calcite的魅力,Calcite实现一个数据库,只需要关注存储引擎以及元数据管理。存储格式采用csv,一个CSV文件会映射成一个Table,需要注意的是CSV文件的第一行是Table的元数据信息,采用“FieldName1:FieldType,FieldNameN:FieldType”这样的格式存储,类似excel中的表头信息。以下是sales/SALES.csv的示例。

    DEPTNO:int,NAME:string
    10,"Sales"
    20,"Marketing"
    30,"Accounts"
    

     至此介绍了存储格式以及元数据,接下来介绍如何使用。

     第一步,创建一个json格式的mode文件,描述了如何创建Schema,可以参照test/resource目录下的model.json,

    {
      "version": "1.0",
      "defaultSchema": "SALES",
      "schemas": [
        {
          "name": "SALES",
          "type": "custom",
          "factory": "org.apache.calcite.adapter.csv.CsvSchemaFactory",
          "operand": {
            "directory": "sales"
          }
        }
      ]
    }
    

    在分析model文件之前,我们先了解几个重要的概念:

    1. Schema,是table和function的名称空间,它是一个可嵌套的结构,Schema还可以有subSchema,理论上可以无限嵌套,但一般不会这么做。Schema可以理解成Database,Database下面有table,这样就和传统数据库的概念联系起来了,在Calcite中,顶层的Schema是root,自定义的Schema是root的subSchema,同时还可以设置defaultSchema,类似我们使用数据库时,使用use database命令以后就不用再输入database名字前缀。
    2. Table,就很好理解了,就是数据库中的表。在table描述了字段名以及相应的类型、表的统计信息,例如表有多少条记录等等,这里先不展开讲。另外重要的是数据文件的存储以及如何扫描读取数据文件。

      那么再去看这份model文件,就比较清晰明了。它描述了在数据库中有多少个Schema、每个Schema如何创建以及默认的Schema,这里的Schema可以理解成database。defaultSchema属性设置默认Schema,schemas是数组类型,每一项代表一个Schema描述信息,在描述信息中有一个关键的属性factory,它是创建Schema的工厂类,在这个例子中factory是org.apache.calcite.adapter.csv.CsvSchemaFactory,它实现了SchemaFactory接口。

    要自实现只有全表扫描功能的简单数据库需要做如下几步:

    1. 自定义SchemaFactory
    2. 自定义Schema
    3. 自定义Table
    4. 自定义Enumerator

    先看看SchemaFactory接口,它只有一个方法:

    Schema create(
          SchemaPlus parentSchema,
          String name,
          Map<String, Object> operand);
    

    create用于创建Schema,其参数说明如下:

    • parentSchema,他的父节点,一般为root
    • name schema的名字,它在model中定义的
    • operand,也是在mode中定义的,是Map类型,用于传入自定义参数。

      在这个Model中,CSVSchemaFactory创建一个叫“SALES”的CSVSchema,它会把src/test/resources/sales下所有CSV文件构建成table。所以operand只许设定了一个参数directory,即读取CSV文件的根目录。CSVSchemaFactory的实现比较简单所以就不在展开分析,需要注意是的源码中flavor参数的处理,这个参数涉及优化进阶相关,这里先不用管,默认为SCANNABLE。

      自定义Schema需要实现Schema接口,前面提过Schema是table和function的名称空间,其主要方法如下:

    • Table getTable(String name),根据表名获取Table
    • Set<String> getTableNames(),获取Schema下的所有表名集合
    • Collection<Function> getFunctions(String name),根据函数名获取函数列表,和table不同,这里返回的是集合类型。
    • Set<String> getFunctionNames(),或者所有的函数名集合。

    CsvSchema->AbstractSchema->Schema,AbstractSchema重新设计了一个getTableMap方法,使用tableName->Table的Map结构存储所有table。这样设计的好处是getTable()能够快速查找。CSVSchema的实现也比较简单,遍历读取根目录下的每个
    文件创建成表,因为上面的model.json中flavor没有设置,采用默认值SCANNABLE,创建成CsvScannableTable。

    自定义Table是本文中最复杂的,先看下图:


    image.jpg

    如图可知,CSVScannableTable主要实现了两个接口ScannableTable和Table。右边部分,CSVTable实现了Table接口,它的作用是定义Table的字段以及字段类型,左侧的ScannableTable是实现如何遍历读取CSV文件的数据。Table接口有如下三个方法:

    • RelDataType getRowType(RelDataTypeFactory typeFactory); 这个方法就是定义Table行记录的字段以及字段类型。
    • Statistic getStatistic(); 获取统计信息
    • Schema.TableType getJdbcTableType(); table的类型,table的类型有很多种,例如table和view。

    AbstractTable默认实现了getStatistic和getJdbcTableType,所以我们只需要实现getRowType方法。首先需要定义type,规范我们这个数据库支持的数据类型。例如字符串是采用String还是VarChar,具体实现在CsvFieldType枚举类,它内部维护了一个Map结构用来存储type的

      STRING(String.class, "string"),
      BOOLEAN(Primitive.BOOLEAN),
      BYTE(Primitive.BYTE),
      CHAR(Primitive.CHAR),
      //只列举部分类型
    

    由如上代码可知,type并不都是标准的SQL Type,例如String。Calcite中设计了RelDataTypeFactory,不仅支持标准的SQL TYPE,也支持java类型以及Array、Map等集合类型。该实例中,RowType是一个StructType,是集合类型,类似c语言中的struct,非常适合存储行记录中字段名以及类型,这和Hive的方式是一样的。例如SALES文件中的

    DEPTNO:int,NAME:string
    

    则Type为

    struct<DEPTNO:int,NAME:string>
    

    在这个例子中通过读取csv文件的第一行来获取fieldName以及fieldType的,具体实现在CsvEnumerator的deduceRowType()方法。
    在calcite中一般有两种执行模型,解释和编译,这一点类似Java。编译模式更好理解一些,会把逻辑执行计划通过字节码技术生成java code然后编译执行。解释模式则省掉生成代码编译的过程。关于解释执行。我看过一些基于Calcite的应用,大部分还是采用编译模式的,所以你看完这篇文章以后再去看其他使用calicite的项目,可能找不到熟悉的身影,如果table实现了如下三个接口之一,Calcite则会使用解释模式执行

    • ScannableTable
    • FilterableTable
    • ProjectableFilterableTable

    ScannableTable用于简单的全表扫描,FilterableTable用于谓词下推,ProjectableFilterableTable更酷一些既能支持谓词下推又能支持project下推。他们都有一个scan,但是参数不同

    • ScannableTable
    Enumerable<Object[]> scan(DataContext root);
    
    • FilterableTable
    Enumerable<Object[]> scan(DataContext root, List<RexNode> filters);
    

    因为要做谓词下推,比ScannableTable多了filters。filters是where语句中的filter。

    • ProjectableFilterableTable
     Enumerable<Object[]> scan(DataContext root, List<RexNode> filters,
          int[] projects);
    

    又增加了projects,投影字段顺序的数组。

    Enumerable支持linq和java的迭代器

    //返回java的迭代器
    Iterator<T> it = enumerable.iterator();
    //LINQ风格的迭代器
    Enumerator<T> enumerator =enumerable.enumerator();
    

    要使用这两种迭代器之前,必须要实现它!AbstractEnumerable借助Linq4j实现了enumerator和iterator的转换

    public Iterator<T> iterator() {
        return Linq4j.enumeratorIterator(enumerator());
      }
    

    所以我们仅需实现enumerator方法。

    Enumerator是Linq风格的迭代器,它有4个方法:

    1. current()
    2. moveNext()
    3. reset()
    4. close()

    current返回游标所指的当前记录,需要注意的是current并不会改变游标的位置,这一点和iterator是不同的,在iterator相对应的是next方法,每一次调用都会将游标移动到下一条记录,current则不会,Enumerator是在调用moveNext方法时才会移动游标。moveNext方法将游标指向下一条记录,并获取当前记录供current方法调用,如果没有下一条记录则返回false。

    CsvEnumerator是读取csv文件的迭代器,它还得需要一个RowConverter,因为csv中都是String类型,使用RowConverter转化成相应的类型。在moreNext方法中,有Stream和谓词下推filter部分的实现,在本文只关注如下几行代码:

    final String[] strings = reader.readNext();
    if (strings == null) {
       current = null;
       return false;
       
    }
    
    current = rowConverter.convertRow(strings);
    return true;
    

    至此,我们完成了使用csv文件存储的数据库全部工作,你可以在CsvTest中使用所有的名为“model”的模型进行测试,

    checkSql("model", "select * from EMPS");
    //smart模型的会在后续的文中介绍
    checkSql("smart", "select name from EMPS");
    

    总结一下:

    1. 创建模型,model.json
    2. 自定义SchemaFactory,CsvSchemaFactory
    3. 自定义Schema,CsvSchema
    4. 自定义Table,CsvTable、CsvScannableTable
    5. 自定义Enumerator,CsvEnumerator

    分享的过程也是学习的过程,在写本文过程,也了解了不少以前自以为懂了的细节,但也有可能还存在不正确的认识,欢迎指正交流。微信号:zhl5919

    参照资料:

    1. http://calcite.apache.org/docs/tutorial.html
    2. http://www.infoq.com/cn/articles/new-big-data-hadoop-query-engine-apache-calcite
    3. http://events.linuxfoundation.org/sites/events/files/slides/ApacheCon2016ChristianTzolov.v4.pdf

    相关文章

      网友评论

        本文标题:如何使用Calcite实现一个简单的数据库

        本文链接:https://www.haomeiwen.com/subject/mgitixtx.html