Catalog 提供了元数据信息
,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。
数据处理最关键的方面之一是管理元数据
。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。
前面用到Connector其实就是在使用Catalog
Catalog类型
-
GenericInMemoryCatalog
GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。 -
JdbcCatalog
JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。PostgresCatalog 是当前实现的唯一一种 JDBC Catalog。 -
HiveCatalog
HiveCatalog 有两个用途:作为原生 Flink 元数据的持久化存储,以及作为读写现有 Hive 元数据的接口。 Flink 的 Hive 文档 提供了有关设置 HiveCatalog 以及访问现有 Hive 元数据的详细信息。
HiveCatalog
- 导入相关的依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.13.1</version>
</dependency>
<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
-
启动 Hadoop
-
copy hive-site.xml
-
启动元数据服务
<property>
<name>hive.metastore.uris</name>
<value>thrift://hadoop162:9083</value>
</property>
在hive-site.xml中配置元数据服务,所以需要启动该服务,否则无法读取数据。
hive service -metestore
元数据服务的作用:若没有元数据服务器,所有的请求将直接访问到hive,这样可能导致hive吃不消,有了元数据服务之后,请求将全部交由元数据服务器,元数据服务再去请求hive,起到一个缓冲的作用。
- 创建对应的表
随便创建一个库
create database gmall;
随便创建一张表
use gmall;
create table student(
id int,
name string,
age int,
sex string
);
往表里插入一些数据
insert into student(id, name, age, sex)
values
(1,'张三',18,'男'),
(2,'李四',56,'女'),
(3,'王五',34,'男'),
(4,'赵六',33,'女'),
(5,'孙七',78,'男');
- 连接hive
@Test
public void test1(){
// 环境准备
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
/**
* 创建 hivecatalog
*
* my_hive catalog 名称(随意
* gmall 指定数据库
* input/ 配置hive-site.xml 的目录。
*/
HiveCatalog hiveCatalog = new HiveCatalog("my_hive", "gmall", "input/");
/**
* 注册Catalog
* my_hive2 名称随意,通常和 new HiveCatalog 中的一致
*/
tableEnv.registerCatalog("my_hive2",hiveCatalog);
/**
* 指定数据库,查询
* my_hive tableEnv.registerCatalog 中指定的
* gmall :库名
* student:表名
*/
tableEnv.sqlQuery("select * from my_hive2.gmall.student").execute().print();
}
- 查询结果
+----+-------------+--------------------------------+-------------+--------------------------------+
| op | id | name | age | sex |
+----+-------------+--------------------------------+-------------+--------------------------------+
| +I | 1 | 张三 | 18 | 男 |
| +I | 2 | 李四 | 56 | 女 |
| +I | 3 | 王五 | 34 | 男 |
| +I | 4 | 赵六 | 33 | 女 |
| +I | 5 | 孙七 | 78 | 男 |
+----+-------------+--------------------------------+-------------+--------------------------------+
- 全局设置
select * from my_hive2.gmall.student
这样写比较麻烦(如上),通常我们可以将其他提出去,设置成全局
tableEnv.useCatalog("my_hive2");
tableEnv.useDatabase("gmall");
/**
* 指定数据库,查询
* my_hive tableEnv.registerCatalog 中指定的
* gmall :库名
* student:表名
*/
tableEnv.sqlQuery("select * from student").execute().print();
网友评论