美文网首页Java
Hudi 源码之Bloom Index

Hudi 源码之Bloom Index

作者: AlienPaul | 来源:发表于2023-11-14 13:49 被阅读0次

    Bloom filter

    Bloom filter中文名为布隆过滤器。用于检索一个元素是否在一个集合中。这个算法具有很高的时间和空间效率。但代价是存在一定的误判概率(假阳)。Bloom filter判断某个元素不存在,则一定不存在;判断某个元素存在,会有较小的概率不存在。可以根据集合数据量精心设计bloom filter的bitset大小,平衡误判的概率和bitset的存储占用。

    Bloom filter详解请见:深入浅出BloomFilter原理

    Hudi在upsert操作的时候需要定位record位于哪个数据文件。使用bloom filter可以加速定位数据的过程。Hudi索引的配置参见:Indexing | Apache Hudi

    Bloom filter 索引定位过程

    1. 找出需要查询索引的record所属的partition path,找出这些partition path对应的数据文件,以及每个数据文件存储的record key的范围。可以快速排除绝大部分的数据文件,加快索引定位速度。
    2. 根据上面的record key范围信息,将record和可能位于的数据文件对应起来。这个范围太广,接下来步骤需要缩小范围。
    3. 利用这些数据文件的bloom filter,将不存在的数据排除,可能存在的数据保存起来(bloom filter存在假阳)。
    4. 逐个验证数据文件中是否真的包含bloom filter判断可能存在的数据,将假阳性的剔除。

    源代码分析

    HoodieBloomIndex

    Hudi利用索引标记数据位置的方法为HoodieIndex::tagLocation。这个方法不同的索引类型有不同的实现。其中bloom filter对应的是HoodieBloomIndex::tagLocation。它的源代码如下所示:

    @Override
    public <R> HoodieData<HoodieRecord<R>> tagLocation(
        HoodieData<HoodieRecord<R>> records, HoodieEngineContext context,
        HoodieTable hoodieTable) {
        // Step 0: cache the input records if needed
        // 对应hoodie.bloom.index.use.caching配置,默认为true
        // 将需要index查找的数据缓存起来
        if (config.getBloomIndexUseCaching()) {
            records.persist(new HoodieConfig(config.getProps())
                            .getString(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL_VALUE));
        }
    
        // Step 1: Extract out thinner pairs of (partitionPath, recordKey)
        // 提取出这些数据的partition path和record key
        HoodiePairData<String, String> partitionRecordKeyPairs = records.mapToPair(
            record -> new ImmutablePair<>(record.getPartitionPath(), record.getRecordKey()));
    
        // Step 2: Lookup indexes for all the partition/recordkey pair
        //查找索引,找到这些数据所在文件的信息(file id)
        HoodiePairData<HoodieKey, HoodieRecordLocation> keyFilenamePairs =
            lookupIndex(partitionRecordKeyPairs, context, hoodieTable);
    
        // Cache the result, for subsequent stages.
        // 如果启用了缓存,将查找结果缓存起来
        if (config.getBloomIndexUseCaching()) {
            keyFilenamePairs.persist(new HoodieConfig(config.getProps())
                                     .getString(HoodieIndexConfig.BLOOM_INDEX_INPUT_STORAGE_LEVEL_VALUE));
        }
        if (LOG.isDebugEnabled()) {
            long totalTaggedRecords = keyFilenamePairs.count();
            LOG.debug("Number of update records (ones tagged with a fileID): " + totalTaggedRecords);
        }
    
        // Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys
        // 将数据位置标记到records中并返回
        HoodieData<HoodieRecord<R>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairs, records, hoodieTable);
    
        if (config.getBloomIndexUseCaching()) {
            records.unpersist();
            keyFilenamePairs.unpersist();
        }
    
        return taggedRecords;
    }
    

    LookupIndex返回record(用record key表示)和所在位置(instant time和file ID)的对应关系。

    private HoodiePairData<HoodieKey, HoodieRecordLocation> lookupIndex(
        HoodiePairData<String, String> partitionRecordKeyPairs, final HoodieEngineContext context,
        final HoodieTable hoodieTable) {
        // Step 1: Obtain records per partition, in the incoming records
        // 获取每个分区对应的record key的个数
        Map<String, Long> recordsPerPartition = partitionRecordKeyPairs.countByKey();
        // 获取这些partition path
        List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());
    
        // Step 2: Load all involved files as <Partition, filename> pairs
        // 获取每个分区包含的file id和这个文件的最大最小record key的对应关系
        List<Pair<String, BloomIndexFileInfo>> fileInfoList = getBloomIndexFileInfoForPartitions(context, hoodieTable, affectedPartitionPathList);
        // 将相同分区下的所有文件组成list形式
        final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
            fileInfoList.stream().collect(groupingBy(Pair::getLeft, mapping(Pair::getRight, toList())));
    
        // Step 3: Obtain a HoodieData, for each incoming record, that already exists, with the file id,
        // that contains it.
        // 根据前面所述的对应关系,找到这些record可能属于的文件
        // 返回file group id和record key的对应关系
        HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs =
            explodeRecordsWithFileComparisons(partitionToFileInfo, partitionRecordKeyPairs);
    
        // 使用bloom filter,获取record key所在的文件
        return bloomIndexHelper.findMatchingFilesForRecordKeys(config, context, hoodieTable,
                                                               partitionRecordKeyPairs, fileComparisonPairs, partitionToFileInfo, recordsPerPartition);
    }
    

    getBloomIndexFileInfoForPartitions用来获取partition path包含的数据文件以及record key范围。

    private List<Pair<String, BloomIndexFileInfo>> getBloomIndexFileInfoForPartitions(HoodieEngineContext context,
                                                                                      HoodieTable hoodieTable,
                                                                                      List<String> affectedPartitionPathList) {
        List<Pair<String, BloomIndexFileInfo>> fileInfoList = new ArrayList<>();
    
        // 对应配置hoodie.bloom.index.prune.by.ranges,默认启用
        // 如果启用,可以从metadata table的column stat(列统计信息)中读取key范围
        // 否则,需要逐个从数据文件中读取,速度较慢
        if (config.getBloomIndexPruneByRanges()) {
            // load column ranges from metadata index if column stats index is enabled and column_stats metadata partition is available
            if (config.getBloomIndexUseMetadata()
                && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions().contains(COLUMN_STATS.getPartitionPath())) {
                fileInfoList = loadColumnRangesFromMetaIndex(affectedPartitionPathList, context, hoodieTable);
            }
            // fallback to loading column ranges from files
            if (isNullOrEmpty(fileInfoList)) {
                fileInfoList = loadColumnRangesFromFiles(affectedPartitionPathList, context, hoodieTable);
            }
        } else {
            fileInfoList = getFileInfoForLatestBaseFiles(affectedPartitionPathList, context, hoodieTable);
        }
    
        return fileInfoList;
    }
    

    explodeRecordsWithFileComparisons根据getBloomIndexFileInfoForPartitions的结果,找到这些record可能属于的文件。

    HoodiePairData<HoodieFileGroupId, String> explodeRecordsWithFileComparisons(
        final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
        HoodiePairData<String, String> partitionRecordKeyPairs) {
        // 对应hoodie.bloom.index.use.treebased.filter配置项,默认启用,表示使用区间树查找数据可能位于的文件
        // 否则使用链表遍历,效率较低
        IndexFileFilter indexFileFilter =
            config.useBloomIndexTreebasedFilter() ? new IntervalTreeBasedIndexFileFilter(partitionToFileIndexInfo)
            : new ListBasedIndexFileFilter(partitionToFileIndexInfo);
    
        return partitionRecordKeyPairs.map(partitionRecordKeyPair -> {
            String recordKey = partitionRecordKeyPair.getRight();
            String partitionPath = partitionRecordKeyPair.getLeft();
    
            // 查找record所在位置
            return indexFileFilter.getMatchingFilesAndPartition(partitionPath, recordKey)
                .stream()
                .map(partitionFileIdPair ->
                     new ImmutablePair<>(
                         new HoodieFileGroupId(partitionFileIdPair.getLeft(), partitionFileIdPair.getRight()), recordKey));
        })
            .flatMapToPair(Stream::iterator);
    }
    

    ListBasedHoodieBloomIndexHelper

    由于前面根据record key范围定位出的数据位置存在很大的误判的可能性,这里调用ListBasedHoodieBloomIndexHelperfindMatchingFilesForRecordKeys方法,使用数据文件对应的bloom filter过滤出数据文件中真正存在的record。

    @Override
    public HoodiePairData<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
        HoodieWriteConfig config, HoodieEngineContext context, HoodieTable hoodieTable,
        HoodiePairData<String, String> partitionRecordKeyPairs,
        HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
        Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String, Long> recordsPerPartition) {
        // 将对应关系按照HoodieFileGroupId排序
        List<Pair<HoodieFileGroupId, String>> fileComparisonPairList =
            fileComparisonPairs.collectAsList().stream()
            .sorted(Comparator.comparing(Pair::getLeft)).collect(toList());
    
        // HoodieBloomIndexCheckFunction使用数据文件的bloom filter,逐个确认数据是否真正在对应的数据文件中
        List<HoodieKeyLookupResult> keyLookupResults =
            CollectionUtils.toStream(
            new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId, String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
            .apply(fileComparisonPairList.iterator())
        )
            .flatMap(Collection::stream)
            .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
            .collect(toList());
    
        // 组装返回结果
        return context.parallelize(keyLookupResults).flatMap(lookupResult ->
                                                             lookupResult.getMatchingRecordKeys().stream()
                                                             .map(recordKey -> new ImmutablePair<>(lookupResult, recordKey)).iterator()
                                                            ).mapToPair(pair -> {
            HoodieKeyLookupResult lookupResult = pair.getLeft();
            String recordKey = pair.getRight();
            return new ImmutablePair<>(
                new HoodieKey(recordKey, lookupResult.getPartitionPath()),
                new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId()));
        });
    }
    

    使用bloom filter检查文件是否真正包含某条record的逻辑位于HoodieBloomIndexCheckFunction

    HoodieBloomIndexCheckFunction

    HoodieBloomIndexCheckFunction使用数据文件对应的bloom filter来判断record是否真的在数据文件中。

    HoodieBloomIndexCheckFunction::apply方法返回的是前面file group id和record key的对应关系集合的迭代器。

    @Override
    public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<I> fileGroupIdRecordKeyPairIterator) {
        return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
    }
    

    在后面flatmap处理这个迭代器转换成的stream的时候,会调用LazyKeyCheckIterator::computeNext方法。内容如下:

    @Override
    protected List<HoodieKeyLookupResult> computeNext() {
    
        List<HoodieKeyLookupResult> ret = new ArrayList<>();
        try {
            // process one file in each go.
            // 一轮处理一个数据文件
            // 查询一个数据文件的bloom filter
            while (inputItr.hasNext()) {
                I tuple = inputItr.next();
    
                // 找出record key和它可能所在的数据文件位置信息
                HoodieFileGroupId fileGroupId = fileGroupIdExtractor.apply(tuple);
                String recordKey = recordKeyExtractor.apply(tuple);
    
                String fileId = fileGroupId.getFileId();
                String partitionPath = fileGroupId.getPartitionPath();
    
                Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);
    
                // lazily init state
                // 创建查找handle,查找partitionPathFilePair对应的bloom filter
                if (keyLookupHandle == null) {
                    keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
                }
    
                // if continue on current file
                // HoodieKeyLookupHandle是懒创建策略,只有在需要检索下一个数据文件的时候才会创建新的handle
                if (keyLookupHandle.getPartitionPathFileIDPair().equals(partitionPathFilePair)) {
                    // 如果bloom filter包含该record key,说明该record key对应的记录可能会在这个文件中
                    // 在结果中加入这个record
                    keyLookupHandle.addKey(recordKey);
                } else {
                    // do the actual checking of file & break out
                    // 当处理到下个文件的时候,检查可能包含之前文件中的记录是否真的存在,因为bloom filter存在假阳现象
                    // 在返回结果之前,需要去数据文件中检查这些key是否真的在数据文件中存在,具体逻辑在getLookupResult方法中
                    // 将检验过后的返回结果添加到ret中
                    ret.add(keyLookupHandle.getLookupResult());
                    // 然后创建新的HoodieKeyLookupHandle用来使用下一个数据文件对应的bloom filter
                    keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
                    // 这里和上面相同,使用bloom filter判断record是否可能在这个数据文件中
                    keyLookupHandle.addKey(recordKey);
                    break;
                }
            }
    
            // handle case, where we ran out of input, close pending work, update return val
            if (!inputItr.hasNext()) {
                ret.add(keyLookupHandle.getLookupResult());
            }
        } catch (Throwable e) {
            if (e instanceof HoodieException) {
                throw (HoodieException) e;
            }
    
            throw new HoodieIndexException("Error checking bloom filter index. ", e);
        }
    
        return ret;
    }
    

    在迭代的时候针对每一个数据文件的bloom filter,会创建出专门的HoodieKeyLookupHandle来处理。

    HoodieKeyLookupHandle

    HoodieKeyLookupHandle方法负责通过数据文件对应的bloom filter,检查数据文件是否包含某个record。

    HoodieKeyLookupHandle::addKey方法判断bloom filter是否可能包含record key。如果可能包含,加入到候选列表candidateRecordKeys中。

    public void addKey(String recordKey) {
        // check record key against bloom filter of current file & add to possible keys if needed
        // 使用bloom filter判断是否可能包含该record key
        if (bloomFilter.mightContain(recordKey)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Record key " + recordKey + " matches bloom filter in  " + partitionPathFileIDPair);
            }
            // 加入到候选中
            candidateRecordKeys.add(recordKey);
        }
        // 总共检查的key计数值 +1
        totalKeysChecked++;
    }
    

    HoodieKeyLookupHandle::getLookupResult方法将candidateRecordKeys中的record key代入数据文件中校验是否真的存在。因为bloom filter存在一定概率假阳性,需要再次检查。

    public HoodieKeyLookupResult getLookupResult() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("#The candidate row keys for " + partitionPathFileIDPair + " => " + candidateRecordKeys);
        }
    
        // 获取最新版本的base file
        HoodieBaseFile baseFile = getLatestBaseFile();
        // 检查这些record key是否真的在base file中,返回base file真实包含的record key
        List<String> matchingKeys = HoodieIndexUtils.filterKeysFromFile(new Path(baseFile.getPath()), candidateRecordKeys,
                                                                        hoodieTable.getHadoopConf());
        LOG.info(
            String.format("Total records (%d), bloom filter candidates (%d)/fp(%d), actual matches (%d)", totalKeysChecked,
                          candidateRecordKeys.size(), candidateRecordKeys.size() - matchingKeys.size(), matchingKeys.size()));
        // 组装成查找结果并返回
        // 包含文件(partition path,file id和commit time)同包含的record key的对应关系
        return new HoodieKeyLookupResult(partitionPathFileIDPair.getRight(), partitionPathFileIDPair.getLeft(),
                                         baseFile.getCommitTime(), matchingKeys);
    }
    

    最后我们跟踪一下bloom filter是从哪里读取的。查看getBloomFilter方法,内容如下:

    private BloomFilter getBloomFilter() {
        BloomFilter bloomFilter = null;
        HoodieTimer timer = HoodieTimer.start();
        try {
            // 如果启用hoodie.bloom.index.use.metadata(默认启用)并且metadata table存放的有bloom filter
            // 从metadata table加载bloom filter,可以提高速度
            if (config.getBloomIndexUseMetadata()
                && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
                .contains(BLOOM_FILTERS.getPartitionPath())) {
                bloomFilter = hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(), partitionPathFileIDPair.getRight())
                    .orElseThrow(() -> new HoodieIndexException("BloomFilter missing for " + partitionPathFileIDPair.getRight()));
            } else {
                // 否则读取数据文件中的bloom filter
                try (HoodieFileReader reader = createNewFileReader()) {
                    bloomFilter = reader.readBloomFilter();
                }
            }
        } catch (IOException e) {
            throw new HoodieIndexException(String.format("Error reading bloom filter from %s", getPartitionPathFileIDPair()), e);
        }
        LOG.info(String.format("Read bloom filter from %s in %d ms", partitionPathFileIDPair, timer.endTimer()));
        return bloomFilter;
    }
    

    Hudi建议将bloom filter统一保存在metadata table中,可以提高加载速度。

    相关文章

      网友评论

        本文标题:Hudi 源码之Bloom Index

        本文链接:https://www.haomeiwen.com/subject/apfdwdtx.html