美文网首页flinkFlink
Flink(1.13) Catalog

Flink(1.13) Catalog

作者: 万事万物 | 来源:发表于2021-08-26 11:39 被阅读0次

    Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

    数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

    前面用到Connector其实就是在使用Catalog


    Catalog类型

    • GenericInMemoryCatalog
      GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。

    • JdbcCatalog
      JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。PostgresCatalog 是当前实现的唯一一种 JDBC Catalog。

    • HiveCatalog
      HiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。 Flink 的 Hive 文档 提供了有关设置 HiveCatalog 以及访问现有 Hive 元数据的详细信息。


    HiveCatalog

    • 导入相关的依赖
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.12</artifactId>
        <version>1.13.1</version>
    </dependency>
    <!-- Hive Dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>3.1.2</version>
    </dependency>
    
    • 启动 Hadoop

    • copy hive-site.xml

    • 启动元数据服务

        <property>
            <name>hive.metastore.uris</name>
            <value>thrift://hadoop162:9083</value>
        </property>
    

    在hive-site.xml中配置元数据服务,所以需要启动该服务,否则无法读取数据。

    hive service -metestore
    

    元数据服务的作用:若没有元数据服务器,所有的请求将直接访问到hive,这样可能导致hive吃不消,有了元数据服务之后,请求将全部交由元数据服务器,元数据服务再去请求hive,起到一个缓冲的作用。

    • 创建对应的表

    随便创建一个库

    create database gmall;
    

    随便创建一张表

    use gmall;
    
    create table student(
        id int,
        name string,
        age int,
        sex string
    );
    

    往表里插入一些数据

    insert into student(id, name, age, sex)
    values
    (1,'张三',18,'男'),
    (2,'李四',56,'女'),
    (3,'王五',34,'男'),
    (4,'赵六',33,'女'),
    (5,'孙七',78,'男');
    
    • 连接hive
        @Test
        public void test1(){
    
            // 环境准备
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            /**
             * 创建 hivecatalog
             *
             * my_hive catalog 名称(随意
             * gmall 指定数据库
             * input/ 配置hive-site.xml 的目录。
             */
            HiveCatalog hiveCatalog = new HiveCatalog("my_hive", "gmall", "input/");
    
            /**
             * 注册Catalog
             * my_hive2 名称随意,通常和  new HiveCatalog 中的一致
             */
            tableEnv.registerCatalog("my_hive2",hiveCatalog);
    
            /**
             * 指定数据库,查询
             * my_hive tableEnv.registerCatalog 中指定的
             * gmall :库名
             * student:表名
             */
            tableEnv.sqlQuery("select * from my_hive2.gmall.student").execute().print();
    
        }
    
    • 查询结果
    +----+-------------+--------------------------------+-------------+--------------------------------+
    | op |          id |                           name |         age |                            sex |
    +----+-------------+--------------------------------+-------------+--------------------------------+
    | +I |           1 |                           张三 |          18 |                             男 |
    | +I |           2 |                           李四 |          56 |                             女 |
    | +I |           3 |                           王五 |          34 |                             男 |
    | +I |           4 |                           赵六 |          33 |                             女 |
    | +I |           5 |                           孙七 |          78 |                             男 |
    +----+-------------+--------------------------------+-------------+--------------------------------+
    
    • 全局设置
    select * from my_hive2.gmall.student
    

    这样写比较麻烦(如上),通常我们可以将其他提出去,设置成全局

    tableEnv.useCatalog("my_hive2");
    tableEnv.useDatabase("gmall");
    
    /**
     * 指定数据库,查询
     * my_hive tableEnv.registerCatalog 中指定的
     * gmall :库名
     * student:表名
    */
    tableEnv.sqlQuery("select * from student").execute().print();
    
    

    相关文章

      网友评论

        本文标题:Flink(1.13) Catalog

        本文链接:https://www.haomeiwen.com/subject/poxqiltx.html