美文网首页
Amoro简介,部署配置和使用

Amoro简介,部署配置和使用

作者: AlienPaul | 来源:发表于2024-06-23 16:37 被阅读0次

    Amoro简介

    Amoro 是一个与数据湖仓一体管理和优化相关的技术平台,致力于提升数据湖和数据仓库的性能与管理效率。Amoro 是构建在 Apache Iceberg、Pimon 等开放数据湖表格式之上的湖仓管理系统。具备自动优化机制,可以持续监控数据表上的文件状况。适用于需要进行大量数据分析和查询的场景,如企业数据仓库、大数据分析项目等。
    本篇基于本人的使用情况,为大家分享Amoro的部署安装,简单使用和注意事项。

    环境信息

    • Amoro:0.6.1
    • Flink:1.17.1
    • Spark:3.3.0
    • Hadoop:3.1.1
    • MySQL:5.7.x

    安装

    下载并解压Amoro到服务器任意目录。
    下载链接:Download (apache.org)
    Flink和Spark访问Amoro的运行时依赖也在此链接下载。

    Amoro需要使用RMDBS作为元数据存储。这里我们以MySQL为例。下载mysql-connector-java到Amoro的lib目录。

    wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
    

    使用MySQL命令行创建amoro数据库,并创建一个用户,赋予amoro数据库的所有权限。(下面为了演示方便,直接使用root账户,生产环境不要这么使用)。

    修改Amoro配置文件(conf/config.yaml),增加MySQL相关配置。例如:

    ams:
      database:
        type: mysql
        jdbc-driver-class: com.mysql.cj.jdbc.Driver
        url: jdbc:mysql://127.0.0.1:3306/amoro?useUnicode=true&characterEncoding=UTF8&autoReconnect=true&useAffectedRows=true&useSSL=false
        username: root
        password: root
    

    最后执行bin/ams.sh start启动Amoro服务。

    此外Amoro还支持HA(高可用)模式。Amoro的高可用为一主多从,需要依赖外部Zookeeper集群进行主节点选举。HA的相关配置如下:

    ams:
      ha:
        enabled: true  # 开启HA模式
        cluster-name: default # 指定集群名字。Amoro允许同一个Zookeeper集群负责多个Amoro集群的选主,这些Amoro集群需要使用cluster-name来区分
        zookeeper-address: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 # Zookeeper服务的地址和端口号
    

    表格式

    Amoro支持的格式和特性记录如下所示。

    • Iceberg。不允许指定主键。
    • Mixed Iceberg。除了iceberg 之外还包含logstore。Logstore为append only,可加速流式数据消费过程。允许指定主键。在minor optimize运行的时候去重。Minor Optimize运行之前可查询到upsert之前的数据,存在重复。Minor Optimize之后历史数据消失。
    • Mixed Hive。使用Hive作为base store,iceberg作为change store。

    使用

    Catalog配置

    创建Optimizer Group

    和Hudi类似,Amoro的iceberg表在写入的时候会产生一系列的碎片文件。会存在大量的冗余和过期的数据。如果不定期优化表的话,表服务性能会严重降低,甚至不可用。
    Optimizer是Amoro专门为表优化设计的服务。可以类比为Hudi的compaction和clean等服务。相比Hudi的优点是Amoro支持这些服务的可视化管理。这些服务的资源隔离,资源消耗,运行状态等可以通过Amoro管理界面查看和操作。
    在Amoro中,一个Optimizer Group代表了一部分资源。Catalog和Optimizer Group存在1对多的关系。通过这种关系可以实现Optimizer资源的共享和隔离。
    在创建Catalog之前需要先创建optimizer group。打开左侧菜单的Optimizing,点击右侧的Optimizer Groups标签。然后点击Add Group按钮。输入名字,选择一个之前配置的container。主要注意的是属性必须配置memory,value的单位是MB。它代表了一个并行度占用的内存大小。
    创建完毕之后点击Operation中的Scale-Out,指定并行度之后点击确定。最后在Optimizers标签中可以看到正在运行的Optimizer。

    创建Catalogs

    打开左侧菜单的Catalogs,点击Catalog List中的+按钮。主要指定的配置项为:

    • Name: Catalog的名字。
    • Type:使用内部Catalog还是外部Catalog。如果使用外部Hive metastore,需要上传hive-site.xml
    • Table Format:Catalog中表的格式,对于内部Catalog目前支持的是Mixed Iceberg和Iceberg。
    • Optimizer Group:运行表优化服务的Optimizer Group(可以理解为资源池),选择上一步创建好的optimizer。
    • Storage的Type:存储类型。目前支持Hadoop和S3。对于Hadoop类型,需要上传Hadoop的core-site.xmlhdfs-site.xml两个文件。
    • Authentication的Type:认证类型。SIMPLE类型需要提供Hadoop的用户名。KERBEROS类型需要提供Principal,Keytab文件和krb5.conf文件。
    • Properties:属性设置。必须的属性配置为warehouse。含义为本Catalog下表数据在分布式存储中的路径。如果使用的是External Catalog,无需配置warehouse,例如配置Hive Metastore,表文件所在的位置为hdfs://ip:port/warehouse/tablespace/managed/hive。相当于Hive managed table。
    • Table Properties:如果该Catalog下所有表具有相同的属性配置,可以将这些配置写在此处。

    Terminal操作Amoro

    为方便用户轻量级使用,Amoro管理页面提供了一个可视化终端。该终端后台使用的是Spark。

    创建数据库:

    create database default;
    

    创建表:

    create table test1 (id int, data string, ts timestamp);
    

    注意
    使用界面Terminal的时候,建表语句不要使用use arctic

    其余操作和使用Spark SQL完全相同。

    Flink操作Amoro

    首先需要引入依赖。添加如下jar包到$FLINK_HOME/lib目录中:

    • amoro-flink-runtime-1.17-0.6.1.jar
    • flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
    • httpclient5-5.3.1.jar
    • httpcore5-5.1.5.jar
    • httpcore5-h2-5.2.4.jar

    需要注意的是,如果缺少httpclient5和httpcore5相关依赖,执行创建catalog语句的时候会出现类似如下错误:

    java.lang.NoClassDefFoundError: org/apache/hc/core5/http/ParseException
    
    java.lang.NoClassDefFoundError: org/apache/hc/client5/http/classic/methods/HttpUriRequest
    

    配置flink-conf.yaml,增加checkpoint配置。只有触发checkpoint的时候,数据才写入到文件。否则写入的数据不可见。

    execution.checkpointing.interval: 10s
    

    接下来进入Flink SQL。执行如下命令创建Catalog:

    create catalog hdfs_iceberg with (
        'type' = 'arctic', 
        'metastore.url' = 'thrift://amoro_ip:1260/hdfs_iceberg', 
        'default-database' = 'default'
    );
    

    命令执行成功。接下来我们可以按照Flink SQL的使用方式创建Amoro表和插入查询表数据。只要操作的表位于上面例子中的hdfs_iceberg catalog 范围内,会自动被Amoro纳管。

    如果配置了Amoro HA,metastore.url需要配置为:zookeeper://{zookeeper-server}/{cluster-name}/{catalog-name}

    注意
    如果Flink连接的catalog使用Hive metastore。需要将hive-exec-xxx.jar复制到$Flink_HOME/lib目录中。这种情况Flink只能使用JDK 8运行,否则会报错。

    创建表:

    CREATE TABLE `hdfs_iceberg`.`default`.`flink_test` (
        id BIGINT,
        name STRING,
        op_time TIMESTAMP
    );
    

    在不创建catalog的情况下直接创建表:

    CREATE TABLE `test_table` (
        id BIGINT,
        name STRING,
        op_time TIMESTAMP
    ) WITH (
        'connector' = 'arctic',
        'metastore.url' = 'thrift://amoro_ip:1260/',
        'arctic.catalog' = 'hdfs_iceberg',
        'arctic.database' = 'default',
        'arctic.table' = 'test_table'
    );
    

    注意
    在Flink sql-client试用的时候,发现如果长时间没有操作,再次操作时会出现异常,无法查询出数据。为了解决这个问题,需要增加Flink配置classloader.check-leaked-classloader: false

    Spark操作Amoro

    和Flink类似,首先需要引入依赖。

    添加如下jar到$SPARK_HOME/jars目录:

    • amoro-spark-3.3-runtime-0.6.1.jar
    • httpclient5-5.3.1.jar
    • httpcore5-5.1.5.jar
    • httpcore5-h2-5.2.4.jar

    使用下面的命令进入spark-sql。命令中需要指定Amoro catalog的thrift连接URL。示例如下:

    ./spark-sql \
        --conf spark.sql.extensions=com.netease.arctic.spark.ArcticSparkExtensions \
        --conf spark.sql.catalog.hdfs_iceberg=com.netease.arctic.spark.ArcticSparkCatalog \
        --conf spark.sql.catalog.hdfs_iceberg.url=thrift://amoro_ip:1260/hdfs_iceberg
    

    其中参数spark.sql.catalog.hdfs_iceberg中的hdfs_iceberg为Spark SQL中指定的catalog名称。格式为spark.sql.catalog.<catalog名称>
    进入spark-sql之后第一次执行show catalogs;,尽管返回结果没有hdfs_iceberg这个catalog,但是我们仍可以通过use hdfs_iceberg切换到这个catalog。

    创建表:

    CREATE TABLE hdfs_iceberg.default.sample (
        id bigint  COMMENT "unique id",
        data string
    ) USING arctic;
    

    查询不带时区timetamp字段类型的时候会出现如下错误:

    java.lang.IllegalArgumentException: Cannot handle timestamp without timezone fields in Spark. Spark does not natively support this type but if you would like to handle all timestamps as timestamp with timezone set 'spark.sql.iceberg.handle-timestamp-without-timezone' to true. This will not change the underlying values stored but will change their displayed values in Spark. For more information please see https://docs.databricks.com/spark/latest/dataframes-datasets/dates-timestamps.html#ansi-sql-and-spark-sql-timestamps
    

    解决方法:

    SET `spark.sql.iceberg.handle-timestamp-without-timezone`=`true`;
    

    Hive操作Amoro

    Amoro mixed-hive格式同时支持使用Hive和Flink/Spark格式查询。
    Hive查询mixed-hive格式表无需增加任何配置,无需引入任何依赖。
    在Amoro创建表之后,Hive不需要事先创建表,会自动从Amoro同步(Hive只有同步过来的数据库和表,没有catalog)。
    需要注意的是,Hive查询到iceberg格式的增量数据是有延迟的。必须等到full optimize运行完毕之后才能查询到全量结果。

    相关文章

      网友评论

          本文标题:Amoro简介,部署配置和使用

          本文链接:https://www.haomeiwen.com/subject/guqrcjtx.html