美文网首页
flink catalog 之 flink-catalog-in

flink catalog 之 flink-catalog-in

作者: 山里小龙 | 来源:发表于2022-08-16 23:42 被阅读0次

    开发思路

    运行环境全面升级到jdk11后,原基于hive catalog的flink catlog不能再使用,原因是现在hive所有版本都不支持jdk11,但在大趋势下jdk版本已不可能回退到jdk1.8了,在这种情况下是否可以开发一个不依赖hive的flink catalog呢?

    flink默认catalog是保存在内存中,关键类是GenericInMemoryCatalog.java,这个类实现了元数据管理的所有方法,如果利用这些已实现的方法,只增加对内存中元数据持久化方法可以减少很工作量,同时也避免了重复造轮子。

    flink-catalog-in-jdbc基于继承GenericInMemoryCatalog实现,将元数据保存到jdbc中,从而解除了flink对hive的依赖。

    项目地址

    项目地址:https://github.com/jeff-zou/flink-catalog-in-jdbc.git
    无法翻墙:https://gitee.com/jeff-zou/flink-catalog-in-jdbc.git

    使用方法

    1.引入包

    <dependency>
        <groupId>io.github.jeff-zou</groupId>
        <artifactId>flink-catalog-in-jdbc</artifactId>
        <version>1.2</version>
    </dependency>
    

    2.数据库创建表

    
    CREATE TABLE `flink_catalog_databases` (
      `comment` varchar(100) DEFAULT NULL COMMENT '1',
      `properties` varchar(100) DEFAULT NULL,
      `database_name` varchar(100) DEFAULT NULL,
      UNIQUE KEY `flink_catalog_databases_un` (`database_name`)
    ) ;
    
    CREATE TABLE `flink_catalog_tables` (
      `script` varchar(5000) DEFAULT NULL COMMENT '1',
      `object_name` varchar(100) DEFAULT NULL,
      `database_name` varchar(100) DEFAULT NULL,
      `kind` varchar(20) DEFAULT NULL,
      `comment` varchar(200) DEFAULT NULL,
      UNIQUE KEY `flink_catalog_databases_un` (`database_name`,`object_name`)
    );
    
    CREATE TABLE `flink_catalog_functions` (
      `database_name` varchar(100) DEFAULT NULL,
      `object_name` varchar(100) DEFAULT NULL,
      `class_name` varchar(200) DEFAULT NULL COMMENT '1',
      `function_language` varchar(20) DEFAULT NULL,
      UNIQUE KEY `flink_catalog_functions_un` (`database_name`,`object_name`)
    ) ;
    
    CREATE TABLE `flink_catalog_columns` (
      `database_name` varchar(100) COLLATE utf8_unicode_ci DEFAULT NULL,
      `object_name` varchar(100) COLLATE utf8_unicode_ci DEFAULT NULL,
      `column_name` varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL,
      `column_type` varchar(100) COLLATE utf8_unicode_ci DEFAULT NULL,
      `column_comment` varchar(200) COLLATE utf8_unicode_ci DEFAULT NULL,
      UNIQUE KEY `flink_catalog_columns_un` (`database_name`,`object_name`,`column_name`)
    )
    

    3.sql使用

    create catalog  my_catalog with ( 'type'='generic_in_jdbc', 'default-database'='test', 'username'='test', 'password'='****',
    'url'='jdbc:mysql://*****:3306/test_database?useUnicode=true&characterEncoding=utf8&autoReconnect=true');
    
    use catalog my_catalog;
     
    create database if not exists my_database;
    
    use  my_database;
    
    CREATE TABLE if not exists `test` (
                          `c1` VARCHAR(2147483647),
                          `id` INT NOT NULL,
                          `stime` TIMESTAMP(3),
                         `cost` as id * 10, "
                          WATERMARK FOR `stime` AS `stime` - INTERVAL '10' SECOND,
                          CONSTRAINT `PK_3386` PRIMARY KEY (`id`) NOT ENFORCED
                        ) "
                         comment 'test' "
                         partitioned by (c1)"
                         WITH (
                          'connector' = 'print'
                        ) ";
                        
     show create table test;                   
    

    关注个人微信公众号:肌肉码农,回复“好友”讨论问题

    相关文章

      网友评论

          本文标题:flink catalog 之 flink-catalog-in

          本文链接:https://www.haomeiwen.com/subject/scbdgrtx.html