美文网首页
构建 Streaming Lakehouse:使用 Paimon

构建 Streaming Lakehouse:使用 Paimon

作者: Flink中文社区 | 来源:发表于2024-02-01 13:23 被阅读0次

    一、背景信息

    数据湖与传统的数据仓库相比,可以更灵活地处理各种类型的数据,并支持高度可扩展的存储,通常被用于大数据分析。为了支持准实时乃至实时的数据处理,数据湖需要能够快速地接收和存储数据(数据入湖),同时提供低延迟的查询性能以满足分析需求。

    Apache Paimon 和 Apache Hudi 作为数据湖存储格式,有着高吞吐的写入和低延迟的查询性能,是构建数据湖的常用组件。本文将在阿里云EMR上,针对数据实时入湖场景,对 Paimon 和 Hudi 的性能进行比对,然后分别以 Paimon 和 Hudi 作为统一存储搭建准实时数仓。

    二、集群环境

    本文使用的集群环境是最新的阿里云 EMR 5.16.0,集群节点的属性如下:

    • master: 1 * ecs.g7.2xlarge 8 vCPU 32 GiB

    • core: 4 * ecs.g7.6xlarge 24 vCPU 96 GiB

    使用的组件及版本如下:

    • Paimon: 0.7-SNAPSHOT(Paimon社区0.6 release版本)

    • Hudi: 0.14.0

    • Flink: 1.15

    • Spark: 3.3.1

    • OSS-HDFS: 1.0.0

    本文主要由两部分组成,分别是 Paimon 和 Hudi 数据实时入湖性能测试(Flink),以及 Paimon 和 Hudi 准实时数仓全链路搭建(Flink + Spark),测试数据均存储在 EMR 的 OSS-HDFS 中。

    三、数据实时入湖

    数据实时入湖是数据湖格式的一个重要应用场景,也是构建实时湖仓的第一步。本节测试参考的是 paimon-cluster-benchmark。按照实际的业务情况,划分了两个具体场景:upsert 场景(数据存在更新与订正)和纯 append 场景,在两个场景上分别检验 Paimon 和 Hudi 的读写能力。

    本节测试将使用 Flink 流式入湖,部署模式是 Flink Standalone 模式,Flink 配置如下,由于 TM 运行内存对测试结果影响较大,分别统计 8g/16g/20g 下的测试结果。并且由于本测试不需要用到 TM 的 managed 内存,将其设为 1m。

    parallelism.default: 16
    jobmanager.memory.process.size: 4g
    taskmanager.numberOfTaskSlots: 1
    taskmanager.memory.process.size: 8g/16g/20g
    execution.checkpointing.interval: 2min
    execution.checkpointing.max-concurrent-checkpoints: 3
    taskmanager.memory.managed.size: 1m
    state.backend: rocksdb
    state.backend.incremental: true
    table.exec.sink.upsert-materialize: NONE
    

    3.1 upsert 场景

    数据湖 upsert 用于更新或插入新数据。在进行 upsert 时,会检查待写的数据是否已存在于数据湖中。如果数据已存在,则更新该数据;如果数据不存在,则插入新数据。upsert 通常是基于某种唯一标识符或主键来判断数据是否已存在。

    本节测试数据源由 Flink datagen 产生,随机生成主键范围为 0~100,000,000 的数据,然后使用 Flink 将数据分别流式写入 Paimon 和 Hudi 表中,并统计写入 5 亿条数据(经统计,此时单个 bucket 内的 parquet 文件总大小在 2GB 内)的总耗时。同时,我们还使用 Flink 以批读的方式读取写入的 Paimon 和 Hudi 表,并统计总耗时。

    对于 upsert 场景,Paimon 选择 primary-key 表,Hudi 选择 merge-on-read 表,由于它们都支持 compaction,所以测试将进一步划分为关闭和开启 compaction。

    1. 关闭 compaction

    Paimon 表的配置如下,bucket 个数与 Flink 的并行度一致,设置为 16。由于 Hudi 默认文件格式为 parquet 格式,为了与 Hudi 保持一致,后续均采用 parquet 作为文件输出格式,压缩方式统一设为 snappy。

    'bucket' = '16',
    'file.format' = 'parquet',
    'file.compression' = 'snappy',
    'write-only' = 'true'
    

    Hudi 表的配置如下,采用 BUCKETindex,桶个数为 16,与 Flink 并行度一致。由于 Hudi MOR 表的读取会受到参数 compaction.max_memory 的影响,将其配置为 taskmanager.memory.process.size 的一半。

    'table.type' = 'MERGE_ON_READ',
    'metadata.enabled' = 'false',
    'index.type' = 'BUCKET',
    'hoodie.bucket.index.num.buckets' = '16',
    'write.operation' = 'upsert',
    'write.tasks' = '16',
    'hoodie.parquet.compression.codec' = 'snappy',
    'read.tasks' = '16',
    'compaction.schedule.enabled' = 'false',
    'compaction.async.enabled' = 'false',
    'compaction.max_memory' = '4096/8192/10240' -- TM process memory的一半
    

    测试结果如下:

    可以发现在 upsert 场景,关闭 compaction 时,Paimon 读写性能均优于 Hudi,且 Hudi 对 TM 的内存要求更高。

    1. 开启 compaction

    Paimon 配置:

    'bucket' = '16',
    'file.format' = 'parquet',
    'file.compression' = 'snappy',
    'num-sorted-run.compaction-trigger' = '5' -- 默认配置
    

    Hudi 配置:

    由于测试所需的总耗时不多(checkpoint 个数也相应较少),并且随着未 compaction 的 log 文件增加,Hudi 需要的 compaction 内存将变得更大,因此配置 compaction.delta_commits 为 2 来保证在写入期间有 compaction 执行完成。

    'table.type' = 'MERGE_ON_READ',
    'metadata.enabled' = 'false',
    'index.type' = 'BUCKET',
    'hoodie.bucket.index.num.buckets' = '16',
    'write.operation' = 'upsert',
    'write.tasks' = '16',
    'hoodie.parquet.compression.codec' = 'snappy',
    'read.tasks' = '16',
    'compaction.schedule.enabled' = 'true',
    'compaction.async.enabled' = 'true',
    'compaction.tasks' = '16',
    'compaction.delta_commits' = '2'
    'compaction.max_memory' = '4096/8192/10240' -- TM process memory的一半
    

    测试结果如下:

    在 upsert 场景,开启 compaction 时,Paimon 读写性能均优于 Hudi。对比前面的关闭 compaction 测试,Paimon 和 Hudi 的写性能均有所下降,但读性能得到提升。

    Hudi 的 compaction 比较消耗内存,运行时间较长,并且它是异步执行,当写入任务完成时,未完成的 compaction 是不会继续执行的。观察发现,当 TM 内存给到 20G 时,Hudi 仍有 4 个 delta commits 未被 compaction(即使配置了 compaction.delta_commits=2)。并且, Paimon 的 compaction 默认也不是 full compaction。因此,我们还做了以下补充测试,手动对 Paimon 和 Hudi 做一次 full compaction,然后对比读取数据的时间,结果如下:

    3.2 append 场景

    数据入湖的另一种场景是数据 append 写,比如日志入湖。

    本节测试数据源同样由 Flink datagen 产生,然后使用 Flink 写入 Paimon 和 Hudi 表中,同样统计使用 Flink 写入 5 亿条数据(在 append 场景 Paimon 和 Hudi 均不需要 bucket)的总耗时;以及使用 Flink 批读已写入的 Paimon 和 Hudi 表的总耗时。

    Paimon 表的配置:

    'bucket' = '-1',
    'file.format' = 'parquet',
    'file.compression' = 'snappy'
    

    Hudi 表的配置:

    由于单个批次数据量足够大,不存在小文件问题,因此关闭 clustering:

    'table.type' = 'COPY_ON_WRITE',
    'metadata.enabled' = 'false',
    'write.operation' = 'insert',
    'write.tasks' = '16',
    'hoodie.parquet.compression.codec' = 'snappy',
    'read.tasks' = '16',
    'write.insert.cluster' = 'false',
    'clustering.schedule.enabled' = 'false',
    'clustering.async.enabled' = 'false'
    

    测试结果如下:

    在 append 场景,Paimon 读写性能优于 Hudi,且二者都对 TM 内存要求均不高。

    四、准实时数仓

    在数据入湖之后,基于数据湖格式+流式引擎的强大能力,可以进一步构建一体化实时数仓。本节将分别以 Paimon 和 Hudi 为统一存储,在经典的电商场景下搭建一套准实时数仓,数仓具体有以下几层:

    1. ODS 层:通过 Flink 的 datagen connector 产生 orders(订单表,包含原始订单信息),再通过 Flink 实时写入,作为 ODS 层。

    2. DWM 层:通过 Spark streaming 实时消费 ODS 层,产出 DWM 层 dwm_shop_users(用户-商户聚合中间表,包含中间聚合指标)。

    3. DWS 层:通过 Spark streaming 实时消费 DWM 层的 changelog 数据,构建 DWS 层 dws_users(用户聚合指标表)以及 dws_shops(商户聚合指标表)。

    4.1 datagen -> ODS

    该层使用 Flink 实时入湖,为了更贴近生产环境,Flink 以 Yarn Session 模式启动,同时由于数据链路的增加,为了合理分配资源,对内存和并行度做出以下调整:

    yarn-session.sh -Dparallelism.default=8 \
                    -Djobmanager.memory.process.size=2g \
                    -Dtaskmanager.numberOfTaskSlots=2 \
                    -Dtaskmanager.memory.process.size=8g \
                    -Dtaskmanager.memory.managed.size=1m \
                    -Dexecution.checkpointing.interval=2min \
                    -Dexecution.checkpointing.max-concurrent-checkpoints=3 \
                    -Dstate.backend=rocksdb \
                    -Dstate.backend.incremental=true \
                    -Dtable.exec.sink.upsert-materialize=NONE \
                    --detached
    

    datagen 建表语句如下,rows-per-second 调整为10000

    CREATE TEMPORARY TABLE datagen_orders
    (
      order_name         STRING
      ,order_user_id     BIGINT
      ,order_shop_id     BIGINT
      ,order_product_id  BIGINT
      ,order_fee         DECIMAL(20, 2)
      ,order_state       INT
    )
    WITH (
      'connector' = 'datagen'
      ,'rows-per-second' = '10000'
      ,'fields.order_user_id.kind' = 'random'
      ,'fields.order_user_id.min' = '1'
      ,'fields.order_user_id.max' = '10000'
      ,'fields.order_shop_id.kind' = 'random'
      ,'fields.order_shop_id.min' = '1'
      ,'fields.order_shop_id.max' = '10000'
      ,'fields.order_product_id.kind' = 'random'
      ,'fields.order_product_id.min' = '1'
      ,'fields.order_product_id.max' = '1000'
      ,'fields.order_fee.kind' = 'random'
      ,'fields.order_fee.min' = '0.1'
      ,'fields.order_fee.max' = '10.0'
      ,'fields.order_state.kind' = 'random'
      ,'fields.order_state.min' = '1'
      ,'fields.order_state.max' = '5'
    );
    

    Paimon 建表和写入语句如下:

    CREATE TABLE IF NOT EXISTS paimon_catalog.order_dw.ods_orders
    (
      order_id           STRING
      ,order_name        STRING
      ,order_user_id     BIGINT
      ,order_shop_id     BIGINT
      ,order_product_id  BIGINT
      ,order_fee         DECIMAL(20, 2)
      ,order_create_time TIMESTAMP(3)
      ,order_update_time TIMESTAMP(3)
      ,order_state       INT
    )
    WITH (
      'bucket' = '-1',
      'file.format' = 'parquet',
      'file.compression' = 'snappy'
    );
    
    INSERT INTO paimon_catalog.order_dw.ods_orders
    SELECT
      UUID() AS order_id
      ,order_name
      ,order_user_id
      ,order_shop_id
      ,order_product_id
      ,order_fee
      ,NOW() AS order_create_time
      ,NOW() AS order_update_time
      ,order_state
    FROM datagen_orders;
    

    Hudi 建表和写入语句如下:

    create TEMPORARY table ods_orders
    (
      order_id           STRING
      ,order_name        STRING
      ,order_user_id     BIGINT
      ,order_shop_id     BIGINT
      ,order_product_id  BIGINT
      ,order_fee         DECIMAL(20, 2)
      ,order_create_time TIMESTAMP(3)
      ,order_update_time TIMESTAMP(3)
      ,order_state       INT
    )
    WITH (
        'connector' = 'hudi'
        ,'path' = '/xxx/hudi/order_dw.db/ods_orders'
        ,'precombine.field' = 'order_update_time'
        ,'table.type' = 'COPY_ON_WRITE'
        ,'hoodie.database.name' = 'order_dw'
        ,'hoodie.table.name' = 'ods_orders'
        ,'hoodie.datasource.write.recordkey.field' = 'order_id'
        ,'metadata.enabled' = 'false'
        ,'write.operation' = 'insert'
        ,'write.tasks' = '8'
        ,'hoodie.parquet.compression.codec' = 'snappy'
        ,'write.insert.cluster' = 'false'
        ,'clustering.schedule.enabled' = 'false'
        ,'clustering.async.enabled' = 'false'
    )
    ;
    
    INSERT INTO ods_orders
    SELECT
      UUID() AS order_id
      ,order_name
      ,order_user_id
      ,order_shop_id
      ,order_product_id
      ,order_fee
      ,NOW() AS order_create_time
      ,NOW() AS order_update_time
      ,order_state
    FROM datagen_orders;
    

    4.2 ODS -> DWM

    对于 Paimon 表,依靠其本身的聚合引擎能力,通过简单的配置(merge-engine)即可方便地聚合消费 pv 和总金额,从而构建用户-商户聚合中间表。同时由于下游需要读取 changelog,配置 changelog-producer 为 lookup。

    CREATE TABLE paimon_catalog.order_dw.dwm_shop_users
    (
      shop_id  BIGINT
      ,user_id BIGINT
      ,ds      STRING COMMENT '小时'
      ,pv      BIGINT COMMENT '该小时内,该用户在该商户的消费次数'
      ,fee_sum DECIMAL(20, 2) COMMENT '该小时内,该用户在该商户的消费总金额'
    )
    tblproperties (
      'primary-key' = 'shop_id, user_id, ds'
      ,'bucket' = '8'
      ,'changelog-producer' = 'lookup'
      ,'file.format' = 'parquet'
      ,'file.compression' = 'snappy'
      ,'merge-engine' = 'aggregation'
      ,'fields.pv.aggregate-function' = 'sum'
      ,'fields.fee_sum.aggregate-function' = 'sum'
      ,'metadata.stats-mode' = 'none'
    );
    

    Paimon Spark Streaming 作业示例代码如下:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{date_format, lit}
    
    object PaimonOds2DwmJob {
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder().getOrCreate()
        val sourceLocation = "/xxx/paimon/order_dw.db/ods_orders"
        val targetLocation = "/xxx/paimon/order_dw.db/dwm_shop_users"
        val checkpointDir = "/xxx/paimon/order_dw.db/dwm_shop_users_checkpoint"
        import spark.implicits._
    
        spark.readStream
          .format("paimon")
          .load(sourceLocation)
          .select(
            $"order_shop_id",
            $"order_user_id",
            date_format($"order_create_time", "yyyyMMddHH").alias("ds"),
            lit(1L),
            $"order_fee"
          )
          .writeStream
          .format("paimon")
          .option("checkpointLocation", checkpointDir)
          .start(targetLocation)
        
        spark.streams.awaitAnyTermination()
      }
    }
    

    对于 Hudi 表,想要实现类似的聚合操作,则需要依赖于自定义 Payload 或者 Merger 来实现,本文采用自定义 merger 实现,对 key 相同的记录的 uv、pv、fee_sum 字段进行聚合,核心逻辑如下:

    public class OrdersLakeHouseMerger extends HoodieAvroRecordMerger {
      @Override
      public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
        // ...
        Object oldData = older.getData();
        GenericData.Record oldRecord = (oldData instanceof HoodieRecordPayload)
            ? (GenericData.Record) ((HoodieRecordPayload) older.getData()).getInsertValue(oldSchema).get()
            : (GenericData.Record) oldData;
    
        Object newData = newer.getData();
        GenericData.Record newRecord = (newData instanceof HoodieRecordPayload)
            ? (GenericData.Record) ((HoodieRecordPayload) newer.getData()).getInsertValue(newSchema).get()
            : (GenericData.Record) newData;
    
        // merge uv
        if (HoodieAvroUtils.getFieldVal(newRecord, "uv") != null && HoodieAvroUtils.getFieldVal(oldRecord, "uv") != null) {
          newRecord.put("uv", (Long) oldRecord.get("uv") + (Long) newRecord.get("uv"));
        }
    
        // merge pv
        if (HoodieAvroUtils.getFieldVal(newRecord, "pv") != null && HoodieAvroUtils.getFieldVal(oldRecord, "pv") != null) {
          newRecord.put("pv", (Long) oldRecord.get("pv") + (Long) newRecord.get("pv"));
        }
    
        // merge fee_sum
        if (HoodieAvroUtils.getFieldVal(newRecord, "fee_sum") != null && HoodieAvroUtils.getFieldVal(oldRecord, "fee_sum") != null) {
          BigDecimal l = new BigDecimal(new BigInteger(((GenericData.Fixed) oldRecord.get("fee_sum")).bytes()), 2);
          BigDecimal r = new BigDecimal(new BigInteger(((GenericData.Fixed) newRecord.get("fee_sum")).bytes()), 2);
          byte[] bytes = l.add(r).unscaledValue().toByteArray();
          byte[] paddedBytes = new byte[9];
          System.arraycopy(bytes, 0, paddedBytes, 9 - bytes.length, bytes.length);
          newRecord.put("fee_sum", new GenericData.Fixed(((GenericData.Fixed) newRecord.get("fee_sum")).getSchema(), paddedBytes));
        }
        HoodieAvroIndexedRecord hoodieAvroIndexedRecord = new HoodieAvroIndexedRecord(newRecord);
        return Option.of(Pair.of(hoodieAvroIndexedRecord, newSchema));
      }
    }
    

    Hudi Spark Streaming 作业示例代码如下,由于下游需要读取 changelog,配置 hoodie.table.cdc.enabled 为 true。

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{date_format, lit}
    
    object Ods2DwmJob {
    
      def main(args: Array[String]): Unit = {
        
        val spark = SparkSession.builder().getOrCreate()
        val sourceLocation ="/xxx/hudi/order_dw.db/ods_orders"
        val targetLocation = "/xxx/hudi/order_dw.db/dwm_shop_users"
        val checkpointDir = "/xxx/hudi/order_dw.db/dwm_shop_users_checkpoint"
    
        import spark.implicits._
    
        spark.readStream
          .format("hudi")
          .load(sourceLocation)
          .select(
            $"order_shop_id".alias("shop_id"),
            $"order_user_id".alias("user_id"),
            date_format($"order_create_time", "yyyyMMddHH").alias("ds"),
            lit(1L).alias("pv"),
            $"order_fee".alias("fee_sum")
          )
          .writeStream
          .format("hudi")
          .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
          .option("hoodie.datasource.write.recordkey.field", "shop_id, user_id, ds")
          .option("hoodie.datasource.write.precombine.field", "ds")
          .option("hoodie.database.name", "order_dw")
          .option("hoodie.table.name", "dwm_shop_users")
          .option("hoodie.metadata.enable", "false")
          .option("hoodie.index.type", "BUCKET")
          .option("hoodie.bucket.index.num.buckets", "8")
          .option("hoodie.datasource.write.operation", "upsert")
          .option("hoodie.datasource.write.record.merger.impls", "org.apache.hudi.common.model.merger.OrdersLakeHouseMerger")
          .option("hoodie.parquet.compression.codec", "snappy")
          .option("hoodie.table.cdc.enabled", "true")
          .option("hoodie.table.cdc.supplemental.logging.mode", "data_before_after")
          .option("checkpointLocation", checkpointDir)
          .start(targetLocation)
        
        spark.streams.awaitAnyTermination()
      }
    }
    

    最后将作业分别提交任务到 yarn:

    spark-submit --class Ods2DwmJob \
                 --master yarn \
                 --deploy-mode cluster \
                 --name PaimonOds2DwmJob \
                 --conf spark.driver.memory=2g \
                 --conf spark.driver.cores=2 \
                 --conf spark.executor.instances=4 \
                 --conf spark.executor.memory=16g \
                 --conf spark.executor.cores=2 \
                 --conf spark.yarn.submit.waitAppCompletion=false \
                 ./paimon-spark-streaming-example.jar
    
    spark-submit --class Ods2DwmJob \
                 --master yarn \
                 --deploy-mode cluster \
                 --name HudiOds2DwmJob \
                 --conf spark.driver.memory=2g \
                 --conf spark.driver.cores=2 \
                 --conf spark.executor.instances=4 \
                 --conf spark.executor.memory=16g \
                 --conf spark.executor.cores=2 \
                 --conf spark.yarn.submit.waitAppCompletion=false \
                 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
                 --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
                 --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
                 ./hudi-spark-streaming-example.jar
    
    • 性能对比

    在上述资源下,当作业稳定运行 100 个 batch(3小时左右)后的 Streaming 作业 UI 如下:

    此时,Paimon 单个 batch 写入时间为 40s 左右

    Hudi 单个 batch 写入时间为 65s 左右

    4.3 DWM -> DWS

    Paimon SparkSQL 建表语句如下,仍然配置聚合引擎对指定字段进行聚合:

    CREATE TABLE paimon_catalog.order_dw.dws_users
    (
      user_id  BIGINT
      ,ds      STRING COMMENT '小时'
      ,fee_sum DECIMAL(20, 2) COMMENT '该小时内,该用户的消费总金额'
    )
    tblproperties (
      'primary-key' = 'user_id, ds'
      ,'bucket' = '8'
      ,'merge-engine' = 'aggregation'
      ,'fields.fee_sum.aggregate-function' = 'sum'
    );
    
    CREATE TABLE paimon_catalog.order_dw.dws_shops
    (
      shop_id  BIGINT
      ,ds      STRING COMMENT '小时'
      ,uv      BIGINT COMMENT '该小时内,该商户的消费总人数'
      ,pv      BIGINT COMMENT '该小时内,该商户的消费总次数'
      ,fee_sum DECIMAL(20, 2) COMMENT '该小时内,该商户的消费总金额'
    )
    tblproperties (
      'primary-key' = 'shop_id, ds'
      ,'bucket' = '8'
      ,'merge-engine' = 'aggregation'
      ,'fields.uv.aggregate-function' = 'sum'
      ,'fields.pv.aggregate-function' = 'sum'
      ,'fields.fee_sum.aggregate-function' = 'sum'
    );
    

    Paimon Spark Streaming Dwm2DwsJob 如下,由于需要流读上游 changelog,配置 read.changelog 为 true。

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{lit, when}
    
    object Dwm2DwsJob {
      def main(args: Array[String]): Unit = {
        
        val spark = SparkSession.builder().getOrCreate()
        val sourceLocation = "/xxx/paimon/order_dw.db/dwm_shop_users"
        val targetLocation1 = "/xxx/paimon/order_dw.db/dws_users"
        val checkpointDir1 = "/xxx/paimon/order_dw.db/dws_users_checkpoint"
        val targetLocation2 = "/xxx/paimon/order_dw.db/dws_shops"
        val checkpointDir2 = "/xxx/paimon/order_dw.db/dws_shops_checkpoint"
        
        import spark.implicits._
    
        val df = spark.readStream
          .format("paimon")
          .option("read.changelog", "true")
          .load(sourceLocation)
    
        df.select(
          $"user_id",
          $"ds",
          when($"_row_kind" === "+I" || $"_row_kind" === "+U", $"fee_sum")
            .otherwise($"fee_sum" * -1)
            .alias("fee_sum"))
          .writeStream
          .format("paimon")
          .option("checkpointLocation", checkpointDir1)
          .start(targetLocation1)
    
        df.select(
          $"shop_id",
          $"ds",
          when($"_row_kind" === "+I" || $"_row_kind" === "+U", lit(1L)).otherwise(lit(-1L)).alias("uv"),
          when($"_row_kind" === "+I" || $"_row_kind" === "+U", $"pv").otherwise($"pv" * -1).alias("pv"),
          when($"_row_kind" === "+I" || $"_row_kind" === "+U", $"fee_sum")
            .otherwise($"fee_sum" * -1)
            .alias("fee_sum")
          .writeStream
          .format("paimon")
          .option("checkpointLocation", checkpointDir2)
          .start(targetLocation2)
        
        spark.streams.awaitAnyTermination()
      }
    }
    

    Hudi Spark Streaming Dwm2DwsJob 如下,可复用上一层定义的 Merger。由于 Hudi 也需要流读 changelog,配置 hoodie.datasource.query.type 为 incremental 以及 hoodie.datasource.query.incremental.format 为 cdc。Hudi 的 changelog 格式和 Paimon 不同,数据处理逻辑和 Paimon 略有不同。

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, get_json_object, lit, when}
    import org.apache.spark.sql.types.{DecimalType, LongType}
    
    object Dwm2DwsJob {
    
      def main(args: Array[String]): Unit = {
        
        val spark = SparkSession.builder().getOrCreate()
        val sourceLocation ="/xxx/hudi/order_dw.db/dwm_shop_users"
        val targetLocation1 = "/xxx/hudi/order_dw.db/dws_users"
        val checkpointDir1 = "/xxx/hudi/order_dw.db/dws_users_checkpoint"
        val targetLocation2 = "/xxx/hudi/order_dw.db/dws_shops"
        val checkpointDir2 = "/xxx/hudi/order_dw.db/dws_shops_checkpoint"
    
        import spark.implicits._
    
        val df = spark.readStream
          .format("hudi")
          .option("hoodie.datasource.query.type", "incremental")
          .option("hoodie.datasource.query.incremental.format", "cdc")
          .load(sourceLocation)
    
        df.select(
          get_json_object($"after", "$.user_id").cast(LongType).alias("user_id"),
          get_json_object($"after", "$.ds").alias("ds"),
          when(get_json_object($"before", "$.fee_sum").isNotNull, get_json_object($"after", "$.fee_sum").cast(DecimalType(20, 2)) - get_json_object($"before", "$.fee_sum").cast(DecimalType(20, 2)))
            .otherwise(get_json_object($"after", "$.fee_sum").cast(DecimalType(20, 2)))
            .alias("fee_sum"))
          .writeStream
          .format("hudi")
          .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
          .option("hoodie.datasource.write.recordkey.field", "user_id, ds")
          .option("hoodie.datasource.write.precombine.field", "ds")
          .option("hoodie.database.name", "order_dw")
          .option("hoodie.table.name", "dws_users")
          .option("hoodie.metadata.enable", "false")
          .option("hoodie.index.type", "BUCKET")
          .option("hoodie.bucket.index.num.buckets", "8")
          .option("hoodie.datasource.write.operation", "upsert")
          .option("hoodie.datasource.write.record.merger.impls", "org.apache.hudi.common.model.merger.OrdersLakeHouseMerger")
          .option("hoodie.parquet.compression.codec", "snappy")
          .option("checkpointLocation", checkpointDir1)
          .start(targetLocation1)
    
        df.select(
          get_json_object($"after", "$.shop_id").cast(LongType).alias("shop_id"),
          get_json_object($"after", "$.ds").alias("ds"),
          when(get_json_object($"before", "$.fee_sum").isNotNull, lit(0L)).otherwise(lit(1L)).alias("uv"),
          when(get_json_object($"before", "$.fee_sum").isNotNull, get_json_object($"after", "$.pv").cast(LongType) - get_json_object($"before", "$.pv").cast(LongType))
            .otherwise(get_json_object($"after", "$.pv").cast(LongType))
            .alias("pv"),
          when(get_json_object($"before", "$.fee_sum").isNotNull, get_json_object($"after", "$.fee_sum").cast(DecimalType(20, 2)) - get_json_object($"before", "$.fee_sum").cast(DecimalType(20, 2)))
            .otherwise(get_json_object($"after", "$.fee_sum").cast(DecimalType(20, 2)))
            .alias("fee_sum"))
          .writeStream
          .format("hudi")
          .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
          .option("hoodie.datasource.write.recordkey.field", "shop_id, ds")
          .option("hoodie.datasource.write.precombine.field", "ds")
          .option("hoodie.database.name", "order_dw")
          .option("hoodie.table.name", "dws_shops")
          .option("hoodie.metadata.enable", "false")
          .option("hoodie.index.type", "BUCKET")
          .option("hoodie.bucket.index.num.buckets", "8")
          .option("hoodie.datasource.write.operation", "upsert")
          .option("hoodie.datasource.write.record.merger.impls", "org.apache.hudi.common.model.merger.OrdersLakeHouseMerger")
          .option("hoodie.parquet.compression.codec", "snappy")
          .option("checkpointLocation", checkpointDir2)
          .start(targetLocation2)
        
        spark.streams.awaitAnyTermination()
      }
    }
    

    最后将作业分别提交任务到 yarn:

    spark-submit --class Dwm2DwsJob \
                 --master yarn \
                 --deploy-mode cluster \
                 --name PaimonDwm2DwsJob \
                 --conf spark.driver.memory=2g \
                 --conf spark.driver.cores=2 \
                 --conf spark.executor.instances=4 \
                 --conf spark.executor.memory=8g \
                 --conf spark.executor.cores=2 \
                 --conf spark.yarn.submit.waitAppCompletion=false \
                 ./paimon-spark-streaming-example.jar
    
    spark-submit --class Dwm2DwsJob \
                 --master yarn \
                 --deploy-mode cluster \
                 --name HudiDwm2DwsJob \
                 --conf spark.driver.memory=2g \
                 --conf spark.driver.cores=2 \
                 --conf spark.executor.instances=4 \
                 --conf spark.executor.memory=8g \
                 --conf spark.executor.cores=2 \
                 --conf spark.yarn.submit.waitAppCompletion=false \
                 --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
                 --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
                 --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
                 ./hudi-spark-streaming-example.jar
    
    • 性能对比

    在上述资源下,当作业稳定运行 100 个 batch(3 小时左右)后的 Streaming 作业 UI(以 dws_shops 表为例)如下:

    此时,Paimon 单个 batch 写入时间为 10s 左右

    Hudi 单个 batch 写入时间为 13s 左右

    4.4 SparkSQL 查询

    在该场景下,我们可以查询 DWM 层的 dwm_shop_users 表作为其他业务场景的上游表,也可以查询 DWS 层数据直接用于应用分析或者报表展示,使用如下两个 SQL 查询:

    -- SparkSQL 查询 ods_orders
    select order_id, order_user_id, order_shop_id, order_fee, order_create_time
    from order_dw.ods_orders 
    order by order_create_time desc limit 10;
    
    -- SparkSQL 查询 dws_shops
    select shop_id, ds, uv, pv, fee_sum 
    from order_dw.dws_shops 
    where ds = '2023120100' order by ds, shop_id limit 10;
    

    以上,我们分别以 Paimon 和 Hudi 完成了每小时增加 4 千万条记录(压缩后 10 GB)量级的实时 ETL 链路的搭建,均可以满足分钟级的生产场景的需求。

    五、总结

    1. 在实时入湖场景中,Paimon 具有比 Hudi 更强的读写性能,并且对内存的需求更小。

    2. 在数仓 DWM、DWS 层构建过程中,由于 Paimon 内置了 mergeFunction 功能,可以通过配置参数直接构建聚合指标,而 Hudi 需要通过手动编写自定义 Payload 或者 Merger 来实现。

    3. 在基于 Spark 构建的准实时数仓的各层链路中,Paimon 计算单个 batch 的耗时均比 Hudi 更短。

    相关文章

      网友评论

          本文标题:构建 Streaming Lakehouse:使用 Paimon

          本文链接:https://www.haomeiwen.com/subject/xofzodtx.html