美文网首页
flink catalog 之 flink-catalog-in

flink catalog 之 flink-catalog-in

作者: 山里小龙 | 来源:发表于2022-08-16 23:42 被阅读0次

开发思路

运行环境全面升级到jdk11后,原基于hive catalog的flink catlog不能再使用,原因是现在hive所有版本都不支持jdk11,但在大趋势下jdk版本已不可能回退到jdk1.8了,在这种情况下是否可以开发一个不依赖hive的flink catalog呢?

flink默认catalog是保存在内存中,关键类是GenericInMemoryCatalog.java,这个类实现了元数据管理的所有方法,如果利用这些已实现的方法,只增加对内存中元数据持久化方法可以减少很工作量,同时也避免了重复造轮子。

flink-catalog-in-jdbc基于继承GenericInMemoryCatalog实现,将元数据保存到jdbc中,从而解除了flink对hive的依赖。

项目地址

项目地址:https://github.com/jeff-zou/flink-catalog-in-jdbc.git
无法翻墙:https://gitee.com/jeff-zou/flink-catalog-in-jdbc.git

使用方法

1.引入包

<dependency>
    <groupId>io.github.jeff-zou</groupId>
    <artifactId>flink-catalog-in-jdbc</artifactId>
    <version>1.2</version>
</dependency>

2.数据库创建表


CREATE TABLE `flink_catalog_databases` (
  `comment` varchar(100) DEFAULT NULL COMMENT '1',
  `properties` varchar(100) DEFAULT NULL,
  `database_name` varchar(100) DEFAULT NULL,
  UNIQUE KEY `flink_catalog_databases_un` (`database_name`)
) ;

CREATE TABLE `flink_catalog_tables` (
  `script` varchar(5000) DEFAULT NULL COMMENT '1',
  `object_name` varchar(100) DEFAULT NULL,
  `database_name` varchar(100) DEFAULT NULL,
  `kind` varchar(20) DEFAULT NULL,
  `comment` varchar(200) DEFAULT NULL,
  UNIQUE KEY `flink_catalog_databases_un` (`database_name`,`object_name`)
);

CREATE TABLE `flink_catalog_functions` (
  `database_name` varchar(100) DEFAULT NULL,
  `object_name` varchar(100) DEFAULT NULL,
  `class_name` varchar(200) DEFAULT NULL COMMENT '1',
  `function_language` varchar(20) DEFAULT NULL,
  UNIQUE KEY `flink_catalog_functions_un` (`database_name`,`object_name`)
) ;

CREATE TABLE `flink_catalog_columns` (
  `database_name` varchar(100) COLLATE utf8_unicode_ci DEFAULT NULL,
  `object_name` varchar(100) COLLATE utf8_unicode_ci DEFAULT NULL,
  `column_name` varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL,
  `column_type` varchar(100) COLLATE utf8_unicode_ci DEFAULT NULL,
  `column_comment` varchar(200) COLLATE utf8_unicode_ci DEFAULT NULL,
  UNIQUE KEY `flink_catalog_columns_un` (`database_name`,`object_name`,`column_name`)
)

3.sql使用

create catalog  my_catalog with ( 'type'='generic_in_jdbc', 'default-database'='test', 'username'='test', 'password'='****',
'url'='jdbc:mysql://*****:3306/test_database?useUnicode=true&characterEncoding=utf8&autoReconnect=true');

use catalog my_catalog;
 
create database if not exists my_database;

use  my_database;

CREATE TABLE if not exists `test` (
                      `c1` VARCHAR(2147483647),
                      `id` INT NOT NULL,
                      `stime` TIMESTAMP(3),
                     `cost` as id * 10, "
                      WATERMARK FOR `stime` AS `stime` - INTERVAL '10' SECOND,
                      CONSTRAINT `PK_3386` PRIMARY KEY (`id`) NOT ENFORCED
                    ) "
                     comment 'test' "
                     partitioned by (c1)"
                     WITH (
                      'connector' = 'print'
                    ) ";
                    
 show create table test;                   

关注个人微信公众号:肌肉码农,回复“好友”讨论问题

相关文章

网友评论

      本文标题:flink catalog 之 flink-catalog-in

      本文链接:https://www.haomeiwen.com/subject/scbdgrtx.html