美文网首页
Hudi Timeline简析

Hudi Timeline简析

作者: LittleMagic | 来源:发表于2022-08-22 22:33 被阅读0次

    前言

    Long time no see(鞠躬

    最近终于开始尝试推广Hudi在部门内部的应用,作为流批一体计划的最后一块拼图,顺便复活许久未更的博客,希望今后至少能保持周更的节奏吧。

    在Hudi官方文档的开头列举了四大核心概念,分别是:

    • Timeline
    • File Layout
    • Table Types
    • Query Types

    本文就来简要地谈谈Timeline。

    Timeline作用与结构

    官网关于Timeline的页面洋洋洒洒介绍了很多,但是少了笔者认为最关键的、本质的概念:

    Timeline就是Hudi的事务日志。

    读者可以回想一下MySQL中的Redo/Undo Log、Kudu中的Redo/Undo File(可参见很久之前写的解析)。Timeline在Hudi中扮演的角色和它们基本相同(尽管Hudi并不是一个数据库系统),也就是说,Hudi依靠Timeline提供快照隔离(SI)的事务语义,并使得增量查询、Time-travel等特性成为可能。

    每张Hudi表都有一条Timeline,由许多Instant组成,其中维护了各个时间点在该表上进行的操作。每个Instant又包含以下3个主要field。

    • Time:操作进行的时间戳,单调递增,格式为yyyyMMddHHmmssSSS
    • Action:该时间戳进行的具体操作,如commitcompaction等,所有操作都是原子的;
    • State:这个Instant的状态,包含requestedinflightcompleted三种。

    Timeline和Instant的详细图示如下。

    关于各个Action和State值的含义,可直接参考文档,这里不再赘述。

    Timeline以文件序列的形式存储,其路径位于/path/to/table/.hoodie目录,每个文件的命名方式是[time].[action].[state](处于completed状态的Instant没有state后缀),例如:20220822181448272.deltacommit.inflight。不同类型的Action对应的文件格式由不同的Avro Schema定义,以一个已经完成的deltacommit操作为例,它对应的Instant数据节选如下:

    {
          "fileId" : "6e0ef835-2474-4182-b085-e64994788729",
          "path" : "2022-08-22/.6e0ef835-2474-4182-b085-e64994788729_20220822181218028.log.1_3-4-0",
          "prevCommit" : "20220822181218028",
          "numWrites" : 179,
          "numDeletes" : 0,
          "numUpdateWrites" : 179,
          "numInserts" : 0,
          "totalWriteBytes" : 60666,
          "totalWriteErrors" : 0,
          "tempPath" : null,
          "partitionPath" : "2022-08-22",
          "totalLogRecords" : 0,
          "totalLogFilesCompacted" : 0,
          "totalLogSizeCompacted" : 0,
          "totalUpdatedRecordsCompacted" : 0,
          "totalLogBlocks" : 0,
          "totalCorruptLogBlock" : 0,
          "totalRollbackBlocks" : 0,
          "fileSizeInBytes" : 199309,
          "minEventTime" : null,
          "maxEventTime" : null,
          "logVersion" : 1,
          "logOffset" : 0,
          "baseFile" : "6e0ef835-2474-4182-b085-e64994788729_0-4-0_20220822181218028.parquet",
          "logFiles" : [ ".6e0ef835-2474-4182-b085-e64994788729_20220822181218028.log.1_3-4-0" ],
          "recordsStats" : {
            "val" : null,
            "present" : false
          },
          "columnStats" : {
            "val" : null,
            "present" : false
          }
    }
    

    Timeline实现

    Timeline的类层次体系如下图所示。

    HoodieTimeline接口定义了所有合法的Action和State的组合(也就是Instant文件的扩展名组合),以及Instant的获取、过滤和文件名拼接等规范,主要的实现则位于HoodieDefaultTimeline类。所有的Instant维护在List<HoodieInstant>容器中。

    举个例子,Flink-Hudi Sink配备了生成Inline Compaction计划的算子CompactionPlanOperator,在每个Checkpoint完毕时负责调度。它需要在Timeline中寻找第一个pending的Compaction操作,就会用到HoodieDefaultTimeline提供的对应方法:

    // CompactionPlanOperator
      private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
        // the first instant takes the highest priority.
        Option<HoodieInstant> firstRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
            .filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).firstInstant();
        if (!firstRequested.isPresent()) {
          // do nothing.
          LOG.info("No compaction plan for checkpoint " + checkpointId);
          return;
        }
        // ......
      }
    // HoodieDefaultTimeline
      @Override
      public HoodieTimeline filterPendingCompactionTimeline() {
        return new HoodieDefaultTimeline(
            instants.stream().filter(s -> s.getAction().equals(HoodieTimeline.COMPACTION_ACTION) && !s.isCompleted()), details);
      }
    

    下面再来看看HoodieDefaultTimeline的两个实现。

    HoodieActiveTimeline

    顾名思义,HoodieActiveTimeline维护当前活动的Timeline,它的主要作用是读写不同Action、不同State对应的Instant文件,所以大部分操作都是直接对文件操作。以requested状态到inflight状态的转换为例,代码比较易懂,其他操作都类似:

      public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content,
          boolean allowRedundantTransitions) {
        HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, requested.getAction(), requested.getTimestamp());
        ValidationUtils.checkArgument(requested.isRequested(), "Instant " + requested + " in wrong state");
        transitionState(requested, inflight, content, allowRedundantTransitions);
      }
    
      private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data,
           boolean allowRedundantTransitions) {
        ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
        try {
          if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
            // Re-create the .inflight file by opening a new file and write the commit metadata in
            createFileInMetaPath(fromInstant.getFileName(), data, allowRedundantTransitions);
            Path fromInstantPath = getInstantFileNamePath(fromInstant.getFileName());
            Path toInstantPath = getInstantFileNamePath(toInstant.getFileName());
            boolean success = metaClient.getFs().rename(fromInstantPath, toInstantPath);
            if (!success) {
              throw new HoodieIOException("Could not rename " + fromInstantPath + " to " + toInstantPath);
            }
          } else {
            // Ensures old state exists in timeline
            LOG.info("Checking for file exists ?" + getInstantFileNamePath(fromInstant.getFileName()));
            ValidationUtils.checkArgument(metaClient.getFs().exists(getInstantFileNamePath(fromInstant.getFileName())));
            // Use Write Once to create Target File
            if (allowRedundantTransitions) {
              FileIOUtils.createFileInPath(metaClient.getFs(), getInstantFileNamePath(toInstant.getFileName()), data);
            } else {
              createImmutableFileInPath(getInstantFileNamePath(toInstant.getFileName()), data);
            }
            LOG.info("Create new file for toInstant ?" + getInstantFileNamePath(toInstant.getFileName()));
          }
        } catch (IOException e) {
          throw new HoodieIOException("Could not complete " + fromInstant, e);
        }
      }
    

    除此之外,HoodieActiveTimeline还有一个非常重要的功能是生成新的Instant时间戳:

      public static String createNewInstantTime(long milliseconds) {
        return lastInstantTime.updateAndGet((oldVal) -> {
          String newCommitTime;
          do {
            if (commitTimeZone.equals(HoodieTimelineTimeZone.UTC)) {
              LocalDateTime now = LocalDateTime.now(ZoneOffset.UTC);
              newCommitTime = now.format(MILLIS_INSTANT_TIME_FORMATTER);
            } else {
              Date d = new Date(System.currentTimeMillis() + milliseconds);
              newCommitTime = MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
            }
          } while (HoodieTimeline.compareTimestamps(newCommitTime, HoodieActiveTimeline.LESSER_THAN_OR_EQUALS, oldVal));
          return newCommitTime;
        });
      }
    

    注意最近一个Instant的时间以AtomicReference<String>来维护,这样就可以通过CAS操作(updateAndGet())来保证Instant的时间戳单调递增。

    活动Timeline中可维护的Commit数目的上下界可由参数hoodie.keep.max.commitshoodie.keep.min.commits来指定,默认值分别为30和20。

    HoodieArchivedTimeline

    随着Hudi表不断写入,Instant会逐渐增多,为了降低活动Timeline上的文件压力,需要对比较久远的Instant进行归档,并将这些Instant从活动Timeline移除。这个操作一般是默认执行的(hoodie.archive.automatic默认为true ),归档后的Instant就会维护在HoodieArchivedTimeline中,位于/path/to/table/.hoodie/archived目录下。触发自动归档的Commit数上下界则由参数archive.max_commitsarchive.min_commits指定,默认值分别为50和40。

    HoodieArchivedTimeline进行归档的逻辑并不在它内部,而位于HoodieTimelineArchiver中,看官可自行参考其源码。为了进一步减少小文件的影响,在归档的同时还可以进行小文件合并,与合并操作相关的参数有:

    • hoodie.archive.merge.enable:是否启用归档合并,默认false;
    • hoodie.archive.merge.small.file.limit.bytes:小文件阈值,默认20971520字节;
    • hoodie.archive.merge.files.batch.size:合并小文件的批次大小,默认为10。

    The End

    晚安晚安。

    相关文章

      网友评论

          本文标题:Hudi Timeline简析

          本文链接:https://www.haomeiwen.com/subject/syhigrtx.html