美文网首页实时数据相关
blink hive catalog吐血整理

blink hive catalog吐血整理

作者: 岳过山丘 | 来源:发表于2019-03-12 11:24 被阅读40次

1.blink在flink的基础上做了大量的优化,其中有两点:

1.1Catalog

在catalog上做了如下修改和优化:

  • 通过引入全新的 ReadableCatalog and ReadableWritableCatalog 接口统一了 Flink 的内部和外部 catalog。Flink 所有的 catalog 会被 TableEnvironment 中的 CatalogManager管理。
  • 实现了两种新的 catalog - FlinkInMemoryCatalog and HiveCatalog。FlinkInMemoryCatalog 会将所有元数据存在内存中。HiveCatalog 会连接 Hive metastore 并桥接 Flink 和 Hive 之间的元数据。目前,这个HiveCatalog 可以提供读取 Hive 元数据的能力,包括数据库(databases),表(tables),表分区(table partitions), 简单的数据类型(simple data types), 表和列的统计信息(table and column stats)。
  • 重新清晰定义了引用目标的层级,即 'mycatalog.mydatabase.mytable'。通过定义默认 catalog 和默认数据库,用户可以将引用层级简单化为 'mytable’。

未来,我们还将加入对更多类型的元数据以及catalog的支持。

1.2Hive兼容性

我们的目标是在元数据(meta data)和数据层将 Flink 和 Hive 对接和打通。

  • 在这个版本上,Flink可以通过上面提到的HiveCatalog读取Hive的metaData。
  • 这个版本实现了HiveTableSource,使得Flink job可以直接读取Hive中普通表和分区表的数据,以及做分区的裁剪。

通过这个版本,用户可以使用Flink SQL读取已有的Hive meta和data,做数据处理。未来我们将在Flink上继续加大对Hive兼容性的支持,包括支持Hive特有的data type,和Hive UDF等等。

2.如何连接hive 源数据

2.1 代码

通过flink-sql连接外部数据源(比如hive),需要写一些代码声明。


image.png

2.2 blink sql-client

image.png

3. 环境准备

3.1安装hadoop

参考https://blog.csdn.net/hubin232/article/details/76769265

 cd /usr/local/Cellar/hadoop/3.1.1/libexec/sbin
 ./start-all.sh //即可启动 hadoop namenode,secondnamenode,datanode,resource mananger组件

3.2 安装hive

mac 环境下brew install hive 即可安装最新版 (需要先装mysql或者一个能连的上mysql也行)

3.3 配置

cd /usr/local/Cellar/hive/3.1.1/libexec/conf
cp hive-default.xml.template hive-site.xml

编辑hive-site.xml及后续参考https://www.jianshu.com/p/5c11073d19d3
安装后,建表,插数据。

image.png

3.4.metastore server开启

注意!!! 一定要确保hive开启了 metastore server
lsof -i:9083 查询是否开启了。
开启方式有两种

1

/usr/local/Cellar/hive/3.1.1/libexec/hcatalog/sbin/hcat_server.sh start
提示

Started metastore server init, testing if initialized correctly...
Metastore initialized successfully on port[9083].

就说明成功了。

2 hive --service metastore (这个没试过)

lsof -i:9083


image.png

3.5 为什么要开启metastore 呢?

blink catalog架构图


image.png

红框内就是连接的hive metastore,所以需要先开启 hive 的metastore server。

4. blink源码修改

1. sql-client Environment 类

 修改位置:
org.apache.flink.table.client.config.Environment
enrich 方法
enrichedEnv.deployment = DeploymentEntry.enrich(env.deployment, properties);
下方加入
enrichedEnv.catalogs = new HashMap<>(env.catalogs);

这块没有将catalogs复制过去,会导致从环境中读取到的catalogs丢失,用户永远没发定义catalog。

2.hive connector

官方支持的hive版本是2.4,我的是3.1.1,会报错

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.BatchTableSourceFactory' in
the classpath.

Reason: 
The matching factory 'org.apache.flink.streaming.connectors.hive.HiveTableFactory' doesn't support 'bucketing_version'.

所以要在 flink-connector-hive模块

在类org.apache.flink.table.catalog.hive.config.HiveTableConfig
加入:
public static final String DEFAULT_TABLE_BUCKETING_VERSION = "bucketing_version";
public static final String DEFAULT_TABLE_COLUMN_STATS_ACCURATE = "column_stats_accurate";
在类org.apache.flink.streaming.connectors.hive.HiveTableFactory 
supportedProperties 方法加入:
properties.add(HiveTableConfig.DEFAULT_TABLE_BUCKETING_VERSION);
properties.add(HiveTableConfig.DEFAULT_TABLE_COLUMN_STATS_ACCURATE);


修改pom


image.png
image.png

否则会报找不到HadoopInputFormatCommonBase 这个类。

3 jar包替换。

代码修改后
在sql-client模块mvn clean install -Dmaven.test.skip -Dcheckstyle.skip -Drat.ignoreErrors=true
将flink-sql-client-1.5.1.jar包拷到 /apache-flink/build-target/opt/sql-client
在flink-connector-hive模块 mvn clean install -Dmaven.test.skip -Dcheckstyle.skip -Drat.ignoreErrors=true
将flink-connector-hive_2.11-1.5.1.jar拷到/apache-flink/build-target/opt/connectors

5.应用

5.1配置

进入/apache-flink/build-target/bin
cp ../conf/sql-client-default.ymal sql-client-hive.ymal
修改sql-client-hive.ymal
execution配置:
(streaming模式不支持,应该可以通过修改flink-connector-hive模块代码支持。)


image.png

catalogs配置:

image.png

5.2.执行./sql-client.sh embedded -e sql-client-hive.yaml

image.png image.png

至此就打通blink的 sql-client与 hive源数据了。

总结:在hive环境的配置比较耗时间,blink源码catalog的bug得debug才能发现。过程比较繁琐,但是结果是很好的,通过打通catalog,flink就可以像spark一样读取并处理hive数据了,对于小公司或者数据量不大的情况下,只选用flink技术栈就可以做到批流处理。

相关文章

网友评论

    本文标题:blink hive catalog吐血整理

    本文链接:https://www.haomeiwen.com/subject/ohfgpqtx.html