美文网首页Java玩转大数据
Hudi 源码之Bucket Index

Hudi 源码之Bucket Index

作者: AlienPaul | 来源:发表于2023-09-04 08:33 被阅读0次

    背景

    对于Upsert操作,Hudi需要定位到数据所在的File Group。当File Group很多的时候,定位File Group的过程会成为性能瓶颈。

    Hudi 提供了索引的方式,保存了每个record key和他所属的file id的对应关系。然后将这些对应关系保存到外部存储系统(HBase, Flink状态后端等)。这种方式需要引入外部系统,运维的复杂度较高且索引数据量较大。除此之外Hudi还提供了Bloom filter方式。每个parquet文件都对应一个bloom filter。通过这个bloom filter可以很容易确定数据不在这个parquet文件。有助于在扫描parquet文件的时候快速跳过无关的文件。但是在确认数据在某个parquet的时候,因bloom filter存在误判的可能性,需要逐条比对数据,存在较大的性能消耗。

    在这个背景下提出了Hudi bucket Index。它是一种优化措施,将每个partition中的file group分为N份,N为bucket个数。每个分区下的File group个数一旦确定不再会变化(除了Clustering的时候)。未启用bucket index的情况下file group的file id使用UUID标识。启用了bucket index之后。每个file id的前8为被替换为bucket number(同一个partition中的不同bucket使用bucket number标识)。通过数据的record key取hash运算可以将数据映射到不同的bucket上。也就是说bucket index通过partition -> bucket number两个层级来定位record所属的file group。这两级查找时间复杂度都是O(1),无需遍历数据文件,极大的提高了查找的速度。

    除此之外,在查询的时候如果使用bucket字段作为查询筛选条件,由于bucket字段相同的数据一定位于同一个bucket中,可以跳过其他的file group,减少扫描的数据量。

    使用bucket index需要注意的是,每个partition的bucket数量一旦确定就无法更改。Hudi的小文件处理策略和大文件分块不再有效。所以说使用前需要预估数据量。如果bucket数量过少,每个file group文件大小会过大,不利于并发处理。如果bucket数量过多,会遇到大量小文件问题,会增大分布式文件系统元数据负载,降低持续读写性能。

    Bucket index配置项

    • index.type(Flink) / hoodie.index.type(Spark)。使用的索引类型。如果要使用bucket index,需要配置为BUCKET
    • hoodie.bucket.index.num.buckets。bucket个数,默认为256。在Flink中默认为4。
    • hoodie.bucket.index.hash.field。按照哪个资源hash分桶。不配置默认使用record key。

    Bucket Index的原理

    Pipelines

    我们从构建bucket写入逻辑的BucketStreamWriteOperator所在的PipelineshoodieStreamWrite方法开始分析。它的代码如下:

    public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream) {
        // 如果表的index类型是BUCKET。对应配置项index.type
        if (OptionsResolver.isBucketIndexType(conf)) {
            // 使用Bucket类型的StreamWriter
            WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
            // 获取bucket个数,对应配置项hoodie.bucket.index.num.buckets。在Flink中默认为4
            int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
            // 获取index key字段,对应配置项hoodie.bucket.index.hash.field。如果没有配置,使用record key
            String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
            // 使用bucket index分区器,根据record key,partition和flink channel数量分区
            BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
            return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
                .transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
                .uid(opUID("bucket_write", conf))
                .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
        } else {
            // 其他数据类型使用StreamWriteOperator
            WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
            return dataStream
                // Key-by record key, to avoid multiple subtasks write to a bucket at the same time
                // 先按照record key分区
                // 这里的bucket概念和bucket index中的bucket不同
                // 这里的bucket是根据file id和partition分组的概念,同时还考虑到了小文件聚合(将insert的数据优先分配到小文件)
                // bucket作为整体flush到磁盘上
                .keyBy(HoodieRecord::getRecordKey)
                // 分配bucket
                .transform(
                "bucket_assigner",
                TypeInformation.of(HoodieRecord.class),
                new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
                .uid(opUID("bucket_assigner", conf))
                .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
                // shuffle by fileId(bucket id)
                // 确定好数据所属的file id,分区写入
                .keyBy(record -> record.getCurrentLocation().getFileId())
                .transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
                .uid(opUID("stream_write", conf))
                .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
        }
    }
    

    BucketStreamWriteOperator代码较少,在构造函数中创建出了BucketStreamWriteFunction。我们接下来分析它。

    BucketStreamWriteFunction

    processElement方法中,Hudi根据record的key计算出record对应的bucket number。结合record所在的partition可以很快的确定数据所在的file group。

    @Override
    public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
        HoodieRecord<?> record = (HoodieRecord<?>) i;
        // 获取key
        final HoodieKey hoodieKey = record.getKey();
        // 获取partition path
        final String partition = hoodieKey.getPartitionPath();
        final HoodieRecordLocation location;
    
        // 在同一个partition中,从Hudi表中读取bucket number和file id的对应关系,放入索引中(bucketIndex)
        // 后面分析
        bootstrapIndexIfNeed(partition);
        // 从索引读取该分区中bucket number和file id的对应关系
        Map<Integer, String> bucketToFileId = bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
        // 获取record对应的bucket number
        // 代码为getHashKeys(hoodieKey, indexKeyFields).hashCode() & Integer.MAX_VALUE) % numBuckets
        // indexKeyField为配置项hoodie.bucket.index.hash.field的值,如果没有配置,使用record key
        final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum);
        // 组装bucket id
        final String bucketId = partition + "/" + bucketNum;
    
        // incBucketIndex是新增数据的bucketIndex缓存
        if (incBucketIndex.contains(bucketId)) {
            // 如果是新增数据
            // 根据bucket number找到对应的file group的id
            location = new HoodieRecordLocation("I", bucketToFileId.get(bucketNum));
        } else if (bucketToFileId.containsKey(bucketNum)) {
            // 如果索引中有,说明是修改的数据
            location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
        } else {
            // 如果任何索引中都没有,该bucket number还没有对应的file group,需要创建一个
            // 生成新的file id,替换前8位为bucket number
            String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
            location = new HoodieRecordLocation("I", newFileId);
            // 加入索引缓存中
            bucketToFileId.put(bucketNum, newFileId);
            incBucketIndex.add(bucketId);
        }
        record.unseal();
        // 设置数据的location
        record.setCurrentLocation(location);
        record.seal();
        // 将数据缓存起来
        bufferRecord(record);
    }
    

    bootstrapIndexIfNeed方法在指定的partition中,从Hudi表中读取bucket number和file id的对应关系放入索引。代码如下:

    private void bootstrapIndexIfNeed(String partition) {
        // 如果是insert overwrite,跳过
        if (OptionsResolver.isInsertOverwrite(config)) {
            // skips the index loading for insert overwrite operation.
            return;
        }
        // 如果partition已被索引,返回
        if (bucketIndex.containsKey(partition)) {
            return;
        }
        LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(),
                               this.metaClient.getBasePath() + "/" + partition));
    
        // Load existing fileID belongs to this task
        // 索引的数据类型为map,key为bucket number
        // valu为file id
        Map<Integer, String> bucketToFileIDMap = new HashMap<>();
        // 遍历partition中的所有file slice
        this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
            // 获取file id
            String fileId = fileSlice.getFileId();
            // file id的前8位是bucket number,获取它
            int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileId);
            // 检查这个bucket是否归本task处理
            // 每个task只缓存自己需要处理的bucket的索引
            if (isBucketToLoad(bucketNumber, partition)) {
                LOG.info(String.format("Should load this partition bucket %s with fileId %s", bucketNumber, fileId));
                // Validate that one bucketId has only ONE fileId
                // 检查一个bucket number只对应一个file id
                if (bucketToFileIDMap.containsKey(bucketNumber)) {
                    throw new RuntimeException(String.format("Duplicate fileId %s from bucket %s of partition %s found "
                                                             + "during the BucketStreamWriteFunction index bootstrap.", fileId, bucketNumber, partition));
                } else {
                    LOG.info(String.format("Adding fileId %s to the bucket %s of partition %s.", fileId, bucketNumber, partition));
                    // 对应关系加入缓存中
                    bucketToFileIDMap.put(bucketNumber, fileId);
                }
            }
        });
        // 加入缓存
        bucketIndex.put(partition, bucketToFileIDMap);
    }
    

    Flink Hudi默认的state索引

    作为对比,我们再去分析下Flink state索引的实现方式。Flink state索引保存了record key和file id的对应关系,保存在Flink的状态后端中。

    接下来我们分别分析使用索引和加载索引的方式。

    使用索引方式

    按照前面Pipelines的分析,在数据流向BucketAssignFunctionprocessElement方法之前已经按照record key分区。所以索引和record key是一一对应关系。

    @Override
    public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
        if (value instanceof IndexRecord) {
            // 如果读进来的数据是IndexRecord类型,说明处于加载索引的阶段
            IndexRecord<?> indexRecord = (IndexRecord<?>) value;
            // 更新保存的索引状态
            this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
        } else {
            // 处理用户数据
            processRecord((HoodieRecord<?>) value, out);
        }
    }
    

    processRecord方法读取状态中的索引。如果record的partition path没有发生变化,数据还在原先索引指向的位置,否则需要分配新的位置,更新索引。数据的位置和partition, record key有关。

    private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
        // 1. put the record into the BucketAssigner;
        // 2. look up the state for location, if the record has a location, just send it out;
        // 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
        final HoodieKey hoodieKey = record.getKey();
        // 获取key和partition path
        final String recordKey = hoodieKey.getRecordKey();
        final String partitionPath = hoodieKey.getPartitionPath();
        final HoodieRecordLocation location;
    
        // Only changing records need looking up the index for the location,
        // append only records are always recognized as INSERT.
        // 从状态中获取上次record位置
        HoodieRecordGlobalLocation oldLoc = indexState.value();
        // upsert,upsert_prepped或者delete的时候isChangingRecords为true
        if (isChangingRecords && oldLoc != null) {
            // Set up the instant time as "U" to mark the bucket as an update bucket.
            // 如果partition path发生了变化
            // record的partition字段值发生变化会导致partition path发生变化
            if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
                // 如果开启了全局索引,意思如果是新旧数据的partition path不同,是否更新旧数据的partition path
                if (globalIndex) {
                    // if partition path changes, emit a delete record for old partition path,
                    // then update the index state using location with new partition path.
                    // 生成一个删除类型的数据,指向旧的partition path
                    HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
                                                                          payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
    
                    deleteRecord.unseal();
                    // 设置instant time为U
                    deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
                    deleteRecord.seal();
    
                    out.collect((O) deleteRecord);
                }
                // 获取新数据的location
                location = getNewRecordLocation(partitionPath);
            } else {
                // 如果partition path没有发生变化
                location = oldLoc.toLocal("U");
                // 为update类型record创建或加入bucket
                this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
            }
        } else {
            // 新增数据,创建新的location
            location = getNewRecordLocation(partitionPath);
        }
        // always refresh the index
        if (isChangingRecords) {
            // 如果数据更新,需要紧接着更新index状态变量
            updateIndexState(partitionPath, location);
        }
    
        // 配置record的location
        record.unseal();
        record.setCurrentLocation(location);
        record.seal();
    
        out.collect((O) record);
    }
    

    这段方法中BucketAssigner更详细的分析可以参考Hudi 源码之数据写入逻辑

    加载索引的方式

    Pipelines::bootstrap启动方法流式启动调用的是streamBootstrap。该方法创建了BootstrapOperator

    BootstrapOperator在启动初始化状态量的时候调用initializeState,从Hudi表加载索引。

    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        ListStateDescriptor<String> instantStateDescriptor = new ListStateDescriptor<>(
            "instantStateDescriptor",
            Types.STRING
        );
        instantState = context.getOperatorStateStore().getListState(instantStateDescriptor);
    
        if (context.isRestored()) {
            Iterator<String> instantIterator = instantState.get().iterator();
            if (instantIterator.hasNext()) {
                lastInstantTime = instantIterator.next();
            }
        }
    
        this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
        this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
        this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
        this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());
        this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
    
        preLoadIndexRecords();
    }
    

    继续分析preLoadIndexRecords方法。该方法判断需要加载哪些分区的索引。代码如下:

    protected void preLoadIndexRecords() throws Exception {
        String basePath = hoodieTable.getMetaClient().getBasePath();
        int taskID = getRuntimeContext().getIndexOfThisSubtask();
        LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
        // 遍历所有的分区
        for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
            // 如果分区名称匹配正则(对应index.partition.regex配置项),加载该分区的索引
            if (pattern.matcher(partitionPath).matches()) {
                loadRecords(partitionPath);
            }
        }
    
        LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
    
        // wait for the other bootstrap tasks finish bootstrapping.
        waitForBootstrapReady(getRuntimeContext().getIndexOfThisSubtask());
    }
    

    loadRecords方法读取所有分区下的record(包含base file和log),包装为IndexRecord发往下游。Flink下游算子接收到IndexRecord会更新状态变量。

    protected void loadRecords(String partitionPath) throws Exception {
        long start = System.currentTimeMillis();
    
        // 获取并行度
        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        // 最大并行度
        final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
        // 当前作业id
        final int taskID = getRuntimeContext().getIndexOfThisSubtask();
    
        HoodieTimeline commitsTimeline = this.hoodieTable.getMetaClient().getCommitsTimeline();
        // 获取上次snapshot之后的所有commit组成的timeline
        if (!StringUtils.isNullOrEmpty(lastInstantTime)) {
            commitsTimeline = commitsTimeline.findInstantsAfter(lastInstantTime);
        }
        // 找到最近的已完成的commit instant
        Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedInstants().lastInstant();
    
        // 如果存在
        if (latestCommitTime.isPresent()) {
            // 根据不同的文件类型,获取不同的文件读取工具
            BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
            // 读取schema
            Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
    
            // 获取latestCommitTime之前的instant,如果有还没有完成的compaction,将这个的instant和前一个合并(将这两个file slice的log文件视为一个file slice的)后返回
            List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
                .getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
                .collect(toList());
    
            for (FileSlice fileSlice : fileSlices) {
                // 如果这个fileSlice不归该任务处理,跳过
                if (!shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskID)) {
                    continue;
                }
                LOG.info("Load records from {}.", fileSlice);
    
                // load parquet records
                fileSlice.getBaseFile().ifPresent(baseFile -> {
                    // filter out crushed files
                    // 如果base file为空或受损,跳过
                    if (!isValidFile(baseFile.getFileStatus())) {
                        return;
                    }
                    try (ClosableIterator<HoodieKey> iterator = fileUtils.getHoodieKeyIterator(this.hadoopConf, new Path(baseFile.getPath()))) {
                        // 逐个读取base file中保存的record,包装为IndexRecord类型发往下游
                        iterator.forEachRemaining(hoodieKey -> {
                            output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
                        });
                    }
                });
    
                // load avro log records
                // 获取所有的log file路径
                List<String> logPaths = fileSlice.getLogFiles()
                    .sorted(HoodieLogFile.getLogFileComparator())
                    // filter out crushed files
                    .filter(logFile -> isValidFile(logFile.getFileStatus()))
                    .map(logFile -> logFile.getPath().toString())
                    .collect(toList());
                HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
                                                                              writeConfig, hadoopConf);
    
                try {
                    // 读取出这些log文件中的数据,包装为IndexRecord类型发往下游
                    for (String recordKey : scanner.getRecords().keySet()) {
                        output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice))));
                    }
                } catch (Exception e) {
                    throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
                } finally {
                    scanner.close();
                }
            }
        }
    
        long cost = System.currentTimeMillis() - start;
        LOG.info("Task [{}}:{}}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.",
                 this.getClass().getSimpleName(), taskID, partitionPath, cost);
    }
    

    参考文献

    Hudi Bucket Index 在字节跳动的设计与实践 - 知乎 (zhihu.com)

    相关文章

      网友评论

        本文标题:Hudi 源码之Bucket Index

        本文链接:https://www.haomeiwen.com/subject/zgzfvdtx.html