Amoro简介
Amoro 是一个与数据湖仓一体管理和优化相关的技术平台,致力于提升数据湖和数据仓库的性能与管理效率。Amoro 是构建在 Apache Iceberg、Pimon 等开放数据湖表格式之上的湖仓管理系统。具备自动优化机制,可以持续监控数据表上的文件状况。适用于需要进行大量数据分析和查询的场景,如企业数据仓库、大数据分析项目等。
本篇基于本人的使用情况,为大家分享Amoro的部署安装,简单使用和注意事项。
环境信息
- Amoro:0.6.1
- Flink:1.17.1
- Spark:3.3.0
- Hadoop:3.1.1
- MySQL:5.7.x
安装
下载并解压Amoro到服务器任意目录。
下载链接:Download (apache.org)
Flink和Spark访问Amoro的运行时依赖也在此链接下载。
Amoro需要使用RMDBS作为元数据存储。这里我们以MySQL为例。下载mysql-connector-java
到Amoro的lib
目录。
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
使用MySQL命令行创建amoro
数据库,并创建一个用户,赋予amoro
数据库的所有权限。(下面为了演示方便,直接使用root账户,生产环境不要这么使用)。
修改Amoro配置文件(conf/config.yaml
),增加MySQL相关配置。例如:
ams:
database:
type: mysql
jdbc-driver-class: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/amoro?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&useAffectedRows=true&useSSL=false
username: root
password: root
最后执行bin/ams.sh start
启动Amoro服务。
此外Amoro还支持HA(高可用)模式。Amoro的高可用为一主多从,需要依赖外部Zookeeper集群进行主节点选举。HA的相关配置如下:
ams:
ha:
enabled: true # 开启HA模式
cluster-name: default # 指定集群名字。Amoro允许同一个Zookeeper集群负责多个Amoro集群的选主,这些Amoro集群需要使用cluster-name来区分
zookeeper-address: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 # Zookeeper服务的地址和端口号
表格式
Amoro支持的格式和特性记录如下所示。
- Iceberg。不允许指定主键。
- Mixed Iceberg。除了iceberg 之外还包含logstore。Logstore为append only,可加速流式数据消费过程。允许指定主键。在minor optimize运行的时候去重。Minor Optimize运行之前可查询到upsert之前的数据,存在重复。Minor Optimize之后历史数据消失。
- Mixed Hive。使用Hive作为base store,iceberg作为change store。
使用
Catalog配置
创建Optimizer Group
和Hudi类似,Amoro的iceberg表在写入的时候会产生一系列的碎片文件。会存在大量的冗余和过期的数据。如果不定期优化表的话,表服务性能会严重降低,甚至不可用。
Optimizer是Amoro专门为表优化设计的服务。可以类比为Hudi的compaction和clean等服务。相比Hudi的优点是Amoro支持这些服务的可视化管理。这些服务的资源隔离,资源消耗,运行状态等可以通过Amoro管理界面查看和操作。
在Amoro中,一个Optimizer Group代表了一部分资源。Catalog和Optimizer Group存在1对多的关系。通过这种关系可以实现Optimizer资源的共享和隔离。
在创建Catalog之前需要先创建optimizer group。打开左侧菜单的Optimizing,点击右侧的Optimizer Groups标签。然后点击Add Group按钮。输入名字,选择一个之前配置的container。主要注意的是属性必须配置memory
,value的单位是MB。它代表了一个并行度占用的内存大小。
创建完毕之后点击Operation中的Scale-Out,指定并行度之后点击确定。最后在Optimizers标签中可以看到正在运行的Optimizer。
创建Catalogs
打开左侧菜单的Catalogs,点击Catalog List中的+
按钮。主要指定的配置项为:
- Name: Catalog的名字。
- Type:使用内部Catalog还是外部Catalog。如果使用外部Hive metastore,需要上传
hive-site.xml
。 - Table Format:Catalog中表的格式,对于内部Catalog目前支持的是Mixed Iceberg和Iceberg。
- Optimizer Group:运行表优化服务的Optimizer Group(可以理解为资源池),选择上一步创建好的optimizer。
- Storage的Type:存储类型。目前支持Hadoop和S3。对于Hadoop类型,需要上传Hadoop的
core-site.xml
和hdfs-site.xml
两个文件。 - Authentication的Type:认证类型。SIMPLE类型需要提供Hadoop的用户名。KERBEROS类型需要提供Principal,Keytab文件和krb5.conf文件。
- Properties:属性设置。必须的属性配置为
warehouse
。含义为本Catalog下表数据在分布式存储中的路径。如果使用的是External Catalog,无需配置warehouse
,例如配置Hive Metastore,表文件所在的位置为hdfs://ip:port/warehouse/tablespace/managed/hive
。相当于Hive managed table。 - Table Properties:如果该Catalog下所有表具有相同的属性配置,可以将这些配置写在此处。
Terminal操作Amoro
为方便用户轻量级使用,Amoro管理页面提供了一个可视化终端。该终端后台使用的是Spark。
创建数据库:
create database default;
创建表:
create table test1 (id int, data string, ts timestamp);
注意
使用界面Terminal的时候,建表语句不要使用use arctic
。
其余操作和使用Spark SQL完全相同。
Flink操作Amoro
首先需要引入依赖。添加如下jar包到$FLINK_HOME/lib
目录中:
- amoro-flink-runtime-1.17-0.6.1.jar
- flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
- httpclient5-5.3.1.jar
- httpcore5-5.1.5.jar
- httpcore5-h2-5.2.4.jar
需要注意的是,如果缺少httpclient5和httpcore5相关依赖,执行创建catalog语句的时候会出现类似如下错误:
java.lang.NoClassDefFoundError: org/apache/hc/core5/http/ParseException
java.lang.NoClassDefFoundError: org/apache/hc/client5/http/classic/methods/HttpUriRequest
配置flink-conf.yaml
,增加checkpoint配置。只有触发checkpoint的时候,数据才写入到文件。否则写入的数据不可见。
execution.checkpointing.interval: 10s
接下来进入Flink SQL。执行如下命令创建Catalog:
create catalog hdfs_iceberg with (
'type' = 'arctic',
'metastore.url' = 'thrift://amoro_ip:1260/hdfs_iceberg',
'default-database' = 'default'
);
命令执行成功。接下来我们可以按照Flink SQL的使用方式创建Amoro表和插入查询表数据。只要操作的表位于上面例子中的hdfs_iceberg
catalog 范围内,会自动被Amoro纳管。
如果配置了Amoro HA,metastore.url
需要配置为:zookeeper://{zookeeper-server}/{cluster-name}/{catalog-name}
。
注意
如果Flink连接的catalog使用Hive metastore。需要将hive-exec-xxx.jar
复制到$Flink_HOME/lib
目录中。这种情况Flink只能使用JDK 8运行,否则会报错。
创建表:
CREATE TABLE `hdfs_iceberg`.`default`.`flink_test` (
id BIGINT,
name STRING,
op_time TIMESTAMP
);
在不创建catalog的情况下直接创建表:
CREATE TABLE `test_table` (
id BIGINT,
name STRING,
op_time TIMESTAMP
) WITH (
'connector' = 'arctic',
'metastore.url' = 'thrift://amoro_ip:1260/',
'arctic.catalog' = 'hdfs_iceberg',
'arctic.database' = 'default',
'arctic.table' = 'test_table'
);
注意
在Flink sql-client试用的时候,发现如果长时间没有操作,再次操作时会出现异常,无法查询出数据。为了解决这个问题,需要增加Flink配置classloader.check-leaked-classloader: false
。
Spark操作Amoro
和Flink类似,首先需要引入依赖。
添加如下jar到$SPARK_HOME/jars
目录:
- amoro-spark-3.3-runtime-0.6.1.jar
- httpclient5-5.3.1.jar
- httpcore5-5.1.5.jar
- httpcore5-h2-5.2.4.jar
使用下面的命令进入spark-sql。命令中需要指定Amoro catalog的thrift连接URL。示例如下:
./spark-sql \
--conf spark.sql.extensions=com.netease.arctic.spark.ArcticSparkExtensions \
--conf spark.sql.catalog.hdfs_iceberg=com.netease.arctic.spark.ArcticSparkCatalog \
--conf spark.sql.catalog.hdfs_iceberg.url=thrift://amoro_ip:1260/hdfs_iceberg
其中参数spark.sql.catalog.hdfs_iceberg
中的hdfs_iceberg
为Spark SQL中指定的catalog名称。格式为spark.sql.catalog.<catalog名称>
。
进入spark-sql之后第一次执行show catalogs;
,尽管返回结果没有hdfs_iceberg
这个catalog,但是我们仍可以通过use hdfs_iceberg
切换到这个catalog。
创建表:
CREATE TABLE hdfs_iceberg.default.sample (
id bigint COMMENT "unique id",
data string
) USING arctic;
查询不带时区timetamp字段类型的时候会出现如下错误:
java.lang.IllegalArgumentException: Cannot handle timestamp without timezone fields in Spark. Spark does not natively support this type but if you would like to handle all timestamps as timestamp with timezone set 'spark.sql.iceberg.handle-timestamp-without-timezone' to true. This will not change the underlying values stored but will change their displayed values in Spark. For more information please see https://docs.databricks.com/spark/latest/dataframes-datasets/dates-timestamps.html#ansi-sql-and-spark-sql-timestamps
解决方法:
SET `spark.sql.iceberg.handle-timestamp-without-timezone`=`true`;
Hive操作Amoro
Amoro mixed-hive格式同时支持使用Hive和Flink/Spark格式查询。
Hive查询mixed-hive格式表无需增加任何配置,无需引入任何依赖。
在Amoro创建表之后,Hive不需要事先创建表,会自动从Amoro同步(Hive只有同步过来的数据库和表,没有catalog)。
需要注意的是,Hive查询到iceberg格式的增量数据是有延迟的。必须等到full optimize运行完毕之后才能查询到全量结果。
网友评论