Hudi Source Code: The Cleaning Service


Author: AlienPaul | Published: 2023-08-30 18:37

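    The concept of Clean

    A Hudi table has a timeline, which can be thought of as the table's change log. Hudi records not only when each change happened and what type of change it was, but also which data files belong to that change. Some of these data files hold superseded versions of the data, referred to here as historical data. After a table has been written to for a long time, historical data piles up and occupies a large amount of storage. Clean addresses this problem: following the configured cleaning policy, it deletes the historical data files that are no longer needed, freeing space and relieving disk pressure.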

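    Configuring and running Clean

    The core configuration option of Hudi Clean is the cleaning policy, hoodie.cleaner.policy. It accepts the following three values:

    • KEEP_LATEST_COMMITS. The default. Retains the latest N commits. N defaults to 10 and can be changed through hoodie.cleaner.commits.retained.
    • KEEP_LATEST_BY_HOURS. Retains the commits made within the last N hours. N defaults to 24 and can be changed through hoodie.cleaner.hours.retained.
    • KEEP_LATEST_FILE_VERSIONS. Retains the latest N file versions. N defaults to 3 and can be changed through hoodie.cleaner.fileversions.retained.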

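    Other related options:

    • hoodie.clean.automatic. Whether to clean automatically after every commit. Defaults to true.
    • hoodie.clean.async. Asynchronous cleaning, which lets writing and cleaning run at the same time. It only takes effect when hoodie.clean.automatic is enabled. Defaults to false.
    • hoodie.clean.trigger.strategy. The strategy that decides when the next clean is scheduled. The only value currently available is NUM_COMMITS, i.e. scheduling by number of commits.
    • hoodie.clean.max.commits. The number of commits created since the last clean after which the next clean is scheduled. Defaults to 1.
    • hoodie.cleaner.incremental.mode. Incremental clean mode, enabled by default. When it is on, planning a clean only has to examine the files of the instants between the earliest instant retained by the previous clean and the earliest instant the new clean will retain, which shortens the planning phase.
    • hoodie.cleaner.policy.failed.writes. How Clean treats earlier failed writes. Defaults to EAGER. EAGER is meant for single-writer setups: failed writes are looked up and rolled back before every write. LAZY is meant for multi-writer setups: failed writes are rolled back during clean.
    • hoodie.cleaner.parallelism. Parallelism of the clean operation. Defaults to 200. It applies to Spark, where the cleaning of one partition is assigned to one Spark task.
    • hoodie.clean.allow.multiple. Whether more than one clean may be scheduled or executed. Defaults to true.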
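
    As a rough illustration, the options listed above can be gathered as plain key/value pairs. The keys and default values are the ones given in the list; the class CleanerConfigSketch and its methods are invented for this example and are not part of Hudi.

```java
import java.util.Properties;

// Hypothetical helper, not part of Hudi: gathers the cleaner options discussed above.
final class CleanerConfigSketch {

    // Returns the options with the default values from the list above, spelled out explicitly.
    static Properties defaults() {
        Properties props = new Properties();
        props.setProperty("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
        props.setProperty("hoodie.cleaner.commits.retained", "10");
        props.setProperty("hoodie.clean.automatic", "true");
        props.setProperty("hoodie.clean.async", "false");
        props.setProperty("hoodie.clean.trigger.strategy", "NUM_COMMITS");
        props.setProperty("hoodie.clean.max.commits", "1");
        props.setProperty("hoodie.cleaner.incremental.mode", "true");
        props.setProperty("hoodie.cleaner.policy.failed.writes", "EAGER");
        props.setProperty("hoodie.cleaner.parallelism", "200");
        props.setProperty("hoodie.clean.allow.multiple", "true");
        return props;
    }

    // Switches to time-based retention: keep the commits of the last `hours` hours.
    static Properties keepLatestByHours(int hours) {
        Properties props = defaults();
        props.setProperty("hoodie.cleaner.policy", "KEEP_LATEST_BY_HOURS");
        props.setProperty("hoodie.cleaner.hours.retained", Integer.toString(hours));
        return props;
    }

    public static void main(String[] args) {
        // Print every option as key=value, the form also used with --hoodie-conf.
        keepLatestByHours(48).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

    Only the policy and its matching retention option change between the three policies; the remaining options are independent of the policy chosen.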

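    By default Clean runs automatically as part of write operations. It can also be run offline as a dedicated Spark job. For usage, see the official documentation: https://hudi.apache.org/cn/docs/hoodie_cleaner/#run-independently

    For example: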

    spark-submit --master local --class org.apache.hudi.utilities.HoodieCleaner `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar`\
      --target-base-path /path/to/hoodie_table \
      --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
      --hoodie-conf hoodie.cleaner.commits.retained=10 \
      --hoodie-conf hoodie.cleaner.parallelism=200
    

    The command above runs the cleaner and retains the latest 10 commits.

    spark-submit --master local --class org.apache.hudi.utilities.HoodieCleaner `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar`\
      --target-base-path /path/to/hoodie_table \
      --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS \
      --hoodie-conf hoodie.cleaner.fileversions.retained=3 \
      --hoodie-conf hoodie.cleaner.parallelism=200
    

    The command above runs the cleaner and retains the latest 3 file versions.

    spark-submit --master local --class org.apache.hudi.utilities.HoodieCleaner `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar`\
      --target-base-path /path/to/hoodie_table \
      --hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_BY_HOURS \
      --hoodie-conf hoodie.cleaner.hours.retained=24 \
      --hoodie-conf hoodie.cleaner.parallelism=200
    

    The command above runs the cleaner and retains the commits of the last 24 hours.

    Clean source code analysis

    BaseHoodieTableServiceClient::clean

    The entry point of clean is BaseHoodieTableServiceClient::clean:

    /**
       * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
       * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
       * cleaned). This API provides the flexibility to schedule clean instant asynchronously via
       * {@link BaseHoodieTableServiceClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
       * of clean.
       *
       * @param cleanInstantTime instant time for clean.
       * @param scheduleInline   true if needs to be scheduled inline. false otherwise.
       */
    @Nullable
    public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
        // hoodie.table.services.enabled must be on.
        // It controls whether table services (e.g. archive, clean, compact and cluster) run at all
        if (!tableServicesEnabled(config)) {
            return null;
        }
        // Timer context for clean, used for metrics
        final Timer.Context timerContext = metrics.getCleanCtx();
        // Roll back failed writes
        CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
                                          HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
    
        // Create the Hudi table; the implementation class depends on the engine (Spark/Flink) and the table type (MOR/COW)
        HoodieTable table = createTable(config, hadoopConf);
        // Enter this block if multiple cleans are allowed (a new clean may be scheduled while the previous one
        // has not finished; hoodie.clean.allow.multiple), or if the timeline has no clean instant in inflight or requested state
        if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
            LOG.info("Cleaner started");
            // proceed only if multiple clean schedules are enabled or if there are no pending cleans.
            // If the clean should be scheduled inline
            if (scheduleInline) {
                // Schedule the clean:
                // writes a clean instant in requested state
                scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
                // Reload the timeline
                table.getMetaClient().reloadActiveTimeline();
            }
            // Clean cannot be delegated to the table service manager
            if (shouldDelegateToTableServiceManager(config, ActionType.clean)) {
                LOG.warn("Cleaning is not yet supported with Table Service Manager.");
                return null;
            }
        }
    
        // Proceeds to execute any requested or inflight clean instances in the timeline
        HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime);
        // Record the clean duration, the total number of deleted files, etc.
        if (timerContext != null && metadata != null) {
            long durationMs = metrics.getDurationInMs(timerContext.stop());
            metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
            LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
                     + " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
                     + " cleanerElapsedMs" + durationMs);
        }
        return metadata;
    }
    

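    Creating the cleaner plan and executing the clean are both implemented on the table class (scheduleCleaning and clean respectively). The Flink implementation is in HoodieFlinkCopyOnWriteTable and the Spark implementation is in HoodieSparkCopyOnWriteTable. The rest of this article uses Flink to walk through how a clean plan is created and executed.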

    How Flink creates the cleaner plan

    In Flink, the cleaner plan is created in HoodieFlinkCopyOnWriteTable:

    @Override
    public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
        return new CleanPlanActionExecutor(context, config, this, instantTime, extraMetadata).execute();
    }
    

    Planning is done by CleanPlanActionExecutor, which we look at next.

    CleanPlanActionExecutor

    The entry point of CleanPlanActionExecutor is its execute method:

    @Override
    public Option<HoodieCleanerPlan> execute() {
        // Decide whether a clean is needed:
        // it is when the number of commits since the last clean is >= hoodie.clean.max.commits
        if (!needsCleaning(config.getCleaningTriggerStrategy())) {
            return Option.empty();
        }
        // Plan a new clean action
        return requestClean(instantTime);
    }
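
    The NUM_COMMITS trigger described in the comment above can be modelled in a few lines. This is a minimal sketch under stated assumptions, not the body of needsCleaning (which is not shown in this article): instants are reduced to timestamp strings that sort chronologically, and the class and method names are invented for the example.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not Hudi code: instants are represented by their timestamp strings.
final class CleanTriggerSketch {

    // True when at least maxCommits completed commits are newer than the last completed clean.
    // lastCleanTime == null stands for a table that has never been cleaned.
    static boolean needsCleaning(List<String> completedCommitTimes, String lastCleanTime, int maxCommits) {
        long commitsSinceLastClean = completedCommitTimes.stream()
            .filter(t -> lastCleanTime == null || t.compareTo(lastCleanTime) > 0)
            .count();
        return commitsSinceLastClean >= maxCommits;
    }

    public static void main(String[] args) {
        List<String> commits = Arrays.asList("001", "002", "003");
        // One commit ("003") was made after the last clean at "002"
        System.out.println(needsCleaning(commits, "002", 1));
        // No commit was made after the last clean at "003"
        System.out.println(needsCleaning(commits, "003", 1));
    }
}
```

    With the default hoodie.clean.max.commits of 1, every new commit makes the table eligible for another clean, which matches the "clean after every commit" behaviour of hoodie.clean.automatic.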
    

    execute first checks whether a clean is needed. If so, it calls requestClean to generate the Hudi clean plan:

    protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
        // Build the cleaner plan; this is the key step, analyzed below
        final HoodieCleanerPlan cleanerPlan = requestClean(context);
        Option<HoodieCleanerPlan> option = Option.empty();
        // If the plan contains files to delete
        if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
            && cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
            // Only create cleaner plan which does some work
            // Create the clean instant
            final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
            // Save to both aux and timeline folder
            try {
                // Write the requested clean instant file (under the .hoodie directory)
                table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
                LOG.info("Requesting Cleaning with instant time " + cleanInstant);
            } catch (IOException e) {
                LOG.error("Got exception when saving cleaner requested file", e);
                throw new HoodieIOException(e.getMessage(), e);
            }
            option = Option.of(cleanerPlan);
        }
    
        return option;
    }
    

    It in turn calls an overload of requestClean, which produces the list of files to delete.

    HoodieCleanerPlan requestClean(HoodieEngineContext context) {
        try {
            // CleanPlanner is responsible for generating the clean plan
            CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
            // Based on the configured cleaning policy, find the earliest instant to retain (anything before it is eligible for cleaning)
            Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
            context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned: " + config.getTableName());
            // Partition paths that contain files to clean
            List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
            // Empty means there is nothing to clean
            if (partitionsToClean.isEmpty()) {
                LOG.info("Nothing to clean here. It is already clean");
                return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
            }
            LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
            // Clean parallelism: the smaller of the number of partitions to clean and the configured cleaner parallelism
            // (the latter corresponds to hoodie.cleaner.parallelism)
            int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
            LOG.info("Using cleanerParallelism: " + cleanerParallelism);
    
            context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
    
            // Map each partition path to the files to delete in that partition.
            // The Boolean in the Pair says whether the whole partition should be deleted;
            // the List holds the files to delete in the partition
            Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
                .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
                .stream()
                .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    
            // Convert CleanFileInfo to HoodieCleanFileInfo
            Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
                .collect(Collectors.toMap(Map.Entry::getKey,
                                          e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())));
    
            // Partition paths whose Pair has true as its first element,
            // i.e. the partitions that are to be deleted
            List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey)
                .collect(Collectors.toList());
    
            // Build the cleaner plan and return it
            return new HoodieCleanerPlan(earliestInstant
                                         .map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
                                         planner.getLastCompletedCommitTimestamp(),
                                         config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
                                         CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete);
        } catch (IOException e) {
            throw new HoodieIOException("Failed to schedule clean operation", e);
        }
    }
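
    The last part of the method reshapes the per-partition results into the two collections stored in the plan. The sketch below mirrors that reshaping with plain JDK types; it is an illustration only. Map.Entry<Boolean, List<String>> stands in for Pair<Boolean, List<CleanFileInfo>>, file names are plain strings, and every class and method name is invented for the example.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical model, not Hudi code.
final class CleanPlanAssemblySketch {

    // Never use more tasks than there are partitions to clean.
    static int cleanerParallelism(int partitionsToClean, int configuredParallelism) {
        return Math.min(partitionsToClean, configuredParallelism);
    }

    // partition path -> files to delete (the "cleanOps" of the plan)
    static Map<String, List<String>> filesPerPartition(Map<String, Map.Entry<Boolean, List<String>>> results) {
        return results.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getValue()));
    }

    // partition paths whose flag is true, i.e. partitions to delete as a whole
    static List<String> partitionsToDelete(Map<String, Map.Entry<Boolean, List<String>>> results) {
        return results.entrySet().stream()
            .filter(e -> e.getValue().getKey())
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    // Convenience constructor for one partition's result.
    static Map.Entry<Boolean, List<String>> result(boolean deletePartition, String... files) {
        return new SimpleEntry<Boolean, List<String>>(deletePartition, Arrays.asList(files));
    }

    public static void main(String[] args) {
        Map<String, Map.Entry<Boolean, List<String>>> results = new LinkedHashMap<>();
        results.put("2023/08/28", result(true));
        results.put("2023/08/29", result(false, "f1_001.parquet", "f1_002.parquet"));
        System.out.println(cleanerParallelism(results.size(), 200));
        System.out.println(filesPerPartition(results));
        System.out.println(partitionsToDelete(results));
    }
}
```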
    

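    This method determines the earliest instant to retain, the partitions to clean and the files to clean, and wraps them in a HoodieCleanerPlan. The actual work is delegated to CleanPlanner, which we analyze next.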

    CleanPlanner

    CleanPlanner::getEarliestCommitToRetain uses the cleaning policy to work out the first commit that must be retained (everything before that commit is eligible for cleaning):

    public Option<HoodieInstant> getEarliestCommitToRetain() {
        Option<HoodieInstant> earliestCommitToRetain = Option.empty();
        // Number of commits to retain (hoodie.cleaner.commits.retained)
        int commitsRetained = config.getCleanerCommitsRetained();
        // Number of hours within which commits are retained (hoodie.cleaner.hours.retained)
        int hoursRetained = config.getCleanerHoursRetained();
        // If the policy is to keep the latest N commits
        // and there are more completed instants than commitsRetained
        if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
            && commitTimeline.countInstants() > commitsRetained) {
            // Find the earliest pending commit
            Option<HoodieInstant> earliestPendingCommits = hoodieTable.getMetaClient()
                .getActiveTimeline()
                .getCommitsTimeline()
                .filter(s -> !s.isCompleted()).firstInstant();
            // If there is one
            if (earliestPendingCommits.isPresent()) {
                // Earliest commit to retain must not be later than the earliest pending commit
                // (a pending commit must not be cleaned, so the retained range has to start at or before it)
                // nthInstant returns the instant at index n, where
                // n = commitTimeline.countInstants() (total) - commitsRetained (number to keep)
                earliestCommitToRetain =
                    commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained).map(nthInstant -> {
                    // If the nth commit is at or before earliestPendingCommits, return it
                    if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
                        return Option.of(nthInstant);
                    } else {
                        // Otherwise return the last commit before earliestPendingCommits
                        return commitTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
                    }
                }).orElse(Option.empty());
            } else {
                // No pending commit: take the nth commit directly, same logic as above
                earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants()
                                                                   - commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
            }
        } else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
            // The policy is to keep the commits of the latest N hours
            // Current time
            Instant instant = Instant.now();
            ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant, hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
            // Earliest commit time to retain:
            // current time - hoursRetained
            String earliestTimeToRetain = HoodieActiveTimeline.formatDate(Date.from(currentDateTime.minusHours(hoursRetained).toInstant()));
            // Take the first commit whose time is at or after earliestTimeToRetain
            earliestCommitToRetain = Option.fromJavaOptional(commitTimeline.getInstantsAsStream().filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(),
                                                                                                                                               HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst());
        }
        return earliestCommitToRetain;
    }
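
    The index arithmetic is easier to follow on a toy timeline. The sketch below is a simplified model, not Hudi code: completed commits are a list of timestamp strings ordered oldest first, the earliest pending commit is a nullable string, and the class and method names are invented. The first method follows the KEEP_LATEST_COMMITS branch, the second the KEEP_LATEST_BY_HOURS branch with the cutoff timestamp already computed by the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical model, not Hudi code: instants are timestamp strings, oldest first.
final class EarliestCommitSketch {

    // KEEP_LATEST_COMMITS: index (total - retained), capped by the earliest pending commit.
    static Optional<String> byCommits(List<String> completed, String earliestPending, int commitsRetained) {
        int n = completed.size() - commitsRetained;
        if (n <= 0 || n >= completed.size()) {
            return Optional.empty(); // not enough commits yet, or nothing is retained
        }
        String nth = completed.get(n);
        if (earliestPending == null || nth.compareTo(earliestPending) <= 0) {
            return Optional.of(nth);
        }
        // The nth commit is later than the pending one: fall back to the last completed commit before it.
        return completed.stream().filter(t -> t.compareTo(earliestPending) < 0).reduce((a, b) -> b);
    }

    // KEEP_LATEST_BY_HOURS: first commit at or after the cutoff (current time minus retained hours).
    static Optional<String> byCutoff(List<String> completed, String earliestTimeToRetain) {
        return completed.stream().filter(t -> t.compareTo(earliestTimeToRetain) >= 0).findFirst();
    }

    public static void main(String[] args) {
        List<String> completed = new ArrayList<>();
        for (int i = 1; i <= 15; i++) {
            completed.add(String.format("%02d", i));
        }
        // 15 commits, 10 retained: index 5, the 6th instant in the list
        System.out.println(byCommits(completed, null, 10));
        System.out.println(byCutoff(completed, "12"));
    }
}
```

    Note that with N retained commits the instant at index (total - N) is itself kept, so exactly the N newest commits lie at or after the returned instant.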
    

    CleanPlanner::getPartitionPathsToClean finds the partitions that need cleaning:

    public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
        switch (config.getCleanerPolicy()) {
            case KEEP_LATEST_COMMITS:
            case KEEP_LATEST_BY_HOURS:
                return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
            case KEEP_LATEST_FILE_VERSIONS:
                // Keeping the latest file versions requires looking at every file, so all partition paths are fetched (analyzed below)
                return getPartitionPathsForFullCleaning();
            default:
                throw new IllegalStateException("Unknown Cleaner Policy");
        }
    }
    

    getPartitionPathsForCleanByCommits checks whether the partition paths to clean can be obtained incrementally. If they cannot, every partition has to be scanned.

    private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException {
        // Argument check
        if (!instantToRetain.isPresent()) {
            LOG.info("No earliest commit to retain. No need to scan partitions !!");
            return Collections.emptyList();
        }
    
        // Is incremental clean mode on (hoodie.cleaner.incremental.mode)? If so, only events since the last clean are considered.
        // It improves performance and is on by default
        if (config.incrementalCleanerModeEnabled()) {
            // Latest completed clean instant
            Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
            if (lastClean.isPresent()) {
                // There is one
                if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
                    // The instant is empty: delete it
                    hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
                } else {
                    // Read the metadata of the last clean
                    HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
                        .deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
                    if ((cleanMetadata.getEarliestCommitToRetain() != null)
                        && (cleanMetadata.getEarliestCommitToRetain().length() > 0)
                        && !hoodieTable.getActiveTimeline().isBeforeTimelineStarts(cleanMetadata.getEarliestCommitToRetain())) {
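                        // The last clean recorded an earliest commit to retain,
                        // and that commit is not before the start of the timeline (the first non-savepoint commit):
                        // collect the partition paths of the files belonging to the instants between the commit retained by the last clean and the commit this clean will retain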
                        return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
                    }
                }
            }
        }
        // Otherwise every partition has to be scanned before cleaning
        return getPartitionPathsForFullCleaning();
    }
    

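    getPartitionPathsForIncrementalCleaning collects the partition paths touched between the earliest commit retained by the last clean and the new earliestRetainedInstant. It is an optimization that avoids scanning every partition: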

    private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
                                                                 Option<HoodieInstant> newInstantToRetain) {
        LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
                 + "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
                 + ". New Instant to retain : " + newInstantToRetain);
        // First find the instants at or after cleanMetadata.getEarliestCommitToRetain() and before newInstantToRetain
        return hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter(
            instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
                                                        cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
                                                                                                                                       HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
            try {
                if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
                    // The instant is a replace commit
                    HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
                        hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
                    // Partitions of both the replaced file groups and the newly written files are collected
                    return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(), replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
                } else {
                    // Regular commit: collect the partitions this commit wrote to
                    HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
                        .fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
                                   HoodieCommitMetadata.class);
                    return commitMetadata.getPartitionToWriteStats().keySet().stream();
                }
            } catch (IOException e) {
                throw new HoodieIOException(e.getMessage(), e);
            }
        }).distinct().collect(Collectors.toList());
        // The stream is de-duplicated before being returned
    }
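
    The half-open window used by the filter (at or after the previously retained commit, strictly before the newly retained one) can be tried out on a toy timeline. This is a simplified sketch, not Hudi code: commit metadata is reduced to a map from commit timestamp to the partitions that commit touched, and the class and method names are invented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical model, not Hudi code.
final class IncrementalPartitionsSketch {

    // partitionsByCommit: commit timestamp -> partitions written (or replaced) by that commit.
    // Returns the distinct partitions touched in [lastRetained, newRetained).
    static List<String> partitionsToClean(SortedMap<String, List<String>> partitionsByCommit,
                                          String lastRetained, String newRetained) {
        return partitionsByCommit.entrySet().stream()
            .filter(e -> e.getKey().compareTo(lastRetained) >= 0 && e.getKey().compareTo(newRetained) < 0)
            .flatMap(e -> e.getValue().stream())
            .distinct()
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        SortedMap<String, List<String>> timeline = new TreeMap<>();
        timeline.put("001", Arrays.asList("p1"));
        timeline.put("002", Arrays.asList("p1", "p2"));
        timeline.put("003", Arrays.asList("p3"));
        timeline.put("004", Arrays.asList("p4"));
        // The last clean retained from "002"; this clean will retain from "004".
        System.out.println(partitionsToClean(timeline, "002", "004"));
    }
}
```

    Commits before the lower bound were already examined by the previous clean, and commits from the upper bound onward are still retained, so only the window in between can contribute new files to delete.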
    

    If the incremental approach is not available, the only option is getPartitionPathsForFullCleaning, which brute-force scans all partitions and returns them.

    private List<String> getPartitionPathsForFullCleaning() {
        // Go to brute force mode of scanning all partitions
        return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
    }
    

    Back to CleanPlanner::getDeletePaths, which returns the files to delete in a partition:

    /**
       * Returns files to be cleaned for the given partitionPath based on cleaning policy.
       */
    // The Boolean says whether the partition itself should be deleted (a partition without any file group has no reason to exist);
    // the List holds the files to delete
    public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath) {
        HoodieCleaningPolicy policy = config.getCleanerPolicy();
        Pair<Boolean, List<CleanFileInfo>> deletePaths;
        if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
            deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
        } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
            deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
        } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
            deletePaths = getFilesToCleanKeepingLatestHours(partitionPath);
        } else {
            throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
        }
        LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath);
        if (deletePaths.getKey()) {
            LOG.info("Partition " + partitionPath + " to be deleted");
        }
        return deletePaths;
    }
    

    The method dispatches to a different helper for each cleaning policy. KEEP_LATEST_COMMITS and KEEP_LATEST_BY_HOURS share the same underlying method, so we analyze that one first.

    private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath) {
        return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
    }
    
    private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(String partitionPath) {
        return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
    }
    

    getFilesToCleanKeepingLatestCommits implements the keep-latest-N-commits policy. For KEEP_LATEST_BY_HOURS the instant to retain can likewise be derived from the retention time, so the same logic applies:

    private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
        LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
        List<CleanFileInfo> deletePaths = new ArrayList<>();
    
        // Collect all the datafiles savepointed by all the savepoints
        // These files must not be deleted
        List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
            .flatMap(this::getSavepointedDataFiles)
            .collect(Collectors.toList());
    
        // determine if we have enough commits, to start cleaning.
        // Whether the whole partition should be deleted
        boolean toDeletePartition = false;
        // Cleaning is only needed when there are more instants than the number to retain
        if (commitTimeline.countInstants() > commitsRetained) {
            // Earliest instant to retain
            Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
            HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
            // all replaced file groups before earliestCommitToRetain are eligible to clean
            // Collect all replaced files, excluding those referenced by a savepoint
            deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
            // add active files
            List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
            for (HoodieFileGroup fileGroup : fileGroups) {
                List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
    
                if (fileSliceList.isEmpty()) {
                    continue;
                }
                // Latest version (the first slice in the list)
                String lastVersion = fileSliceList.get(0).getBaseInstantTime();
                // Latest version before earliestCommitToRetain
                String lastVersionBeforeEarliestCommitToRetain =
                    getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
    
                // Ensure there are more than 1 version of the file (we only clean old files from updates)
                // i.e always spare the last commit.
                // For each file slice: keep the latest version and skip files referenced by a savepoint
                for (FileSlice aSlice : fileSliceList) {
                    Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
                    String fileCommitTime = aSlice.getBaseInstantTime();
                    if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
                        // do not clean up a savepoint data file
                        continue;
                    }
                    // Do not delete the latest version of the file
                    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
                        // Dont delete the latest commit and also the last commit before the earliest commit we
                        // are retaining
                        // The window of commit retain == max query run time. So a query could be running which
                        // still
                        // uses this file.
                        if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
                            // move on to the next file
                            continue;
                        }
                    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
                        // This block corresponds to KEEP_LATEST_BY_HOURS policy
                        // Do not delete the latest commit.
                        if (fileCommitTime.equals(lastVersion)) {
                            // move on to the next file
                            continue;
                        }
                    }
    
                    // Always keep the last commit
                    // If the file slice is not needed by a pending compaction
                    // and its commit time is before earliestCommitToRetain
                    if (!isFileSliceNeededForPendingMajorOrMinorCompaction(aSlice) && HoodieTimeline
                        .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
                        // this is a commit, that should be cleaned.
                        aFile.ifPresent(hoodieDataFile -> {
                            deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
                            if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
                                deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
                            }
                        });
                        // clean the log files for the commits, which contain cdc log files in cdc scenario
                        // and normal log files for mor tables.
                        // Clean all log files of this slice
                        deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
                                           .collect(Collectors.toList()));
                    }
                }
            }
            // if there are no valid file groups for the partition, mark it to be deleted
            if (fileGroups.isEmpty()) {
                toDeletePartition = true;
            }
        }
        return Pair.of(toDeletePartition, deletePaths);
    }
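
    The per-file-group decision is the heart of this method, so a reduced model may help. The sketch below is an illustration under stated assumptions, not Hudi code: a file slice is represented only by its base instant time, slices are ordered newest first, savepoints and pending compactions are left out, and the helper that finds the latest version before the retained commit is assumed to return the newest slice strictly older than that commit (getLatestVersionBeforeCommit itself is not shown in this article).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model, not Hudi code: a file slice is represented by its base instant time.
final class KeepLatestCommitsSketch {

    enum Policy { KEEP_LATEST_COMMITS, KEEP_LATEST_BY_HOURS }

    // sliceTimes: base instant times of one file group's slices, newest first.
    static List<String> slicesToClean(List<String> sliceTimes, String earliestCommitToRetain, Policy policy) {
        List<String> toClean = new ArrayList<>();
        if (sliceTimes.isEmpty()) {
            return toClean;
        }
        String lastVersion = sliceTimes.get(0);
        // Assumed behaviour of getLatestVersionBeforeCommit: newest slice strictly older than the retained commit.
        String lastVersionBeforeEarliest = null;
        for (String t : sliceTimes) {
            if (t.compareTo(earliestCommitToRetain) < 0) {
                lastVersionBeforeEarliest = t;
                break;
            }
        }
        for (String t : sliceTimes) {
            if (t.equals(lastVersion)) {
                continue; // the latest version is always kept
            }
            if (policy == Policy.KEEP_LATEST_COMMITS && t.equals(lastVersionBeforeEarliest)) {
                continue; // a query that started at the retained commit may still read this version
            }
            if (t.compareTo(earliestCommitToRetain) < 0) {
                toClean.add(t); // older than the retained commit: eligible for cleaning
            }
        }
        return toClean;
    }

    public static void main(String[] args) {
        List<String> slices = Arrays.asList("010", "007", "004", "002");
        System.out.println(slicesToClean(slices, "006", Policy.KEEP_LATEST_COMMITS));
        System.out.println(slicesToClean(slices, "006", Policy.KEEP_LATEST_BY_HOURS));
    }
}
```

    In the example, version 004 is what a reader pinned to commit 006 would see, which is why KEEP_LATEST_COMMITS spares it while KEEP_LATEST_BY_HOURS, as written in the code above, only spares the latest version.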
    

    Keeping the latest N versions works differently from the above. Next we analyze getFilesToCleanKeepingLatestVersions.

    private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
        LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
                 + " file versions. ");
        List<CleanFileInfo> deletePaths = new ArrayList<>();
        // Collect all the datafiles savepointed by all the savepoints
        // These files must not be deleted
        List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
            .flatMap(this::getSavepointedDataFiles)
            .collect(Collectors.toList());
    
        // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
        // In other words, the file versions only apply to the active file groups.
        // All replaced files can be cleaned
        deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
        boolean toDeletePartition = false;
        List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
        for (HoodieFileGroup fileGroup : fileGroups) {
            // Number of latest file slice versions to keep (hoodie.cleaner.fileversions.retained)
            int keepVersions = config.getCleanerFileVersionsRetained();
            // do not cleanup slice required for pending compaction
            Iterator<FileSlice> fileSliceIterator =
                fileGroup.getAllFileSlices()
                .filter(fs -> !isFileSliceNeededForPendingMajorOrMinorCompaction(fs))
                .iterator();
            if (isFileGroupInPendingMajorOrMinorCompaction(fileGroup)) {
                // We have already saved the last version of file-groups for pending compaction Id
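                // This file group has a pending compaction. The file slice that compaction needs was
                // filtered out of the iterator above, so it is kept regardless. Count it as one of the
                // retained versions by decrementing keepVersions, so that one more old file slice is
                // cleaned below and the file group still ends up with keepVersions versions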
                keepVersions--;
            }
    
            while (fileSliceIterator.hasNext() && keepVersions > 0) {
                // Skip this most recent version
                // Call next() up to keepVersions times;
                // only the file slices after those are cleaned, which keeps the latest keepVersions versions
                fileSliceIterator.next();
                keepVersions--;
            }
            // Delete the remaining files
            while (fileSliceIterator.hasNext()) {
                FileSlice nextSlice = fileSliceIterator.next();
                Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
                if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
                    // do not clean up a savepoint data file
                    continue;
                }
                deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
            }
        }
        // if there are no valid file groups for the partition, mark it to be deleted
        // If the partition contains no file groups at all, mark the partition itself as deletable
        if (fileGroups.isEmpty()) {
            toDeletePartition = true;
        }
        return Pair.of(toDeletePartition, deletePaths);
    }
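The version-retention loop above can be illustrated with a minimal, self-contained sketch. It is not Hudi code: the class `KeepLatestVersionsSketch`, the method `selectForCleaning`, and the use of plain strings in place of `FileSlice` objects are all assumptions made for illustration. The input list is assumed to be ordered newest first and to already exclude slices needed by a pending compaction.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the KEEP_LATEST_FILE_VERSIONS selection; not part of Hudi.
final class KeepLatestVersionsSketch {

    // versions: slice identifiers of one file group, newest first (assumed ordering).
    static List<String> selectForCleaning(List<String> versions, int keepVersions,
                                          boolean pendingCompaction, Set<String> savepointed) {
        List<String> toDelete = new ArrayList<>();
        Iterator<String> it = versions.iterator();
        if (pendingCompaction) {
            // One version is already retained outside this list, so keep one fewer here.
            keepVersions--;
        }
        // Skip (retain) the most recent keepVersions entries.
        while (it.hasNext() && keepVersions > 0) {
            it.next();
            keepVersions--;
        }
        // Everything left is cleaned, unless a savepoint still references it.
        while (it.hasNext()) {
            String version = it.next();
            if (!savepointed.contains(version)) {
                toDelete.add(version);
            }
        }
        return toDelete;
    }

    public static void main(String[] args) {
        List<String> versions = List.of("v5", "v4", "v3", "v2", "v1");
        System.out.println(selectForCleaning(versions, 3, false, Set.of()));     // [v2, v1]
        System.out.println(selectForCleaning(versions, 3, false, Set.of("v1"))); // [v2]
        System.out.println(selectForCleaning(versions, 3, true, Set.of()));      // [v3, v2, v1]
    }
}
```

With five versions and three to keep, the two oldest are selected; a savepointed version is skipped, and a pending compaction shifts the cut-off by one.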
    

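    This completes the analysis of the clean planning logic. Using the cleaning policy and the retention settings, we have computed which partitions need cleaning and which files under them should be deleted. The next step is executing the clean plan.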

    Flink clean logic

    HoodieFlinkCopyOnWriteTable::clean

    The entry point is HoodieFlinkCopyOnWriteTable::clean. The code is as follows:

    @Override
    public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
        return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
    }
    

    In Flink, the clean operation is carried out by CleanActionExecutor. We start the analysis from its execute method.

    CleanActionExecutor

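    The execute method cleans up old schema files and runs any pending clean plans, that is, clean instants still in the requested or inflight state. Its contents are as follows: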

    @Override
    public HoodieCleanMetadata execute() {
        List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
        // If there are inflight(failed) or previously requested clean operation, first perform them
        List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
            .filterInflightsAndRequested().getInstants();
        // Check whether any clean instants already exist in the inflight or requested state
        if (pendingCleanInstants.size() > 0) {
            // try to clean old history schema.
            try {
                // This class saves schemas, reads schema history (related to schema evolution, stored under .hoodie/.schema) and cleans up old schema files
                FileBasedInternalSchemaStorageManager fss = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
                // Clean up old schema files
                fss.cleanOldFiles(pendingCleanInstants.stream().map(is -> is.getTimestamp()).collect(Collectors.toList()));
            } catch (Exception e) {
                // we should not affect original clean logic. Swallow exception and log warn.
                LOG.warn("failed to clean old history schema");
            }
            pendingCleanInstants.forEach(hoodieInstant -> {
                // If the instant's details are empty, delete the instant
                if (table.getCleanTimeline().isEmpty(hoodieInstant)) {
                    table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant);
                } else {
                    LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
                    try {
                        // Run the pending clean operation
                        // and append its result to cleanMetadataList
                        cleanMetadataList.add(runPendingClean(table, hoodieInstant));
                    } catch (Exception e) {
                        LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
                    }
                }
                // Reload the timeline
                table.getMetaClient().reloadActiveTimeline();
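                // If the metadata table is enabled, force a sync.
                // The metadata table maintains the file system view so that listing files does not become a bottleneck on large tables.
                // Cleaning changes the set of files, so the view must be synced against the metadata table.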
                if (config.isMetadataTableEnabled()) {
                    table.getHoodieView().sync();
                }
            });
        }
    
        // return the last clean metadata for now
        // TODO (NA) : Clean only the earliest pending clean just like how we do for other table services
        // This requires the CleanActionExecutor to be refactored as BaseCommitActionExecutor
        // Return the last clean metadata, or null if no pending clean was executed
        return cleanMetadataList.size() > 0 ? cleanMetadataList.get(cleanMetadataList.size() - 1) : null;
    }
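The control flow of execute (run every pending clean, tolerate individual failures, return only the last result) can be sketched in isolation. This is an illustrative simplification, not Hudi code: `PendingCleanSketch`, `runAllAndReturnLast`, and the string-based instants and results are hypothetical stand-ins for `HoodieInstant` and `HoodieCleanMetadata`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical simplification of the loop in CleanActionExecutor.execute; not part of Hudi.
final class PendingCleanSketch {

    // runner plays the role of runPendingClean and may throw for a given instant.
    static String runAllAndReturnLast(List<String> pendingInstants, Function<String, String> runner) {
        List<String> results = new ArrayList<>();
        for (String instant : pendingInstants) {
            try {
                results.add(runner.apply(instant));
            } catch (RuntimeException e) {
                // A failed clean is logged and skipped; it does not abort the remaining ones.
                System.err.println("Failed to perform previous clean operation, instant: " + instant);
            }
        }
        // Only the last successful result is returned; null means nothing was executed.
        return results.isEmpty() ? null : results.get(results.size() - 1);
    }

    public static void main(String[] args) {
        Function<String, String> runner = instant -> {
            if (instant.equals("002")) {
                throw new IllegalStateException("simulated failure");
            }
            return "meta-" + instant;
        };
        System.out.println(runAllAndReturnLast(List.of("001", "002", "003"), runner)); // meta-003
        System.out.println(runAllAndReturnLast(List.of(), runner));                    // null
    }
}
```

One consequence visible in the sketch: when several pending cleans succeed, the caller only sees the metadata of the last one, which is what the TODO in the original code refers to.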
    

    The logic that executes a clean plan lives in the runPendingClean method. Its contents are as follows:

    HoodieCleanMetadata runPendingClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant) {
        try {
            HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
            return runClean(table, cleanInstant, cleanerPlan);
        } catch (IOException e) {
            throw new HoodieIOException(e.getMessage(), e);
        }
    }
    

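    This method first loads the clean plan and then executes it. The execution logic lives in the runClean method, which transitions the instant to the inflight state, performs the clean, writes the clean metadata on success, and finally transitions the instant to the completed state.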

    private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
        // The clean instant must be in either the requested or the inflight state
        ValidationUtils.checkArgument(cleanInstant.getState().equals(HoodieInstant.State.REQUESTED)
                                      || cleanInstant.getState().equals(HoodieInstant.State.INFLIGHT));
    
        HoodieInstant inflightInstant = null;
        try {
            final HoodieTimer timer = HoodieTimer.start();
            // If the instant is requested, transition it to inflight
            if (cleanInstant.isRequested()) {
                inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,
                                                                                               TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
            } else {
                inflightInstant = cleanInstant;
            }
    
            // Execute the clean plan (analyzed later)
            List<HoodieCleanStat> cleanStats = clean(context, cleanerPlan);
            if (cleanStats.isEmpty()) {
                return HoodieCleanMetadata.newBuilder().build();
            }
    
            table.getMetaClient().reloadActiveTimeline();
            // Build the clean metadata: earliest retained instant, clean start time, elapsed time, number of deleted files, per-partition clean metadata, and so on
            HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
                inflightInstant.getTimestamp(),
                Option.of(timer.endTimer()),
                cleanStats
            );
            // Acquire the transaction lock
            if (!skipLocking) {
                this.txnManager.beginTransaction(Option.of(inflightInstant), Option.empty());
            }
            // Write the clean metadata
            writeTableMetadata(metadata, inflightInstant.getTimestamp());
            // Transition the inflight instant to the completed state
            table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
                                                                        TimelineMetadataUtils.serializeCleanMetadata(metadata));
            LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
            return metadata;
        } catch (IOException e) {
            throw new HoodieIOException("Failed to clean up after commit", e);
        } finally {
            // Release the transaction lock
            if (!skipLocking) {
                this.txnManager.endTransaction(Option.of(inflightInstant));
            }
        }
    }
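The instant state transitions performed by runClean can be summarized in a small sketch. It is an illustration under stated assumptions, not Hudi code: the class `CleanStateSketch`, its `State` enum, and the `partitionsCleaned` parameter (standing in for the size of the `cleanStats` list) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the state transitions in runClean; not part of Hudi.
final class CleanStateSketch {

    enum State { REQUESTED, INFLIGHT, COMPLETED }

    // Returns the sequence of states the instant passes through.
    static List<State> runClean(State start, int partitionsCleaned) {
        if (start != State.REQUESTED && start != State.INFLIGHT) {
            throw new IllegalArgumentException("clean instant must be REQUESTED or INFLIGHT");
        }
        List<State> visited = new ArrayList<>();
        visited.add(start);
        if (start == State.REQUESTED) {
            // A requested instant is first moved to inflight.
            visited.add(State.INFLIGHT);
        }
        if (partitionsCleaned == 0) {
            // Mirrors the early return when cleanStats is empty: no transition to completed.
            return visited;
        }
        // Metadata is written, then the instant is marked completed.
        visited.add(State.COMPLETED);
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(runClean(State.REQUESTED, 2)); // [REQUESTED, INFLIGHT, COMPLETED]
        System.out.println(runClean(State.INFLIGHT, 2));  // [INFLIGHT, COMPLETED]
        System.out.println(runClean(State.REQUESTED, 0)); // [REQUESTED, INFLIGHT]
    }
}
```

An instant that is already inflight (for example after a failed earlier attempt) skips the first transition, which is why re-running a pending clean is safe.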
    

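    The clean plan itself is executed in the clean method. It reads from the cleaner plan each partition to be cleaned and the files to delete within it, then deletes those files. If a partition itself is marked for deletion, it is deleted as well. Finally, the statistics of the clean operation are assembled into HoodieCleanStat objects, one per partition.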

    List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
        int cleanerParallelism = Math.min(
            cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum(),
            config.getCleanerParallelism());
        LOG.info("Using cleanerParallelism: " + cleanerParallelism);
    
        context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of table: " + config.getTableName());
    
        // Read from the clean plan the files to delete in each partition
        Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
            cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
            .flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(),
                                                                             new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));
    
        // Delete these files
        Stream<ImmutablePair<String, PartitionCleanStat>> partitionCleanStats =
            context.mapPartitionsToPairAndReduceByKey(filesToBeDeletedPerPartition,
                                                      iterator -> deleteFilesFunc(iterator, table), PartitionCleanStat::merge, cleanerParallelism);
    
        Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
    
        // Get the partitions that must be deleted, and delete them
        List<String> partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>();
        partitionsToBeDeleted.forEach(entry -> {
            try {
                deleteFileAndGetResult(table.getMetaClient().getFs(), table.getMetaClient().getBasePath() + "/" + entry);
            } catch (IOException e) {
                LOG.warn("Partition deletion failed " + entry);
            }
        });
    
        // Return PartitionCleanStat for each partition passed.
        // Assemble and return the clean stat of each partition
        return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
            PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
                ? partitionCleanStatsMap.get(partitionPath)
                : new PartitionCleanStat(partitionPath);
            HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
            return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
                .withEarliestCommitRetained(Option.ofNullable(
                    actionInstant != null
                    ? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
                                        actionInstant.getAction(), actionInstant.getTimestamp())
                    : null))
                .withLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp())
                .withDeletePathPattern(partitionCleanStat.deletePathPatterns())
                .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
                .withFailedDeletes(partitionCleanStat.failedDeleteFiles())
                .withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
                .withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
                .withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
                .isPartitionDeleted(partitionsToBeDeleted.contains(partitionPath))
                .build();
        }).collect(Collectors.toList());
    }
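Two ideas in the clean method lend themselves to a small sketch: the parallelism is capped by the total number of files to delete, and per-file results are merged by partition so that every partition in the plan ends up with a stat, even when nothing was deleted. The code below is an illustration only. `PartitionCleanSketch`, its methods, and the use of a plain success count instead of `PartitionCleanStat` are assumptions, and it uses ordinary Java streams rather than Hudi's `HoodieEngineContext`.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical simplification of CleanActionExecutor.clean; not part of Hudi.
final class PartitionCleanSketch {

    // Never use more tasks than there are files to delete.
    static int parallelism(Map<String, List<String>> filesPerPartition, int configured) {
        int totalFiles = filesPerPartition.values().stream().mapToInt(List::size).sum();
        return Math.min(totalFiles, configured);
    }

    // deleter returns true when a file was deleted successfully.
    static Map<String, Integer> successfulDeletes(Map<String, List<String>> filesPerPartition,
                                                  Predicate<String> deleter) {
        // Flatten to (partition, file) pairs, then reduce by partition key.
        Map<String, Integer> merged = filesPerPartition.entrySet().stream()
            .flatMap(e -> e.getValue().stream().map(f -> Map.entry(e.getKey(), f)))
            .collect(Collectors.toMap(Map.Entry::getKey,
                                      pair -> deleter.test(pair.getValue()) ? 1 : 0,
                                      Integer::sum,
                                      TreeMap::new));
        // Every partition in the plan gets an entry, defaulting to zero.
        Map<String, Integer> result = new TreeMap<>();
        for (String partition : filesPerPartition.keySet()) {
            result.put(partition, merged.getOrDefault(partition, 0));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> plan = new TreeMap<>();
        plan.put("2023/08/01", List.of("a.parquet", "b.parquet"));
        plan.put("2023/08/02", List.of());
        System.out.println(parallelism(plan, 200));
        System.out.println(successfulDeletes(plan, file -> !file.startsWith("b")));
    }
}
```

In this example the cap yields a parallelism of 2, and the second partition still appears in the result with a count of 0, mirroring the fallback to an empty stat for partitions with no deleted files.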
    

    This concludes the analysis of the cleaning operation on Hudi tables.

    References

    Cleaning | Apache Hudi
