Hudi Source Code: Flink Table Service Scheduling and Execution


Author: AlienPaul | Published 2024-04-21 14:55

    Preface

    Hudi on Flink supports asynchronous execution of table services: they are scheduled when a checkpoint completes, and executed in a thread pool. The table services commonly used with Flink Hudi are compaction, clustering, and clean. The corresponding configuration options are listed below, followed by a short usage sketch:

    • clustering.async.enabled: whether to enable asynchronous clustering. Disabled by default.
    • compaction.async.enabled: whether to enable asynchronous compaction. Enabled by default.
    • clean.async.enabled: whether to enable asynchronous clean. Enabled by default.
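
    A minimal sketch of setting these options programmatically when building a Flink job (the FlinkOptions constants match the ones used in the code excerpts below; the rest of the job setup is omitted):

    Configuration conf = new Configuration();
    // asynchronous compaction for MOR tables (enabled by default)
    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, true);
    // opt in to asynchronous clustering (disabled by default)
    conf.setBoolean(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true);
    // asynchronous clean (enabled by default)
    conf.setBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED, true);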

    This article analyzes when Hudi table services are scheduled and executed in Flink. For the internal execution logic of the compaction, clustering, and clean services themselves, see the references at the end of this article.

    Scheduling

    In Hudi Flink, table service scheduling mainly happens in the following two methods:

    • StreamWriteOperatorCoordinator::handleEndInputEvent: schedules the table services needed in batch mode.
    • StreamWriteOperatorCoordinator::notifyCheckpointComplete: schedules the table services needed in streaming mode.

    Let's analyze these two methods in turn.
    The handleEndInputEvent method:

    private void handleEndInputEvent(WriteMetadataEvent event) {
      addEventToBuffer(event);
      // if all events have been received
      if (allEventsReceived()) {
        // start to commit the instant.
        // returns true if the data was written and the instant committed successfully
        boolean committed = commitInstant(this.instant);
        if (committed) {
          // The executor thread inherits the classloader of the #handleEventFromOperator
          // caller, which is a AppClassLoader.
          Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
          // sync Hive synchronously if it is enabled in batch mode.
          syncHive();
          // schedules the compaction or clustering if it is enabled in batch execution mode
          scheduleTableServices(true);
        }
      }
    }
    

    The notifyCheckpointComplete method is the callback invoked when a checkpoint completes successfully:

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
      executor.execute(
          () -> {
            // The executor thread inherits the classloader of the #notifyCheckpointComplete
            // caller, which is a AppClassLoader.
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            // for streaming mode, commits the ever received events anyway,
            // the stream write task snapshot and flush the data buffer synchronously in sequence,
            // so a successful checkpoint subsumes the old one (follows the checkpoint subsuming contract)
            // returns true if the data was written and the instant committed successfully
            final boolean committed = commitInstant(this.instant, checkpointId);
            // schedules the compaction or clustering if it is enabled in stream execution mode
            scheduleTableServices(committed);

            if (committed) {
              // start a new instant.
              startInstant();
              // sync Hive asynchronously if it is enabled
              syncHiveAsync();
            }
          }, "commits the instant %s", this.instant
      );
    }
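
    A note on the executor used above: it is Hudi's NonThrownExecutor, which runs actions on a background thread and passes failures to an error handler (by default just logging them) instead of rethrowing, so a failed action cannot bring down the coordinator. A minimal usage sketch follows; the action body and message are hypothetical, while the builder and execute calls mirror the excerpts in this article:

    NonThrownExecutor executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();
    executor.execute(
        () -> doWork(),            // hypothetical action run on the background thread
        "do work for instant %s",  // template used to describe the action in error logs
        instantTime);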
    

    The scheduleTableServices method:

    private void scheduleTableServices(Boolean committed) {
      // if compaction is on, schedule the compaction
      // true for MOR tables with compaction.schedule.enabled set (enabled by default)
      if (tableState.scheduleCompaction) {
        CompactionUtil.scheduleCompaction(writeClient, tableState.isDeltaTimeCompaction, committed);
      }
      // if clustering is on, schedule the clustering
      // requires clustering.schedule.enabled (disabled by default); for bucket-index tables
      // using consistent hashing the write operation must be upsert, while the SIMPLE
      // engine requires insert
      if (tableState.scheduleClustering) {
        ClusteringUtil.scheduleClustering(conf, writeClient, committed);
      }
    }
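
    Based on the comments above, the two tableState flags boil down to roughly the following (a simplified sketch that ignores the bucket-index write-operation checks; isMorTable is an illustrative helper, and the real checks live in OptionsResolver and may differ across Hudi versions):

    boolean scheduleCompaction =
        isMorTable(conf)                                                  // compaction only applies to MOR tables
            && conf.getBoolean(FlinkOptions.COMPACTION_SCHEDULE_ENABLED); // default: true
    boolean scheduleClustering =
        conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED);        // default: false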
    

    Execution

    HoodieTableSink

    Hudi Flink sets up the asynchronous table-service streams in HoodieTableSink::getSinkRuntimeProvider:

    // ...
    // Append mode
    if (OptionsResolver.isAppendMode(conf)) {
      // close compaction for append mode
      conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false);
      // write the data in append mode
      DataStream<Object> pipeline = Pipelines.append(conf, rowType, dataStream);
      // async clustering is needed when write.operation is insert
      // and clustering.async.enabled is true
      if (OptionsResolver.needsAsyncClustering(conf)) {
        // attach the clustering pipeline
        return Pipelines.cluster(conf, rowType, pipeline);
      } else if (OptionsResolver.isLazyFailedWritesCleanPolicy(conf)) {
        // hoodie.cleaner.policy.failed.writes is configured as LAZY:
        // add clean function to rollback failed writes for lazy failed writes cleaning policy
        return Pipelines.clean(conf, pipeline);
      } else {
        // otherwise do nothing
        return Pipelines.dummySink(pipeline);
      }
    }

    DataStream<Object> pipeline;
    // bootstrap: load the index
    final DataStream<HoodieRecord> hoodieRecordDataStream =
        Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite);
    // write pipeline: streaming write
    pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream);
    // compaction: needed when compaction.async.enabled is true (the default)
    if (OptionsResolver.needsAsyncCompaction(conf)) {
      // use synchronous compaction for bounded source.
      if (context.isBounded()) {
        conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
      }
      // attach the compaction pipeline
      return Pipelines.compact(conf, pipeline);
    } else {
      // if compaction is not enabled, attach the clean pipeline
      return Pipelines.clean(conf, pipeline);
    }
    // ...
    

    From the analysis above, the compact, clean, and clustering table services in Flink are all created in Pipelines. Let's walk through the Pipelines source code next.

    Pipelines

    Pipelines creates dedicated data streams that periodically generate compaction and clustering plans and execute compact, clean, and clustering. These streams are separate from the job's business data flow.

    Pipelines::compact

    This method sets up the periodic compaction stream.

    public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
      // CompactionPlanOperator emits the compaction plan events downstream
      DataStreamSink<CompactionCommitEvent> compactionCommitEventDataStream = dataStream.transform("compact_plan_generate",
              TypeInformation.of(CompactionPlanEvent.class),
              new CompactionPlanOperator(conf))
          .setParallelism(1) // plan generate must be singleton
          .setMaxParallelism(1)
          // make the distribution strategy deterministic to avoid concurrent modifications
          // on the same bucket files
          .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
          // CompactOperator executes the compaction plan
          .transform("compact_task",
              TypeInformation.of(CompactionCommitEvent.class),
              new CompactOperator(conf))
          // parallelism comes from compaction.tasks
          .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
          // CompactionCommitSink validates and commits the compaction instant
          .addSink(new CompactionCommitSink(conf))
          .name("compact_commit")
          .setParallelism(1); // compaction commit should be singleton
      compactionCommitEventDataStream.getTransformation().setMaxParallelism(1);
      return compactionCommitEventDataStream;
    }
    

    CompactionPlanOperator::notifyCheckpointComplete checks at checkpoint time whether any compaction instant in REQUESTED state exists; if so, it generates CompactionPlanEvents and sends them downstream.
    StreamWriteOperatorCoordinator creates the REQUESTED compaction instants; CompactionPlanOperator picks them up, reads the stored compaction plans, and forwards them downstream.

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
      try {
        table.getMetaClient().reloadActiveTimeline();
        // There is no good way to infer when the compaction task for an instant crushed
        // or is still undergoing. So we use a configured timeout threshold to control the rollback:
        // {@code FlinkOptions.COMPACTION_TIMEOUT_SECONDS},
        // when the earliest inflight instant has timed out, assumes it has failed
        // already and just rolls it back.

        // comment out: do we really need the timeout rollback ?
        // CompactionUtil.rollbackEarliestCompaction(table, conf);
        scheduleCompaction(table, checkpointId);
      } catch (Throwable throwable) {
        // make it fail-safe
        LOG.error("Error while scheduling compaction plan for checkpoint: " + checkpointId, throwable);
      }
    }
    

    The scheduleCompaction method reads the first compaction instant in REQUESTED state, loads its compaction plan, and wraps each CompactionOperation in that plan (the file-group information the compaction touches) into a CompactionPlanEvent sent downstream.

    private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
      // fetch the timeline containing all pending compactions
      HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();

      // the first instant takes the highest priority.
      // find the earliest compaction instant in REQUESTED state
      Option<HoodieInstant> firstRequested = pendingCompactionTimeline
          .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
      // record metrics
      compactionMetrics.setFirstPendingCompactionInstant(firstRequested);
      compactionMetrics.setPendingCompactionCount(pendingCompactionTimeline.countInstants());
      // nothing to schedule if there is no REQUESTED compaction instant
      if (!firstRequested.isPresent()) {
        // do nothing.
        LOG.info("No compaction plan for checkpoint " + checkpointId);
        return;
      }
      // the timestamp of that REQUESTED compaction instant
      String compactionInstantTime = firstRequested.get().getTimestamp();

      // generate compaction plan
      // should support configurable commit metadata
      // load the compaction plan
      HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
          table.getMetaClient(), compactionInstantTime);

      // no valid compaction plan was found
      if (compactionPlan == null || (compactionPlan.getOperations() == null)
          || (compactionPlan.getOperations().isEmpty())) {
        // do nothing.
        LOG.info("Empty compaction plan for instant " + compactionInstantTime);
      } else {
        // get the REQUESTED compaction instant
        HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
        // Mark instant as compaction inflight
        table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
        table.getMetaClient().reloadActiveTimeline();
        // collect all compaction operations in the plan
        List<CompactionOperation> operations = compactionPlan.getOperations().stream()
            .map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
        LOG.info("Execute compaction plan for instant {} as {} file groups", compactionInstantTime, operations.size());
        // delete marker files
        WriteMarkersFactory
            .get(table.getConfig().getMarkersType(), table, compactionInstantTime)
            .deleteMarkerDir(table.getContext(), table.getConfig().getMarkersDeleteParallelism());
        // each operation is wrapped in its own CompactionPlanEvent so that the downstream
        // subtasks can split the work, each compacting a subset of the file groups
        for (CompactionOperation operation : operations) {
          output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
        }
      }
    }
    

    CompactOperator's processElement method receives the CompactionPlanEvents generated above and executes the compaction tasks.

    @Override
    public void processElement(StreamRecord<CompactionPlanEvent> record) throws Exception {
      final CompactionPlanEvent event = record.getValue();
      // the inflight compaction instant time
      final String instantTime = event.getCompactionInstantTime();
      // the compaction operation to execute
      final CompactionOperation compactionOperation = event.getOperation();
      // with async compaction, the task runs in a thread pool so it does not
      // affect the checkpoint process
      if (asyncCompaction) {
        // executes the compaction task asynchronously to not block the checkpoint barrier propagate.
        executor.execute(
            () -> doCompaction(instantTime, compactionOperation, collector, reloadWriteConfig()),
            (errMsg, t) -> collector.collect(new CompactionCommitEvent(instantTime, compactionOperation.getFileId(), taskID)),
            "Execute compaction for instant %s from task %d", instantTime, taskID);
      } else {
        // executes the compaction task synchronously for batch mode.
        LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
        doCompaction(instantTime, compactionOperation, collector, writeClient.getConfig());
      }
    }
    

    The doCompaction method then indirectly calls HoodieFlinkMergeOnReadTableCompactor's compact method, which earlier articles in this series have already analyzed, so it is not repeated here.

    Pipelines::cluster

    This method sets up the periodic clustering stream.

    public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, DataStream<Object> dataStream) {
      // ClusteringPlanOperator emits the clustering plan events downstream
      DataStream<ClusteringCommitEvent> clusteringStream = dataStream.transform("cluster_plan_generate",
              TypeInformation.of(ClusteringPlanEvent.class),
              new ClusteringPlanOperator(conf))
          .setParallelism(1) // plan generate must be singleton
          .setMaxParallelism(1) // plan generate must be singleton
          .keyBy(plan ->
              // make the distribution strategy deterministic to avoid concurrent modifications
              // on the same bucket files
              // key by the concatenated file IDs of the ClusteringPlanEvent, so clustering of
              // the same file slices is always assigned to the same task, preventing
              // concurrent modification
              plan.getClusteringGroupInfo().getOperations()
                  .stream().map(ClusteringOperation::getFileId).collect(Collectors.joining()))
          .transform("clustering_task",
              TypeInformation.of(ClusteringCommitEvent.class),
              // ClusteringOperator executes the clustering plan
              new ClusteringOperator(conf, rowType))
          // parallelism comes from clustering.tasks
          .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));
      // if sorting is enabled (clustering.plan.strategy.sort.columns is non-empty),
      // assign managed memory to this operator according to write.sort.memory
      if (OptionsResolver.sortClusteringEnabled(conf)) {
        ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
            conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
      }
      // ClusteringCommitSink validates and commits the clustering instant
      DataStreamSink<ClusteringCommitEvent> clusteringCommitEventDataStream = clusteringStream.addSink(new ClusteringCommitSink(conf))
          .name("clustering_commit")
          .setParallelism(1); // clustering commit should be singleton
      clusteringCommitEventDataStream.getTransformation().setMaxParallelism(1);
      return clusteringCommitEventDataStream;
    }
    

    ClusteringPlanOperator's scheduleClustering works much like the compaction scheduling above.

    private void scheduleClustering(HoodieFlinkTable<?> table, long checkpointId) {
      // fetch the pending clustering instants
      List<HoodieInstant> pendingClusteringInstantTimes =
          ClusteringUtils.getPendingClusteringInstantTimes(table.getMetaClient());
      // the first instant takes the highest priority.
      // take the earliest one in REQUESTED state
      Option<HoodieInstant> firstRequested = Option.fromJavaOptional(
          pendingClusteringInstantTimes.stream()
              .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).findFirst());

      // record metrics
      clusteringMetrics.setFirstPendingClusteringInstant(firstRequested);
      clusteringMetrics.setPendingClusteringCount(pendingClusteringInstantTimes.size());

      if (!firstRequested.isPresent()) {
        // do nothing.
        LOG.info("No clustering plan for checkpoint " + checkpointId);
        return;
      }

      String clusteringInstantTime = firstRequested.get().getTimestamp();

      // generate clustering plan
      // should support configurable commit metadata
      HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstantTime);
      // load the previously generated clustering plan
      Option<Pair<HoodieInstant, HoodieClusteringPlan>> clusteringPlanOption = ClusteringUtils.getClusteringPlan(
          table.getMetaClient(), clusteringInstant);

      // return early if there is no valid clustering plan
      if (!clusteringPlanOption.isPresent()) {
        // do nothing.
        LOG.info("No clustering plan scheduled");
        return;
      }

      HoodieClusteringPlan clusteringPlan = clusteringPlanOption.get().getRight();

      if (clusteringPlan == null || (clusteringPlan.getInputGroups() == null)
          || (clusteringPlan.getInputGroups().isEmpty())) {
        // do nothing.
        LOG.info("Empty clustering plan for instant " + clusteringInstantTime);
      } else {
        // Mark instant as clustering inflight
        table.getActiveTimeline().transitionReplaceRequestedToInflight(clusteringInstant, Option.empty());
        table.getMetaClient().reloadActiveTimeline();
        // wrap each input group into a ClusteringPlanEvent and emit it
        for (HoodieClusteringGroup clusteringGroup : clusteringPlan.getInputGroups()) {
          LOG.info("Execute clustering plan for instant {} as {} file slices", clusteringInstantTime, clusteringGroup.getSlices().size());
          output.collect(new StreamRecord<>(
              new ClusteringPlanEvent(clusteringInstantTime, ClusteringGroupInfo.create(clusteringGroup), clusteringPlan.getStrategy().getStrategyParams())
          ));
        }
      }
    }
    

    The clustering operations on file slices are wrapped in HoodieClusteringGroups (the counterpart of compaction's CompactionOperation). Each HoodieClusteringGroup in the plan is wrapped in its own ClusteringPlanEvent so that downstream tasks can execute the clustering in parallel.

    ClusteringOperator's processElement method executes the clustering plan. As with compaction, execution can be synchronous or asynchronous.

    @Override  
    public void processElement(StreamRecord<ClusteringPlanEvent> element) throws Exception {  
      final ClusteringPlanEvent event = element.getValue();  
      final String instantTime = event.getClusteringInstantTime();  
      final List<ClusteringOperation> clusteringOperations = event.getClusteringGroupInfo().getOperations();  
      if (this.asyncClustering) {  
        // executes the clustering task asynchronously to not block the checkpoint barrier propagate.
        executor.execute(  
            () -> doClustering(instantTime, clusteringOperations),  
            (errMsg, t) -> collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), taskID)),  
            "Execute clustering for instant %s from task %d", instantTime, taskID);  
      } else {  
        // executes the clustering task synchronously for batch mode.  
        LOG.info("Execute clustering for instant {} from task {}", instantTime, taskID);  
        doClustering(instantTime, clusteringOperations);  
      }  
    }
    

    The doClustering method is the pure-Flink implementation of the clustering process.

    private void doClustering(String instantTime, List<ClusteringOperation> clusteringOperations) throws Exception {
      clusteringMetrics.startClustering();
      // the clustered data is written back via bulk insert
      BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
          instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(),
          this.rowType, true);

      Iterator<RowData> iterator;
      // if the clustering operations involve log files, use readRecordsForGroupWithLogs,
      // otherwise readRecordsForGroupBaseFiles; both read the file group's data and
      // return it as an iterator
      if (clusteringOperations.stream().anyMatch(operation -> CollectionUtils.nonEmpty(operation.getDeltaFilePaths()))) {
        // if there are log files, we read all records into memory for a file group and apply updates.
        iterator = readRecordsForGroupWithLogs(clusteringOperations, instantTime);
      } else {
        // We want to optimize reading records for case there are no log files.
        iterator = readRecordsForGroupBaseFiles(clusteringOperations);
      }
      // sorting is required when clustering.plan.strategy.sort.columns is configured
      if (this.sortClusteringEnabled) {
        RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
        // sort with a BinaryExternalSorter, whose sort code is generated from
        // clustering.plan.strategy.sort.columns
        BinaryExternalSorter sorter = initSorter();
        while (iterator.hasNext()) {
          RowData rowData = iterator.next();
          BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy();
          sorter.write(binaryRowData);
        }
        // write the sorted data via bulk insert
        BinaryRowData row = binarySerializer.createInstance();
        while ((row = sorter.getIterator().next(row)) != null) {
          writerHelper.write(row);
        }
        sorter.close();
      } else {
        while (iterator.hasNext()) {
          writerHelper.write(iterator.next());
        }
      }

      List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID);
      clusteringMetrics.endClustering();
      collector.collect(new ClusteringCommitEvent(instantTime, getFileIds(clusteringOperations), writeStatuses, this.taskID));
      writerHelper.close();
    }
    

    Pipelines::clean

    The clean method is comparatively simple: it adds a CleanFunction sink with parallelism 1 to the data stream, as shown below.

    public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
      DataStreamSink<Object> cleanCommitDataStream = dataStream.addSink(new CleanFunction<>(conf))
          .setParallelism(1)
          .name("clean_commits");
      cleanCommitDataStream.getTransformation().setMaxParallelism(1);
      return cleanCommitDataStream;
    }
    

    Let's continue with CleanFunction. If asynchronous clean is enabled, CleanFunction triggers one async clean when it starts up (open). When a checkpoint is taken, snapshotState starts the async clean service, and when the checkpoint completes, notifyCheckpointComplete waits for the clean operation to finish. The relevant code is shown below.

    @Override  
    public void open(Configuration parameters) throws Exception {  
      super.open(parameters);  
      this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext());  
      this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build();  
      String instantTime = writeClient.createNewInstantTime();  
      LOG.info(String.format("exec clean with instant time %s...", instantTime));  
      if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {  
        executor.execute(() -> {  
          this.isCleaning = true;  
          try {  
            this.writeClient.clean(instantTime);  
          } finally {  
            this.isCleaning = false;  
          }  
        }, "wait for cleaning finish");  
      }  
    }  
      
    @Override  
    public void notifyCheckpointComplete(long l) throws Exception {  
      if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && isCleaning) {  
        executor.execute(() -> {  
          try {  
            this.writeClient.waitForCleaningFinish();  
          } finally {  
            // ensure to switch the isCleaning flag  
            this.isCleaning = false;  
          }  
        }, "wait for cleaning finish");  
      }  
    }  
      
    @Override  
    public void snapshotState(FunctionSnapshotContext context) throws Exception {  
      if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {  
        try {  
          this.writeClient.startAsyncCleaning();  
          this.isCleaning = true;  
        } catch (Throwable throwable) {  
          // catch the exception to not affect the normal checkpointing  
          LOG.warn("Error while start async cleaning", throwable);  
        }  
      }  
    }
    

    Besides Pipelines::clean, which uses CleanFunction directly, note that ClusteringCommitSink and CompactionCommitSink both extend CleanFunction and neither overrides snapshotState or notifyCheckpointComplete. These two sinks therefore behave like CleanFunction and trigger clean at checkpoint time.
    Looking further at their doCommit methods, we find the following code at the end of both:

    // Whether to clean up the old log file when compaction  
    if (!conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED) && !isCleaning) {  
      this.writeClient.clean();  
    }
    

    This code means: if asynchronous clean is not enabled and no clean is currently running, a synchronous clean is performed.
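
    To make the inheritance relationship concrete, here is a sketch of the hierarchy described above (class bodies elided):

    // both commit sinks extend CleanFunction and inherit its checkpoint hooks unchanged,
    // so clean is also triggered from the compaction and clustering pipelines
    public class CompactionCommitSink extends CleanFunction<CompactionCommitEvent> { /* doCommit(...) */ }
    public class ClusteringCommitSink extends CleanFunction<ClusteringCommitEvent> { /* doCommit(...) */ }
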
    Next, let's trace the this.writeClient.startAsyncCleaning() call all the way down; non-essential code along the way is omitted.

    • CleanFunction: this.writeClient.startAsyncCleaning()
    • HoodieFlinkWriteClient: tableServiceClient.startAsyncCleanerService(this)
    • BaseHoodieTableServiceClient: this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(writeClient)

    At this point it is clear that the clean table service is encapsulated in AsyncCleanerService. Its startAsyncCleaningIfEnabled method first checks whether asynchronous clean is enabled and, if so, creates and starts the async clean service:
    public static AsyncCleanerService startAsyncCleaningIfEnabled(BaseHoodieWriteClient writeClient) {  
      HoodieWriteConfig config = writeClient.getConfig();  
      // the service does not run unless both hoodie.clean.automatic and hoodie.clean.async are true
      if (!config.isAutoClean() || !config.isAsyncClean()) {  
        LOG.info("The HoodieWriteClient is not configured to auto & async clean. Async clean service will not start.");  
        return null;  
      }  
      // create and start the AsyncCleanerService
      AsyncCleanerService asyncCleanerService = new AsyncCleanerService(writeClient);  
      asyncCleanerService.start(null);  
      return asyncCleanerService;  
    }
    

    We continue with asyncCleanerService.start, which is implemented in HoodieAsyncService's start method.
    The startService method there returns the async service logic itself together with the executor that runs it, wrapped in a Pair.

    public void start(Function<Boolean, Boolean> onShutdownCallback) {  
      if (started) {  
        LOG.warn("The async service already started.");  
        return;  
      }  
      Pair<CompletableFuture, ExecutorService> res = startService();  
      future = res.getKey();  
      executor = res.getValue();  
      started = true;  
      shutdownCallback(onShutdownCallback);  
    }
    

    The startService method is implemented in the subclass AsyncCleanerService, as shown below.

    @Override  
    protected Pair<CompletableFuture, ExecutorService> startService() {  
      String instantTime = writeClient.createNewInstantTime();  
      LOG.info(String.format("Starting async clean service with instant time %s...", instantTime));  
      // kick off the async clean operation in the thread pool
      return Pair.of(CompletableFuture.supplyAsync(() -> {  
        writeClient.clean(instantTime);  
        return true;  
      }, executor), executor);  
    }
    

    After this call, the async clean logic is wrapped in the future field.
    As analyzed above, waitForCompletion runs when notifyCheckpointComplete fires; if the clean operation has not finished yet, it blocks until it completes.

    public static void waitForCompletion(AsyncCleanerService asyncCleanerService) {  
      if (asyncCleanerService != null) {  
        LOG.info("Waiting for async clean service to finish");  
        try {  
          asyncCleanerService.waitForShutdown();  
        } catch (Exception e) {  
          throw new HoodieException("Error waiting for async clean service to finish", e);  
        }  
      }  
    }
    

    HoodieAsyncService's waitForShutdown method waits synchronously for the clean to finish:

    public void waitForShutdown() throws ExecutionException, InterruptedException {  
      if (future == null) {  
        return;  
      }  
      try {  
        future.get();  
      } catch (ExecutionException ex) {  
        LOG.error("Service shutdown with error", ex);  
        throw ex;  
      }  
    }
    

    References

    Compaction | Apache Hudi
    Cleaning | Apache Hudi
    Clustering | Apache Hudi
