Hudi Compaction: Usage and Source Code Analysis

Author: AlienPaul | Published 2023-09-16 10:43

Introduction

Compaction applies to MOR (Merge-On-Read) tables. It merges the records from a file slice's log files, which record incremental data changes, with the base file (a Parquet file), and produces a new base file.

The goals of compaction are to reduce the number of files, clean up obsolete historical data, and improve query performance.

Compaction consists of two steps: scheduling and execution. The scheduling step decides whether a compaction is needed, generates a compaction plan, and writes a compaction instant in the requested state to the timeline. The execution step reads the previously generated compaction plan and runs it; when it finishes, the compaction instant transitions to the completed state.
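
Both steps map onto the write-client API analyzed in the rest of this article (BaseHoodieWriteClient::scheduleCompaction and BaseHoodieWriteClient::compact). Below is a minimal sketch of driving the two steps by hand; it assumes writeClient is an already-configured write client for a MOR table and is illustrative rather than the only way to trigger compaction.

// Minimal sketch: writeClient is assumed to be an already-built BaseHoodieWriteClient subclass
// Step 1: schedule. This generates the compaction plan and writes a "requested" compaction instant to the timeline.
Option<String> compactionInstant = writeClient.scheduleCompaction(Option.empty());

// Step 2: execute. This reads the plan of that instant, merges log files into new base files,
// and transitions the instant to "completed" when done.
if (compactionInstant.isPresent()) {
    writeClient.compact(compactionInstant.get());
}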

Compaction trigger strategies

The trigger strategy is controlled by the config option hoodie.compact.inline.trigger.strategy, which supports the following values:

  • NUM_COMMITS: trigger compaction when N delta commits have accumulated since the last completed compaction.
  • NUM_COMMITS_AFTER_LAST_REQUEST: trigger compaction when N delta commits have accumulated since the last compaction request (the last scheduled compaction); if no compaction has been scheduled, count from the last completed compaction.
  • TIME_ELAPSED: trigger compaction when more than N seconds have elapsed since the last completed compaction.
  • NUM_AND_TIME: trigger compaction only when both the NUM_COMMITS and TIME_ELAPSED conditions are satisfied.
  • NUM_OR_TIME: trigger compaction when either the NUM_COMMITS or the TIME_ELAPSED condition is satisfied.

The delta commit count and elapsed time thresholds are configured by:

  • hoodie.compact.inline.max.delta.commits: a new compaction may be scheduled once N delta commits have been written since the last compaction. Default: 5.
  • hoodie.compact.inline.max.delta.seconds: a new compaction may be scheduled once N seconds have passed since the last compaction. Default: 3600 seconds.
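
These trigger settings are ordinary Hudi write options. A minimal sketch of passing them as properties when building a writer, using only the config keys listed above (the values are illustrative):

import java.util.Properties;

Properties props = new Properties();
// Require both conditions before a compaction is scheduled
props.setProperty("hoodie.compact.inline.trigger.strategy", "NUM_AND_TIME");
// At least 10 delta commits since the last compaction ...
props.setProperty("hoodie.compact.inline.max.delta.commits", "10");
// ... and at least 30 minutes elapsed since the last compaction
props.setProperty("hoodie.compact.inline.max.delta.seconds", "1800");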

Synchronous and asynchronous compaction

Hudi can schedule or execute compaction at the end of each write, which is called synchronous (inline) execution. Alternatively, an external service can schedule or execute compaction, which is called asynchronous execution.

Synchronous scheduling, synchronous execution

Config option                    Value
hoodie.compact.inline            true
hoodie.compact.schedule.inline   false

Note: hoodie.compact.schedule.inline does not need to be enabled here. hoodie.compact.inline already includes the scheduling step, and scheduling and executing the compaction run back to back.

Asynchronous scheduling, asynchronous execution

Config option                    Value
hoodie.compact.inline            false
hoodie.compact.schedule.inline   false

Submit a scheduling (schedule) job through Spark:

sudo -u hadoop spark-submit \
  --jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
  --class 'org.apache.hudi.utilities.HoodieCompactor' \
  /usr/lib/hudi/hudi-utilities-bundle.jar \
  --spark-memory '4g' \
  --mode 'schedule' \
  --base-path "hdfs://..." \
  --table-name "$TABLE_NAME" \
  --hoodie-conf "hoodie.compact.inline.max.delta.commits=3"

Submit an execution (execute) job through Spark:

sudo -u hadoop spark-submit \
  --jars '/usr/lib/hudi/hudi-spark-bundle.jar' \
  --class "org.apache.hudi.utilities.HoodieCompactor" \
  /usr/lib/hudi/hudi-utilities-bundle.jar \
  --spark-memory '4g' \
  --mode 'execute' \
  --base-path "hdfs://..." \
  --table-name "$TABLE_NAME"

Synchronous scheduling, asynchronous execution

Config option                    Value
hoodie.compact.inline            false
hoodie.compact.schedule.inline   true

Asynchronous execution works the same way as shown above.

Compaction can also be scheduled or executed asynchronously with Flink:

./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink-bundle_2.11-0.9.0.jar --path hdfs://xxx:9000/table

For the parameters and usage, refer to the official documentation: Compaction | Apache Hudi.

Configuration options

Inline compaction options:

  • hoodie.compact.inline: whether to schedule and execute compaction after each write. Default: false; enabling it increases write latency.
  • hoodie.compact.schedule.inline: whether to attempt to schedule a compaction after each write. It only attempts scheduling; the compaction trigger conditions still apply. Default: false.
  • hoodie.log.compaction.inline: whether to trigger log compaction after each write. Default: false; enabling it increases write latency. Not currently usable with Flink.

Trigger strategy options:

  • hoodie.compact.inline.trigger.strategy: the compaction trigger strategy.
  • hoodie.compact.inline.max.delta.commits: a new compaction may be scheduled once N delta commits have been written since the last compaction. Default: 5.
  • hoodie.compact.inline.max.delta.seconds: a new compaction may be scheduled once N seconds have passed since the last compaction. Default: 3600 seconds.

Other options:

  • hoodie.memory.compaction.max.size: maximum amount of memory a compaction operation may use.
  • hoodie.compaction.lazy.block.read: whether to enable lazy reading of log blocks. Enabled by default; it reduces memory consumption.
  • hoodie.compaction.reverse.log.read: whether to read log files forward or in reverse. Default: forward.
  • hoodie.memory.dfs.buffer.max.size: buffer size for reading from the file system. Default: 16 * 1024 * 1024.
  • hoodie.memory.spillable.map.path: disk path used to spill the cached log records when compaction runs out of memory. Default: /tmp.
  • hoodie.common.spillable.diskmap.type: on-disk data layout used for spilled records. Default: BITCASK.
  • hoodie.optimized.log.blocks.scan.enable: whether to enable the optimized log block scan. Default: false.
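
These tuning options can be combined with any of the scheduling modes described earlier. Below is a minimal sketch of wiring a few of them into a HoodieWriteConfig via withProperties; the base path is a placeholder and the values are illustrative only.

import java.util.Properties;
import org.apache.hudi.config.HoodieWriteConfig;

Properties props = new Properties();
// Cap the memory a single compaction may use at 1 GB; records beyond this spill to disk
props.setProperty("hoodie.memory.compaction.max.size", String.valueOf(1024L * 1024 * 1024));
// Spill to a dedicated local directory instead of the default /tmp
props.setProperty("hoodie.memory.spillable.map.path", "/data/hudi/spill");
// Keep the default BITCASK layout for spilled records
props.setProperty("hoodie.common.spillable.diskmap.type", "BITCASK");

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("hdfs://.../table")   // table base path (placeholder)
    .withProperties(props)
    .build();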

Schedule code analysis

This chapter analyzes how the compaction plan is generated.

BaseHoodieWriteClient

We start from BaseHoodieWriteClient::scheduleCompaction. The methods involved in the call chain are as follows:

public Option<String> scheduleCompaction(Option<Map<String, String>> extraMetadata) throws HoodieIOException {
    String instantTime = HoodieActiveTimeline.createNewInstantTime();
    return scheduleCompactionAtInstant(instantTime, extraMetadata) ? Option.of(instantTime) : Option.empty();
}

public boolean scheduleCompactionAtInstant(String instantTime, Option<Map<String, String>> extraMetadata) throws HoodieIOException {
    return scheduleTableService(instantTime, extraMetadata, TableServiceType.COMPACT).isPresent();
}

public Option<String> scheduleTableService(String instantTime, Option<Map<String, String>> extraMetadata, TableServiceType tableServiceType) {
    return tableServiceClient.scheduleTableService(instantTime, extraMetadata, tableServiceType);
}

The call chain is not complicated. For Flink, following the calls eventually leads to HoodieFlinkMergeOnReadTable::scheduleCompaction. The code is as follows:

@Override
public Option<HoodieCompactionPlan> scheduleCompaction(
    HoodieEngineContext context,
    String instantTime,
    Option<Map<String, String>> extraMetadata) {
    ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor(
        context, config, this, instantTime, extraMetadata, WriteOperationType.COMPACT);
    return scheduleCompactionExecutor.execute();
}

It uses ScheduleCompactionActionExecutor, an executor dedicated to generating the compaction plan, which we analyze next.

ScheduleCompactionActionExecutor

This executor is responsible for generating the compaction plan. Before generating it, it checks that there are no incomplete writes with timestamps earlier than the compaction instant, and that no pending compaction or completed commit instant has a timestamp at or after it; otherwise the compaction would conflict. After the plan is generated successfully, a compaction instant in the requested state is created on the timeline.

@Override
public Option<HoodieCompactionPlan> execute() {
    // Only MOR tables support compaction
    ValidationUtils.checkArgument(this.table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ,
                                  "Can only compact table of type " + HoodieTableType.MERGE_ON_READ + " and not "
                                  + this.table.getMetaClient().getTableType().name());
    if (!config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
        && !config.getFailedWritesCleanPolicy().isLazy()) {
        // If multi-writer optimistic concurrency control is not in use and the lazy failed-writes clean policy (cleaning only during the clean action) is not enabled
        // TODO(yihua): this validation is removed for Java client used by kafka-connect.  Need to revisit this.
        if (config.getEngineType() == EngineType.SPARK) {
            // If the Spark engine is used,
            // check that the earliest inflight instant time is later than the compaction instant being scheduled
            // if there are inflight writes, their instantTime must not be less than that of compaction instant time
            table.getActiveTimeline().getCommitsTimeline().filterPendingExcludingMajorAndMinorCompaction().firstInstant()
                .ifPresent(earliestInflight -> ValidationUtils.checkArgument(
                    HoodieTimeline.compareTimestamps(earliestInflight.getTimestamp(), HoodieTimeline.GREATER_THAN, instantTime),
                    "Earliest write inflight instant time must be later than compaction time. Earliest :" + earliestInflight
                    + ", Compaction scheduled at " + instantTime));
        }
        // Committed and pending compaction instants should have strictly lower timestamps
        // Pending compaction instants and completed commit instants must be earlier than this schedule time.
        // If compactions were scheduled repeatedly without being executed, the newly scheduled compaction must be later than the previously scheduled ones, otherwise they conflict
        List<HoodieInstant> conflictingInstants = table.getActiveTimeline()
            .getWriteTimeline().filterCompletedAndCompactionInstants().getInstantsAsStream()
            .filter(instant -> HoodieTimeline.compareTimestamps(
                instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime))
            .collect(Collectors.toList());
        ValidationUtils.checkArgument(conflictingInstants.isEmpty(),
                                      "Following instants have timestamps >= compactionInstant (" + instantTime + ") Instants :"
                                      + conflictingInstants);
    }

    // Generate the compaction plan, analyzed below
    HoodieCompactionPlan plan = scheduleCompaction();
    Option<HoodieCompactionPlan> option = Option.empty();
    if (plan != null && nonEmpty(plan.getOperations())) {
        // If compaction is needed, the compaction plan is non-empty
        // Save the extra metadata
        extraMetadata.ifPresent(plan::setExtraMetadata);
        try {
            if (operationType.equals(WriteOperationType.COMPACT)) {
                // For the COMPACT type, log files are compacted together with the base file
                // (the operationType passed in through the call chain above is COMPACT).
                // Write an instant of type compaction in the requested state
                HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED,
                                                                    HoodieTimeline.COMPACTION_ACTION, instantTime);
                table.getActiveTimeline().saveToCompactionRequested(compactionInstant,
                                                                    TimelineMetadataUtils.serializeCompactionPlan(plan));
            } else {
                // For the LOG_COMPACTION type (compacting log files only),
                // likewise write an instant of type LOG_COMPACTION in the requested state
                HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.REQUESTED,
                                                                       HoodieTimeline.LOG_COMPACTION_ACTION, instantTime);
                table.getActiveTimeline().saveToLogCompactionRequested(logCompactionInstant,
                                                                       TimelineMetadataUtils.serializeCompactionPlan(plan));
            }
        } catch (IOException ioe) {
            throw new HoodieIOException("Exception scheduling compaction", ioe);
        }
        option = Option.of(plan);
    }

    return option;
}

The next focus is the scheduleCompaction method, which generates the compaction plan. The code is as follows:

@Nullable
private HoodieCompactionPlan scheduleCompaction() {
    LOG.info("Checking if compaction needs to be run on " + config.getBasePath());
    // judge if we need to compact according to num delta commits and time elapsed
    // Check whether compaction is needed, based on the configured trigger strategy (compact after N delta commits, after a time interval, or an AND/OR combination of the two)
    boolean compactable = needCompact(config.getInlineCompactTriggerStrategy());
    if (compactable) {
        // If compaction is possible, generate the compaction plan
        LOG.info("Generating compaction plan for merge on read table " + config.getBasePath());
        try {
            context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan");
            return planGenerator.generateCompactionPlan();
        } catch (IOException e) {
            throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
        }
    }
    return new HoodieCompactionPlan();
}

Before generating the compaction plan, Hudi must decide whether compaction is required. The decision logic lives in the needCompact method. Hudi defines the following conditions for triggering compaction:

  • a new compaction may be scheduled N delta commits after the last executed (completed) compaction;
  • a new compaction may be scheduled N delta commits after the last requested (scheduled) compaction; if no compaction has been scheduled, this is the same as the condition above;
  • a new compaction may be scheduled N seconds after the last executed compaction;
  • either the delta commit count condition or the elapsed time condition is satisfied;
  • both the delta commit count condition and the elapsed time condition are satisfied.

Now let's look at the code:

private boolean needCompact(CompactionTriggerStrategy compactionTriggerStrategy) {
    boolean compactable;
    // get deltaCommitsSinceLastCompaction and lastCompactionTs
    // The first element of the pair is the number of delta commits since the most recent compaction;
    // the second element is the instant time of the most recent compaction
    Option<Pair<Integer, String>> latestDeltaCommitInfoOption = getLatestDeltaCommitInfo();
    if (!latestDeltaCommitInfoOption.isPresent()) {
        return false;
    }
    Pair<Integer, String> latestDeltaCommitInfo = latestDeltaCommitInfoOption.get();
    // For log-only compaction, return true
    if (WriteOperationType.LOG_COMPACT.equals(operationType)) {
        return true;
    }
    // Number of delta commits after the last compaction before a new compaction may be scheduled
    // Config option: hoodie.compact.inline.max.delta.commits
    int inlineCompactDeltaCommitMax = config.getInlineCompactDeltaCommitMax();
    // Number of seconds after the last compaction before a new compaction may be scheduled
    // Config option: hoodie.compact.inline.max.delta.seconds
    int inlineCompactDeltaSecondsMax = config.getInlineCompactDeltaSecondsMax();
    // Check the trigger strategy
    switch (compactionTriggerStrategy) {
        case NUM_COMMITS:
            // Strategy: schedule a new compaction N delta commits after the last executed compaction.
            // Check whether the delta commit count is >= inlineCompactDeltaCommitMax
            compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft();
            if (compactable) {
                LOG.info(String.format("The delta commits >= %s, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
            }
            break;
        case NUM_COMMITS_AFTER_LAST_REQUEST:
            // This strategy counts N delta commits since the last requested compaction rather than since the last executed compaction
            latestDeltaCommitInfoOption = getLatestDeltaCommitInfoSinceLastCompactionRequest();

            if (!latestDeltaCommitInfoOption.isPresent()) {
                return false;
            }
            latestDeltaCommitInfo = latestDeltaCommitInfoOption.get();
            compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft();
            if (compactable) {
                LOG.info(String.format("The delta commits >= %s since the last compaction request, trigger compaction scheduler.", inlineCompactDeltaCommitMax));
            }
            break;
        case TIME_ELAPSED:
            // Schedule a new compaction N seconds after the last executed compaction:
            // compare the timestamps
            compactable = inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
            if (compactable) {
                LOG.info(String.format("The elapsed time >=%ss, trigger compaction scheduler.", inlineCompactDeltaSecondsMax));
            }
            break;
        case NUM_OR_TIME:
            // OR of the two conditions above
            compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft()
                || inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
            if (compactable) {
                LOG.info(String.format("The delta commits >= %s or elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
                                       inlineCompactDeltaSecondsMax));
            }
            break;
        case NUM_AND_TIME:
            // AND of the two conditions above
            compactable = inlineCompactDeltaCommitMax <= latestDeltaCommitInfo.getLeft()
                && inlineCompactDeltaSecondsMax <= parsedToSeconds(instantTime) - parsedToSeconds(latestDeltaCommitInfo.getRight());
            if (compactable) {
                LOG.info(String.format("The delta commits >= %s and elapsed_time >=%ss, trigger compaction scheduler.", inlineCompactDeltaCommitMax,
                                       inlineCompactDeltaSecondsMax));
            }
            break;
        default:
            throw new HoodieCompactionException("Unsupported compaction trigger strategy: " + config.getInlineCompactTriggerStrategy());
    }
    return compactable;
}

BaseHoodieCompactionPlanGenerator

BaseHoodieCompactionPlanGenerator is responsible for generating the plan. Its generateCompactionPlan method iterates over the file slices in all partition paths, filters out ineligible files, and adds the rest to the compaction plan. A detailed walkthrough follows:

@Nullable
public HoodieCompactionPlan generateCompactionPlan() throws IOException {
    // Accumulator to keep track of total log files for a table
    HoodieAccumulator totalLogFiles = engineContext.newAccumulator();
    // Accumulator to keep track of total log file slices for a table
    HoodieAccumulator totalFileSlices = engineContext.newAccumulator();

    // TODO : check if maxMemory is not greater than JVM or executor memory
    // TODO - rollback any compactions in flight
    // Get the metaClient
    HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
    // Get all partition paths
    List<String> partitionPaths = FSUtils.getAllPartitionPaths(engineContext, writeConfig.getMetadataConfig(), metaClient.getBasePath());

    // filter the partition paths if needed to reduce list status
    // No filtering by default; returns the partition paths as-is
    partitionPaths = filterPartitionPathsByStrategy(writeConfig, partitionPaths);

    // If there are no partitions, there is nothing to compact
    if (partitionPaths.isEmpty()) {
        // In case no partitions could be picked, return no compaction plan
        return null;
    }
    LOG.info("Looking for files to compact in " + partitionPaths + " partitions");
    engineContext.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact: " + writeConfig.getTableName());

    SyncableFileSystemView fileSystemView = (SyncableFileSystemView) this.hoodieTable.getSliceView();
    // Collect the file groups that already have pending compactions; they are excluded later
    Set<HoodieFileGroupId> fgIdsInPendingCompactionAndClustering = fileSystemView.getPendingCompactionOperations()
        .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
        .collect(Collectors.toSet());

    // Exclude files in pending clustering from compaction.
    // Files involved in pending clustering operations must be excluded as well
    fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));

    // If the operation type is log compaction
    if (filterLogCompactionOperations()) {
        // also exclude files involved in pending log compactions
        fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations()
                                                     .map(instantTimeOpPair -> instantTimeOpPair.getValue().getFileGroupId())
                                                     .collect(Collectors.toList()));
    }

    // Get the instant time of the last completed commit/delta commit/rollback
    String lastCompletedInstantTime = hoodieTable.getMetaClient()
        .getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
                                                                            HoodieTimeline.ROLLBACK_ACTION, HoodieTimeline.DELTA_COMMIT_ACTION))
        .filterCompletedInstants().lastInstant().get().getTimestamp();

    // Iterate over all partition paths
    List<HoodieCompactionOperation> operations = engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
    // Get the latest file slices under the partition, ordered by instant time in descending order,
    // excluding file slices that have been replaced (replace commit)
    // and base files currently handled by inflight compactions (possible with async compaction)
                                                                       .getLatestFileSlices(partitionPath)
    // Exclude files contained in fgIdsInPendingCompactionAndClustering
                                                                       .filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering))
                                                                       .map(s -> {
    // Get and sort the log files
                                                                           List<HoodieLogFile> logFiles =
                                                                               s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList());
                                                                           totalLogFiles.add(logFiles.size());
                                                                           totalFileSlices.add(1L);
                                                                           // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
                                                                           // for Map operations and collecting them finally in Avro generated classes for storing
                                                                           // into meta files.
    // Get the base file
                                                                           Option<HoodieBaseFile> dataFile = s.getBaseFile();
    // Build the CompactionOperation
                                                                           return new CompactionOperation(dataFile, partitionPath, logFiles,
                                                                                                          writeConfig.getCompactionStrategy().captureMetrics(writeConfig, s));
                                                                       }), partitionPaths.size()).stream()
        .map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());

    LOG.info("Total of " + operations.size() + " compaction operations are retrieved");
    LOG.info("Total number of latest files slices " + totalFileSlices.value());
    LOG.info("Total number of log files " + totalLogFiles.value());
    LOG.info("Total number of file slices " + totalFileSlices.value());

    if (operations.isEmpty()) {
        LOG.warn("No operations are retrieved for " + metaClient.getBasePath());
        return null;
    }

    // Filter the compactions with the passed in filter. This lets us choose most effective compactions only
    // Generate the compaction plan
    HoodieCompactionPlan compactionPlan = getCompactionPlan(metaClient, operations);
    ValidationUtils.checkArgument(
        compactionPlan.getOperations().stream().noneMatch(
            op -> fgIdsInPendingCompactionAndClustering.contains(new HoodieFileGroupId(op.getPartitionPath(), op.getFileId()))),
        "Bad Compaction Plan. FileId MUST NOT have multiple pending compactions. "
        + "Please fix your strategy implementation. FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
        + ", Selected workload :" + compactionPlan);
    if (compactionPlan.getOperations().isEmpty()) {
        LOG.warn("After filtering, Nothing to compact for " + metaClient.getBasePath());
    }
    return compactionPlan;
}

That completes the generation of the compaction plan. Next we analyze how compaction is executed.

Compaction execution process

Entry call chain

The entry point is BaseHoodieWriteClient::compact. There is not much logic along the way, so we just list the call chain:

public HoodieWriteMetadata<O> compact(String compactionInstantTime) {
    if (shouldDelegateToTableServiceManager(config, ActionType.compaction)) {
        throw new UnsupportedOperationException("Compaction should be delegated to table service manager instead of direct run.");
    }
    return compact(compactionInstantTime, config.shouldAutoCommit());
}

HoodieFlinkWriteClient::compact:

@Override
protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
    // only used for metadata table, the compaction happens in single thread
    return tableServiceClient.compact(compactionInstantTime, shouldComplete);
}

HoodieFlinkTableServiceClient::compact calls commitCompaction after the compaction finishes, transitioning the compaction instant from inflight to completed:

@Override
protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
    // only used for metadata table, the compaction happens in single thread
    HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = getHoodieTable().compact(context, compactionInstantTime);
    commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
    return compactionMetadata;
}

HoodieFlinkMergeOnReadTable::compact:

@Override
public HoodieWriteMetadata<List<WriteStatus>> compact(
    HoodieEngineContext context, String compactionInstantTime) {
    RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(
        context, config, this, compactionInstantTime, new HoodieFlinkMergeOnReadTableCompactor(),
        new HoodieFlinkCopyOnWriteTable(config, context, getMetaClient()), WriteOperationType.COMPACT);
    return convertMetadata(compactionExecutor.execute());
}

We finally reach the compaction executor, RunCompactionActionExecutor, which we now analyze in detail.

RunCompactionActionExecutor

RunCompactionActionExecutor is responsible for executing the compaction plan.

Its execute method is as follows:

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
    // Get the timeline of pending compactions
    HoodieTimeline pendingMajorOrMinorCompactionTimeline = WriteOperationType.COMPACT.equals(operationType)
        ? table.getActiveTimeline().filterPendingCompactionTimeline()
        : table.getActiveTimeline().filterPendingLogCompactionTimeline();
    // Roll back any inflight compactions in it
    compactor.preCompact(table, pendingMajorOrMinorCompactionTimeline, this.operationType, instantTime);

    HoodieWriteMetadata<HoodieData<WriteStatus>> compactionMetadata = new HoodieWriteMetadata<>();
    try {
        // generate compaction plan
        // should support configurable commit metadata
        // Load the compaction plan generated earlier
        HoodieCompactionPlan compactionPlan = operationType.equals(WriteOperationType.COMPACT)
            ? CompactionUtils.getCompactionPlan(table.getMetaClient(), instantTime)
            : CompactionUtils.getLogCompactionPlan(table.getMetaClient(), instantTime);

        // try to load internalSchema to support schema Evolution
        // Load the schema
        HoodieWriteConfig configCopy = config;
        Pair<Option<String>, Option<String>> schemaPair = InternalSchemaCache
            .getInternalSchemaAndAvroSchemaForClusteringAndCompaction(table.getMetaClient(), instantTime);
        if (schemaPair.getLeft().isPresent() && schemaPair.getRight().isPresent()) {
            // should not influence the original config, just copy it
            configCopy = HoodieWriteConfig.newBuilder().withProperties(config.getProps()).build();
            configCopy.setInternalSchemaString(schemaPair.getLeft().get());
            configCopy.setSchema(schemaPair.getRight().get());
        }

        // Run the compaction with HoodieCompactor, analyzed below
        HoodieData<WriteStatus> statuses = compactor.compact(
            context, compactionPlan, table, configCopy, instantTime, compactionHandler);

        // For Flink there is no need to persist the write statuses
        compactor.maybePersist(statuses, config);
        context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata: " + config.getTableName());
        List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList();
        HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
        for (HoodieWriteStat stat : updateStatusMap) {
            metadata.addWriteStat(stat.getPartitionPath(), stat);
        }
        metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
        if (schemaPair.getLeft().isPresent()) {
            metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, schemaPair.getLeft().get());
            metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaPair.getRight().get());
        }
        metadata.setOperationType(operationType);
        compactionMetadata.setWriteStatuses(statuses);
        compactionMetadata.setCommitted(false);
        compactionMetadata.setCommitMetadata(Option.of(metadata));
    } catch (Exception e) {
        throw new HoodieCompactionException("Could not compact " + config.getBasePath(), e);
    }

    return compactionMetadata;
}

The actual compaction work is delegated to HoodieCompactor.

HoodieCompactor

HoodieCompactor's compact method implements the compaction logic for MOR tables. It transitions the compaction instant from requested to inflight and then executes the compaction plan.

public HoodieData<WriteStatus> compact(
    HoodieEngineContext context, HoodieCompactionPlan compactionPlan,
    HoodieTable table, HoodieWriteConfig config, String compactionInstantTime,
    HoodieCompactionHandler compactionHandler) {
    // No compaction plan? Return an empty result
    if (compactionPlan == null || (compactionPlan.getOperations() == null)
        || (compactionPlan.getOperations().isEmpty())) {
        return context.emptyHoodieData();
    }
    CompactionExecutionHelper executionHelper = getCompactionExecutionStrategy(compactionPlan);

    // Transition requested to inflight file.
    // Transition the compaction instant to the inflight state
    executionHelper.transitionRequestedToInflight(table, compactionInstantTime);
    table.getMetaClient().reloadActiveTimeline();

    HoodieTableMetaClient metaClient = table.getMetaClient();
    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);

    // Here we firstly use the table schema as the reader schema to read
    // log file.That is because in the case of MergeInto, the config.getSchema may not
    // the same with the table schema.
    // Get the table schema, used to read the records in the log files
    try {
        if (StringUtils.isNullOrEmpty(config.getInternalSchema())) {
            Schema readerSchema = schemaResolver.getTableAvroSchema(false);
            config.setSchema(readerSchema.toString());
        }
    } catch (Exception e) {
        // If there is no commit in the table, just ignore the exception.
    }

    // Compacting is very similar to applying updates to existing file
    List<CompactionOperation> operations = compactionPlan.getOperations().stream()
        .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
    LOG.info("Compactor compacting " + operations + " files");

    String maxInstantTime = getMaxInstantTime(metaClient);

    context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices: " + config.getTableName());
    TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
    // Iterate over the CompactionOperations and run the compact method on each
    return context.parallelize(operations).map(operation -> compact(
        compactionHandler, metaClient, config, operation, compactionInstantTime, maxInstantTime, taskContextSupplier, executionHelper))
        .flatMap(List::iterator);
}

compact scans the log files that the compaction plan selects for merging and merges their records with the base file. The method is as follows:

/**
   * Execute a single compaction operation and report back status.
   */
public List<WriteStatus> compact(HoodieCompactionHandler compactionHandler,
                                 HoodieTableMetaClient metaClient,
                                 HoodieWriteConfig config,
                                 CompactionOperation operation,
                                 String instantTime,
                                 String maxInstantTime,
                                 TaskContextSupplier taskContextSupplier,
                                 CompactionExecutionHelper executionHelper) throws IOException {
    // Get the file system where the table lives
    FileSystem fs = metaClient.getFs();
    Schema readerSchema;
    Option<InternalSchema> internalSchemaOption = Option.empty();
    // Get the latest table schema (schema evolution)
    if (!StringUtils.isNullOrEmpty(config.getInternalSchema())) {
        readerSchema = new Schema.Parser().parse(config.getSchema());
        internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
        // its safe to modify config here, since we running in task side.
        // Configure the compactionHandler, whose actual type is HoodieTable
        ((HoodieTable) compactionHandler).getConfig().setDefault(config);
    } else {
        // Get the write schema
        readerSchema = HoodieAvroUtils.addMetadataFields(
            new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
    }
    LOG.info("Compaction operation started for base file: " + operation.getDataFileName() + " and delta files: " + operation.getDeltaFileNames()
             + " for commit " + instantTime);
    // TODO - FIX THIS
    // Reads the entire avro file. Always only specific blocks should be read from the avro file
    // (failure recover).
    // Load all the delta commits since the last compaction commit and get all the blocks to be
    // loaded and load it using CompositeAvroLogReader
    // Since a DeltaCommit is not defined yet, reading all the records. revisit this soon.

    // Determine the maximum memory available for compaction:
    // if hoodie.memory.compaction.max.size is configured, use it;
    // otherwise derive it from hoodie.memory.compaction.fraction
    long maxMemoryPerCompaction = IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
    LOG.info("MaxMemoryPerCompaction => " + maxMemoryPerCompaction);

    // Build the full paths of the log files to compact
    List<String> logFiles = operation.getDeltaFileNames().stream().map(
        p -> new Path(FSUtils.getPartitionPath(metaClient.getBasePath(), operation.getPartitionPath()), p).toString())
        .collect(toList());
    // Build the merged log record scanner
    HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
        // Set the file system (e.g. HDFS)
        .withFileSystem(fs)
        // Set the table's base path
        .withBasePath(metaClient.getBasePath())
        // Set the paths of the log files to compact
        .withLogFilePaths(logFiles)
        // Set the reader schema
        .withReaderSchema(readerSchema)
        // Set the latest instant time: the latest completed instant among commits, delta commits and rollbacks.
        // Log blocks later than this instant time are not scanned
        .withLatestInstantTime(executionHelper.instantTimeToUseForScanning(instantTime, maxInstantTime))
        // Set the internal schema
        .withInternalSchema(internalSchemaOption.orElse(InternalSchema.getEmptyInternalSchema()))
        // Set the maximum memory available for compaction.
        // The scanner caches the HoodieRecords it has scanned;
        // once the memory limit is exceeded, records spill to disk
        .withMaxMemorySizeInBytes(maxMemoryPerCompaction)
        // Whether to enable lazy reading of log blocks; enabled by default, reduces memory consumption
        // Config option: hoodie.compaction.lazy.block.read
        .withReadBlocksLazily(config.getCompactionLazyBlockReadEnabled())
        // Whether to read the log file front to back or in reverse; forward by default
        // Config option: hoodie.compaction.reverse.log.read
        .withReverseReader(config.getCompactionReverseLogReadEnabled())
        // Buffer size for reading from the file system, config option hoodie.memory.dfs.buffer.max.size
        // Default: 16 * 1024 * 1024
        .withBufferSize(config.getMaxDFSStreamBufferSize())
        // The spill path mentioned earlier
        // Config option hoodie.memory.spillable.map.path, default /tmp
        .withSpillableMapBasePath(config.getSpillableMapBasePath())
        // On-disk data layout used for spilled records
        // Config option hoodie.common.spillable.diskmap.type, default BITCASK
        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
        // Whether to compress data in the BITCASK layout
        // Config option hoodie.common.diskmap.compression.enabled, default true
        .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
        // Whether to include _hoodie_operation among the meta fields
        // Config option hoodie.allow.operation.metadata.field, default false
        .withOperationField(config.allowOperationMetadataField())
        // Set the partition
        .withPartition(operation.getPartitionPath())
        // Whether to enable the optimized log block scan
        // Config option hoodie.optimized.log.blocks.scan.enable, default false
        .withOptimizedLogBlocksScan(executionHelper.enableOptimizedLogBlockScan(config))
        // Set the record merger.
        // Usable mergers are selected from the configured merger classes and the supported merge strategy.
        // Default: HoodieAvroRecordMerger.
        // Merger classes are configured via hoodie.datasource.write.record.merger.impls,
        // the merge strategy via hoodie.datasource.write.record.merger.strategy
        .withRecordMerger(config.getRecordMerger())
        .build();

    // Get the existing base file
    Option<HoodieBaseFile> oldDataFileOpt =
        operation.getBaseFile(metaClient.getBasePath(), operation.getPartitionPath());

    // Considering following scenario: if all log blocks in this fileSlice is rollback, it returns an empty scanner.
    // But in this case, we need to give it a base file. Otherwise, it will lose base file in following fileSlice.
    if (!scanner.iterator().hasNext()) {
        // The scanner returned no log records;
        // this can happen when the file slice's log blocks were rolled back
        if (!oldDataFileOpt.isPresent()) {
            // Close the scanner
            // and return an empty list of write statuses
            scanner.close();
            return new ArrayList<>();
        } else {
            // TODO: we may directly rename original parquet file if there is not evolution/devolution of schema
            /*
        TaskContextSupplier taskContextSupplier = hoodieCopyOnWriteTable.getTaskContextSupplier();
        String newFileName = FSUtils.makeDataFileName(instantTime,
            FSUtils.makeWriteToken(taskContextSupplier.getPartitionIdSupplier().get(), taskContextSupplier.getStageIdSupplier().get(), taskContextSupplier.getAttemptIdSupplier().get()),
            operation.getFileId(), hoodieCopyOnWriteTable.getBaseFileExtension());
        Path oldFilePath = new Path(oldDataFileOpt.get().getPath());
        Path newFilePath = new Path(oldFilePath.getParent(), newFileName);
        FileUtil.copy(fs,oldFilePath, fs, newFilePath, false, fs.getConf());
        */
        }
    }

    // Compacting is very similar to applying updates to existing file
    Iterator<List<WriteStatus>> result;
    // Perform the merge, which is very similar to applying updates and inserts:
    // if a base file exists, the data is applied as updates;
    // if there is no base file, the data is written into a new base file.
    // For Flink, this calls HoodieFlinkCopyOnWriteTable's handleUpdate or handleInsert
    result = executionHelper.writeFileAndGetWriteStats(compactionHandler, operation, instantTime, scanner, oldDataFileOpt);
    // Close the scanner
    scanner.close();
    // Assemble the write statuses and return them
    Iterable<List<WriteStatus>> resultIterable = () -> result;
    return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream).peek(s -> {
        s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
        s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
        s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
        s.getStat().setPartitionPath(operation.getPartitionPath());
        s.getStat()
            .setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
        s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
        s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
        s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
        RuntimeStats runtimeStats = new RuntimeStats();
        runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
        s.getStat().setRuntimeStats(runtimeStats);
    }).collect(toList());
}

The update and insert operations here are covered in the BaseFlinkCommitActionExecutor section of "Hudi source code: data write logic".

At this point the compaction logic has been fully analyzed, but we still need to go back and look at HoodieMergedLogRecordScanner.

By default, HoodieMergedLogRecordScanner performs a full scan when it is created, reading the records of all log files into its cache.

While scanning and caching the log files, HoodieMergedLogRecordScanner merges duplicate records found in them. Next we analyze the logic for merging duplicates.

private HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
                                     String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
                                     boolean reverseReader, int bufferSize, String spillableMapBasePath,
                                     Option<InstantRange> instantRange,
                                     ExternalSpillableMap.DiskMapType diskMapType,
                                     boolean isBitCaskDiskMapCompressionEnabled,
                                     boolean withOperationField, boolean forceFullScan,
                                     Option<String> partitionName,
                                     InternalSchema internalSchema,
                                     Option<String> keyFieldOverride,
                                     boolean enableOptimizedLogBlocksScan, HoodieRecordMerger recordMerger) {
    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize,
          instantRange, withOperationField, forceFullScan, partitionName, internalSchema, keyFieldOverride, enableOptimizedLogBlocksScan, recordMerger);
    try {
        this.maxMemorySizeInBytes = maxMemorySizeInBytes;
        // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
        this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
                                                  new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
        this.scannedPrefixes = new HashSet<>();
    } catch (IOException e) {
        throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
    }
    // A full scan is performed by default:
    // scan and merge the records of all log files to be compacted and keep them in the cache
    if (forceFullScan) {
        performScan();
    }
}

performScan calls scanInternal. Let's look at its code:

protected final void scanInternal(Option<KeySpec> keySpecOpt, boolean skipProcessingBlocks) {
    synchronized (this) {
        // Whether the optimized log block scan is enabled; off by default
        if (enableOptimizedLogBlocksScan) {
            scanInternalV2(keySpecOpt, skipProcessingBlocks);
        } else {
            scanInternalV1(keySpecOpt);
        }
    }
}

We analyze the default scanInternalV1 method. It scans all valid log blocks; for data blocks it calls processQueuedBlocksForInstant, which reads all records of the queued blocks into the cache and merges duplicates with the records already cached.

private void scanInternalV1(Option<KeySpec> keySpecOpt) {
    currentInstantLogBlocks = new ArrayDeque<>();
    // ...
    try {
        // Iterate over the paths
        logFormatReaderWrapper = new HoodieLogFormatReader(fs,
                                                           logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
                                                           readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema);

        Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
        while (logFormatReaderWrapper.hasNext()) {
            HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
            LOG.info("Scanning log file " + logFile);
            scannedLogFiles.add(logFile);
            totalLogFiles.set(scannedLogFiles.size());
            // Use the HoodieLogFileReader to iterate through the blocks in the log file
            HoodieLogBlock logBlock = logFormatReaderWrapper.next();
            final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
            totalLogBlocks.incrementAndGet();
            // Do not process corrupt blocks or blocks from failed writes
            if (logBlock.getBlockType() != CORRUPT_BLOCK
                && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
                                                    )) {
                // hit a block with instant time greater than should be processed, stop processing further
                break;
            }
            if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
                if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
                    || inflightInstantsTimeline.containsInstant(instantTime)) {
                    // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
                    continue;
                }
                if (instantRange.isPresent() && !instantRange.get().isInRange(instantTime)) {
                    // filter the log block by instant range
                    continue;
                }
            }
            switch (logBlock.getBlockType()) {
                case HFILE_DATA_BLOCK:
                case AVRO_DATA_BLOCK:
                case PARQUET_DATA_BLOCK:
                    LOG.info("Reading a data block from file " + logFile.getPath() + " at instant "
                             + logBlock.getLogBlockHeader().get(INSTANT_TIME));
                    // For data blocks: if this block belongs to a new instant that has not been processed yet
                    if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
                        // If this is an avro data block belonging to a different commit/instant,
                        // then merge the last blocks and records into the main result
                        // read the records of the queued log blocks into the cache, merging duplicates with cached records
                        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
                    }
                    // store the current block
                    currentInstantLogBlocks.push(logBlock);
                    break;
// ...
            }
        }
        // merge the last read block when all the blocks are done reading
        // Merge the blocks read in the final batch
        if (!currentInstantLogBlocks.isEmpty()) {
            LOG.info("Merging the final data blocks");
            processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keySpecOpt);
        }
        // Done
        progress = 1.0f;
    } catch (IOException e) {
        LOG.error("Got IOException when reading log file", e);
        throw new HoodieIOException("IOException when reading log file ", e);
    } catch (Exception e) {
        LOG.error("Got exception when reading log file", e);
        throw new HoodieException("Exception when reading log file ", e);
    } finally {
        try {
            if (null != logFormatReaderWrapper) {
                logFormatReaderWrapper.close();
            }
        } catch (IOException ioe) {
            // Eat exception as we do not want to mask the original exception that can happen
            LOG.error("Unable to close log format reader", ioe);
        }
    }
}

processQueuedBlocksForInstant in turn calls processDataBlock, which hands each record to processNextRecord:

private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int numLogFilesSeen,
                                           Option<KeySpec> keySpecOpt) throws Exception {
    while (!logBlocks.isEmpty()) {
        LOG.info("Number of remaining logblocks to merge " + logBlocks.size());
        // poll the element at the bottom of the stack since that's the order it was inserted
        HoodieLogBlock lastBlock = logBlocks.pollLast();
        switch (lastBlock.getBlockType()) {
            case AVRO_DATA_BLOCK:
            case HFILE_DATA_BLOCK:
            case PARQUET_DATA_BLOCK:
                processDataBlock((HoodieDataBlock) lastBlock, keySpecOpt);
                break;
            case DELETE_BLOCK:
                Arrays.stream(((HoodieDeleteBlock) lastBlock).getRecordsToDelete()).forEach(this::processNextDeletedRecord);
                break;
            case CORRUPT_BLOCK:
                LOG.warn("Found a corrupt block which was not rolled back");
                break;
            default:
                break;
        }
    }
    // At this step the lastBlocks are consumed. We track approximate progress by number of log-files seen
    progress = (numLogFilesSeen - 1) / logFilePaths.size();
}

private void processDataBlock(HoodieDataBlock dataBlock, Option<KeySpec> keySpecOpt) throws Exception {
    checkState(partitionNameOverrideOpt.isPresent() || partitionPathFieldOpt.isPresent(),
               "Either partition-name override or partition-path field had to be present");

    Option<Pair<String, String>> recordKeyPartitionPathFieldPair = populateMetaFields
        ? Option.empty()
        : Option.of(Pair.of(recordKeyField, partitionPathFieldOpt.orElse(null)));

    try (ClosableIteratorWithSchema<HoodieRecord> recordIterator = getRecordsIterator(dataBlock, keySpecOpt)) {
        while (recordIterator.hasNext()) {
            HoodieRecord completedRecord = recordIterator.next()
                .wrapIntoHoodieRecordPayloadWithParams(recordIterator.getSchema(),
                                                       hoodieTableMetaClient.getTableConfig().getProps(),
                                                       recordKeyPartitionPathFieldPair,
                                                       this.withOperationField,
                                                       this.partitionNameOverrideOpt,
                                                       populateMetaFields);
            processNextRecord(completedRecord);
            totalLogRecords.incrementAndGet();
        }
    }
}

We finally arrive at HoodieMergedLogRecordScanner::processNextRecord. This method puts the record into the cache; if the cache already contains a record with the same key, recordMerger is used to merge the duplicates. For an analysis of how RecordMerger merges records, see the HoodieAvroRecordMerger::merge section of "Hudi source code: data write logic".

@Override
protected <T> void processNextRecord(HoodieRecord<T> newRecord) throws IOException {
    // Get the record key
    String key = newRecord.getRecordKey();
    // Look up the cached record with the same key
    HoodieRecord<T> prevRecord = records.get(key);
    if (prevRecord != null) {
        // Merge and store the combined record
        // If an older record exists, merge the two;
        // conceptually, field values from the new record replace those of the old one
        HoodieRecord<T> combinedRecord = (HoodieRecord<T>) recordMerger.merge(prevRecord, readerSchema,
                                                                              newRecord, readerSchema, this.getPayloadProps()).get().getLeft();
        // If pre-combine returns existing record, no need to update it
        if (combinedRecord.getData() != prevRecord.getData()) {
            // If the merged record actually changed, update the cached entry
            HoodieRecord latestHoodieRecord =
                combinedRecord.newInstance(new HoodieKey(key, newRecord.getPartitionPath()), newRecord.getOperation());

            latestHoodieRecord.unseal();
            latestHoodieRecord.setCurrentLocation(newRecord.getCurrentLocation());
            latestHoodieRecord.seal();

            // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
            //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
            //       it since these records will be put into records(Map).
            records.put(key, latestHoodieRecord.copy());
        }
    } else {
        // Put the record as is
        // NOTE: Record have to be cloned here to make sure if it holds low-level engine-specific
        //       payload pointing into a shared, mutable (underlying) buffer we get a clean copy of
        //       it since these records will be put into records(Map).
        // If no older record was found in the cache, store the new record directly
        records.put(key, newRecord.copy());
    }
}

Inline Compaction

When HoodieFlinkWriteClient commits, it calls commitStats. This method checks, one by one, whether the conditions for running each table service are met and, if so, runs the corresponding table service. These table services include compaction.

public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
                      String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
                      Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
    List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
    // for eager flush, multiple write stat may share one file path.
    List<HoodieWriteStat> merged = writeStats.stream()
        .collect(Collectors.groupingBy(writeStat -> writeStat.getPartitionPath() + writeStat.getPath()))
        .values().stream()
        .map(duplicates -> duplicates.stream().reduce(WriteStatMerger::merge).get())
        .collect(Collectors.toList());
    return commitStats(instantTime, merged, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
}

BaseHoodieWriteClient::commitStats calls runTableServicesInline, which we can trace down to BaseHoodieTableServiceClient::runTableServicesInline. Three configuration options related to inline compaction appear in this method:

  • hoodie.compact.inline: whether to trigger compaction after each write. Default: false; enabling it increases write latency.
  • hoodie.compact.schedule.inline: whether to attempt to schedule a compaction after each write. It only attempts scheduling; the trigger conditions mentioned earlier still apply. Default: false.
  • hoodie.log.compaction.inline: whether to trigger log compaction after each write. Default: false; enabling it increases write latency.

protected void runTableServicesInline(HoodieTable table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
    // If table services are not enabled, do nothing
    if (!tableServicesEnabled(config)) {
        return;
    }

    // ...
    // Do an inline compaction if enabled
    // If inline compaction is enabled
    // (hoodie.compact.inline)
    if (config.inlineCompactionEnabled()) {
        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
        // Try to schedule and then execute a compaction
        inlineCompaction(table, extraMetadata);
    } else {
        metadata.addMetadata(HoodieCompactionConfig.INLINE_COMPACT.key(), "false");
    }

    // if just inline schedule is enabled
    // If only inline scheduling is enabled
    // (hoodie.compact.schedule.inline)
    if (!config.inlineCompactionEnabled() && config.scheduleInlineCompaction()
        && table.getActiveTimeline().getWriteTimeline().filterPendingCompactionTimeline().empty()) {
        // proceed only if there are no pending compactions
        metadata.addMetadata(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), "true");
        // Schedule a compaction
        inlineScheduleCompaction(extraMetadata);
    }

    // Do an inline log compaction if enabled
    // If inline log compaction is enabled
    // (hoodie.log.compaction.inline)
    if (config.inlineLogCompactionEnabled()) {
        runAnyPendingLogCompactions(table);
        metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "true");
        inlineLogCompact(extraMetadata);
    } else {
        metadata.addMetadata(HoodieCompactionConfig.INLINE_LOG_COMPACT.key(), "false");
    }

    // ...
}

