Hudi Hive sync 使用

作者: AlienPaul | 来源:发表于2022-07-21 09:13 被阅读0次

背景

Spark/Flink可以使用Hive的metastore,但是Hive无法通过Hive metastore中的Spark/Flink表直接查询数据。为了解决这个问题,可以配置使用Hive sync。在Spark/Flink操作表的时候,自动同步Hive的元数据。这样就可以通过Hive查询Hudi表的内容。

Hive metastore通过目录结构的来维护元数据,数据的更新是通过覆盖来保证事务。但是数据湖是通过追踪文件来管理元数据,一个目录中可以包含多个版本的文件。这一点和Hive元数据管理是不同的。所以说为了兼容Hive metastore,Hudi需要实时从Timeline同步元数据到Hive metastore。

环境信息

  • Flink 1.13.2
  • Hudi 0.11.1
  • Spark 3.1.1
  • Hive 3.1.2

Hive 兼容Hudi格式

复制编译后的packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.11.1.jar到各节点Hive安装目录的auxlib目录中。

进入beeline后执行:

set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
set hive.stats.autogather=false;

Flink Hudi Hive Sync

如果要使用Hive Sync功能,编译时候需要激活flink-bundle-shade-hive3profile。编译命令如下所示:

mvn clean package -Dflink1.13 -Dscala2.11 -DskipTests -Pflink-bundle-shade-hive3 -T 4

Flink Hive Sync支持两种模式连接Hive:

  • Hive Metastore(hms): 连接Hive Metastore 9083端口。
  • JDBC: 连接HiveServer 10000端口。

两种使用方式如下所示:

-- hms 模式模板
CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '${db_path}/t1',
  'table.type' = 'COPY_ON_WRITE',  -- 如果使用MERGE_ON_READ,只有生成parquet文件之后,Hive才能查出数据
  'hive_sync.enable' = 'true',     -- 必须。启用Hive sync
  'hive_sync.mode' = 'hms',        -- 必须。设置模式未hms,默认为jdbc
  'hive_sync.metastore.uris' = 'thrift://${ip}:9083' -- 必须。端口需要在 hive-site.xml上配置
);


-- jdbc 模式模板
CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = '${db_path}/t1',
  'table.type' = 'COPY_ON_WRITE',  -- 如果使用MERGE_ON_READ,只有生成parquet文件之后,Hive才能查出数据
  'hive_sync.enable' = 'true',     -- 必须。启用Hive sync
  'hive_sync.mode' = 'jdbc',       -- 必须。设置模式未hms,默认为jdbc
  'hive_sync.metastore.uris' = 'thrift://${ip}:9083', -- 必须。端口需要在hive-site.xml上配置
  'hive_sync.jdbc_url'='jdbc:hive2://${ip}:10000',    -- 必须。hiveServer端口
  'hive_sync.table'='${table_name}',                  -- 必须。同步过去的hive表名
  'hive_sync.db'='${db_name}',                        -- 必须。同步过去的hive表所在数据库名
  'hive_sync.username'='${user_name}',                -- 必须。JDBC 用户名
  'hive_sync.password'='${password}'                  -- 必须。JDBC 密码
);

例如使用HMS方式配置Hive Sync:

CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///zy/hudi/',
  'table.type' = 'COPY_ON_WRITE',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://manager127:9083',
  'hive_sync.table'='t1', 
  'hive_sync.db'='default'
);

-- 插入测试数据

INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

然后我们进入beeline,执行:

show tables;

我们可以看到同步过来的t1表。

然后执行:

select * from t1;

可以从Hive中查出Hudi表数据。

FAQ

如果执行Flink的时候遇到如下错误:

java.lang.NoSuchMethodError: org.apache.parquet.schema.Types$PrimitiveBuilder.as(Lorg/apache/parquet/schema/LogicalTypeAnnotation;)Lorg/apache/parquet/schema/Types$Builder

需要修改packaging/hudi-flink-bundle/pom.xml,在relocations标签中加入:

<relocation>
  <pattern>org.apache.parquet</pattern>
  <shadedPattern>${flink.bundle.shade.prefix}org.apache.parquet</shadedPattern>
</relocation>

然后重新编译。

参考链接:
https://github.com/apache/hudi/issues/3042

Spark Hudi Hive Sync

Spark Hive Sync目前只支持DataFrame API。下面使用官网的例子插入数据到hudi_cow表:

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row


val tableName = "hudi_cow"
val basePath = "/user/hive/warehouse/hudi_cow"

val schema = StructType(Array(
StructField("rowId", StringType,true),
StructField("partitionId", StringType,true),
StructField("preComb", LongType,true),
StructField("name", StringType,true),
StructField("versionId", StringType,true),
StructField("toBeDeletedStr", StringType,true),
StructField("intToLong", IntegerType,true),
StructField("longToInt", LongType,true)
))

val data0 = Seq(Row("row_1", "2021/01/01",0L,"bob","v_0","toBeDel0",0,1000000L), 
               Row("row_2", "2021/01/01",0L,"john","v_0","toBeDel0",0,1000000L), 
               Row("row_3", "2021/01/02",0L,"tom","v_0","toBeDel0",0,1000000L))

var dfFromData0 = spark.createDataFrame(data0,schema)

dfFromData0.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "preComb").
  option(RECORDKEY_FIELD_OPT_KEY, "rowId").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionId").
  option(TABLE_NAME, tableName).
  option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL).
  option(OPERATION_OPT_KEY, "upsert").
  option("hoodie.index.type","SIMPLE").
  option("hoodie.datasource.write.hive_style_partitioning","true").
  option("hoodie.datasource.hive_sync.jdbcurl","jdbc:hive2://manager127:10000/").
  option("hoodie.datasource.hive_sync.database","default").
  option("hoodie.datasource.hive_sync.table","hudi_cow").
  option("hoodie.datasource.hive_sync.partition_fields","partitionId").
  option("hoodie.datasource.hive_sync.enable","true").
  option("hoodie.datasource.hive_sync.username","hdfs").
  mode(Overwrite).
  save(basePath)

Spark Hudi Hive Sync配置项含义如下:

  • hoodie.datasource.hive_sync.jdbcurl: Hive metastore连接JDBC URL。
  • hoodie.datasource.hive_sync.database: Hive database名称。
  • hoodie.datasource.hive_sync.table: Hive table名称。
  • hoodie.datasource.hive_sync.partition_fields: Hive分区字段名。
  • hoodie.datasource.hive_sync.enable: 是否启动Hive sync。
  • hoodie.datasource.hive_sync.username:访问Hive时使用的用户名。
  • hoodie.datasource.hive_sync.password:访问Hive使用的用户对应的密码。

和Flink一样,执行成功后可以使用Hive通过beeline查询Hudi表数据。

参考链接

https://hudi.apache.org/docs/syncing_metastore

相关文章

网友评论

    本文标题:Hudi Hive sync 使用

    本文链接:https://www.haomeiwen.com/subject/pokuirtx.html