背景
Spark/Flink可以使用Hive的metastore,但是Hive无法通过Hive metastore中的Spark/Flink表直接查询数据。为了解决这个问题,可以配置使用Hive sync。在Spark/Flink操作表的时候,自动同步Hive的元数据。这样就可以通过Hive查询Hudi表的内容。
Hive metastore通过目录结构的来维护元数据,数据的更新是通过覆盖来保证事务。但是数据湖是通过追踪文件来管理元数据,一个目录中可以包含多个版本的文件。这一点和Hive元数据管理是不同的。所以说为了兼容Hive metastore,Hudi需要实时从Timeline同步元数据到Hive metastore。
环境信息
- Flink 1.13.2
- Hudi 0.11.1
- Spark 3.1.1
- Hive 3.1.2
Hive 兼容Hudi格式
复制编译后的packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.11.1.jar
到各节点Hive安装目录的auxlib
目录中。
进入beeline
后执行:
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.stats.autogather=false;
Flink Hudi Hive Sync
如果要使用Hive Sync功能,编译时候需要激活flink-bundle-shade-hive3
profile。编译命令如下所示:
mvn clean package -Dflink1.13 -Dscala2.11 -DskipTests -Pflink-bundle-shade-hive3 -T 4
Flink Hive Sync支持两种模式连接Hive:
- Hive Metastore(hms): 连接Hive Metastore 9083端口。
- JDBC: 连接HiveServer 10000端口。
两种使用方式如下所示:
-- hms 模式模板
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${db_path}/t1',
'table.type' = 'COPY_ON_WRITE', -- 如果使用MERGE_ON_READ,只有生成parquet文件之后,Hive才能查出数据
'hive_sync.enable' = 'true', -- 必须。启用Hive sync
'hive_sync.mode' = 'hms', -- 必须。设置模式未hms,默认为jdbc
'hive_sync.metastore.uris' = 'thrift://${ip}:9083' -- 必须。端口需要在 hive-site.xml上配置
);
-- jdbc 模式模板
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = '${db_path}/t1',
'table.type' = 'COPY_ON_WRITE', -- 如果使用MERGE_ON_READ,只有生成parquet文件之后,Hive才能查出数据
'hive_sync.enable' = 'true', -- 必须。启用Hive sync
'hive_sync.mode' = 'jdbc', -- 必须。设置模式未hms,默认为jdbc
'hive_sync.metastore.uris' = 'thrift://${ip}:9083', -- 必须。端口需要在hive-site.xml上配置
'hive_sync.jdbc_url'='jdbc:hive2://${ip}:10000', -- 必须。hiveServer端口
'hive_sync.table'='${table_name}', -- 必须。同步过去的hive表名
'hive_sync.db'='${db_name}', -- 必须。同步过去的hive表所在数据库名
'hive_sync.username'='${user_name}', -- 必须。JDBC 用户名
'hive_sync.password'='${password}' -- 必须。JDBC 密码
);
例如使用HMS方式配置Hive Sync:
CREATE TABLE t1(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs:///zy/hudi/',
'table.type' = 'COPY_ON_WRITE',
'hive_sync.enable' = 'true',
'hive_sync.mode' = 'hms',
'hive_sync.metastore.uris' = 'thrift://manager127:9083',
'hive_sync.table'='t1',
'hive_sync.db'='default'
);
-- 插入测试数据
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
然后我们进入beeline,执行:
show tables;
我们可以看到同步过来的t1
表。
然后执行:
select * from t1;
可以从Hive中查出Hudi表数据。
FAQ
如果执行Flink的时候遇到如下错误:
java.lang.NoSuchMethodError: org.apache.parquet.schema.Types$PrimitiveBuilder.as(Lorg/apache/parquet/schema/LogicalTypeAnnotation;)Lorg/apache/parquet/schema/Types$Builder
需要修改packaging/hudi-flink-bundle/pom.xml
,在relocations
标签中加入:
<relocation>
<pattern>org.apache.parquet</pattern>
<shadedPattern>${flink.bundle.shade.prefix}org.apache.parquet</shadedPattern>
</relocation>
然后重新编译。
参考链接:
https://github.com/apache/hudi/issues/3042
Spark Hudi Hive Sync
Spark Hive Sync目前只支持DataFrame API。下面使用官网的例子插入数据到hudi_cow
表:
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val tableName = "hudi_cow"
val basePath = "/user/hive/warehouse/hudi_cow"
val schema = StructType(Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("toBeDeletedStr", StringType,true),
StructField("intToLong", IntegerType,true),
StructField("longToInt", LongType,true)
))
val data0 = Seq(Row("row_1", "2021/01/01",0L,"bob","v_0","toBeDel0",0,1000000L),
Row("row_2", "2021/01/01",0L,"john","v_0","toBeDel0",0,1000000L),
Row("row_3", "2021/01/02",0L,"tom","v_0","toBeDel0",0,1000000L))
var dfFromData0 = spark.createDataFrame(data0,schema)
dfFromData0.write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
option(RECORDKEY_FIELD_OPT_KEY, "rowId").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
option(TABLE_NAME, tableName).
option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL).
option(OPERATION_OPT_KEY, "upsert").
option("hoodie.index.type","SIMPLE").
option("hoodie.datasource.write.hive_style_partitioning","true").
option("hoodie.datasource.hive_sync.jdbcurl","jdbc:hive2://manager127:10000/").
option("hoodie.datasource.hive_sync.database","default").
option("hoodie.datasource.hive_sync.table","hudi_cow").
option("hoodie.datasource.hive_sync.partition_fields","partitionId").
option("hoodie.datasource.hive_sync.enable","true").
option("hoodie.datasource.hive_sync.username","hdfs").
mode(Overwrite).
save(basePath)
Spark Hudi Hive Sync配置项含义如下:
- hoodie.datasource.hive_sync.jdbcurl: Hive metastore连接JDBC URL。
- hoodie.datasource.hive_sync.database: Hive database名称。
- hoodie.datasource.hive_sync.table: Hive table名称。
- hoodie.datasource.hive_sync.partition_fields: Hive分区字段名。
- hoodie.datasource.hive_sync.enable: 是否启动Hive sync。
- hoodie.datasource.hive_sync.username:访问Hive时使用的用户名。
- hoodie.datasource.hive_sync.password:访问Hive使用的用户对应的密码。
和Flink一样,执行成功后可以使用Hive通过beeline
查询Hudi表数据。
网友评论