美文网首页
浅谈Deletion Vector及其在StarRocks中的应

浅谈Deletion Vector及其在StarRocks中的应

作者: LittleMagic | 来源:发表于2024-03-25 20:18 被阅读0次

    前言

    我们知道,列式存储的数据组织形式使得它适合海量数据在线查询、分析的场景,而写入性能相对于读取性能似乎并不那么重要(传统的ORC / Parquet on Hive方案就可以满足多数小时级到天级新鲜度的需求)。但是实际业务的发展是无止境的,越来越多的看板、报表要求分钟级甚至秒级刷新,对数据系统的实时性提出了极大挑战。当然,原始的列存格式并不支持行存风格的in-place update,高频更新势必会造成严重的写放大,这就需要一些trick来解决这个痛点。Deletion Vector就是其中之一,下面简要介绍。

    Deletion Vector

    Deletion Vector是在OLAP数据库、数据湖等系统中的一种优化设计。顾名思义,Deletion Vector是记录删除标记的向量(本质是位图),用于标记一个特定版本的列存文件中哪些行的数据已经失效。这样,通过把UPDATE语义改写成DELETE + INSERT语义,就可以无需更新旧版本的列存文件,只是在写新版本数据的同时写一个小得多的位图文件而已,吞吐量大大提升了。

    下面的示意图来自Delta Lake,可见为了节省空间,Deletion Vector还可以用压缩位图结构(e.g. RoaringBitmap)来存储。

    下图示出更新一个Parquet文件中的两行数据的过程,注意file_a_dv_1.bin就是DV文件,且生成新版本的Parquet文件file_c.parquet

    而在读取数据时则是经典的Merge-on-Read流程:对于附带有DV的旧版本文件file_a.parquet,会根据DV过滤掉失效数据(相当于为查询增加一个虚拟谓词,图中为_skip_row_),并与新版本数据file_c.parquet做合并,产生最新的数据集。当然,Compaction过程也会根据DV过滤失效数据,并同时更新DV文件的标记。

    大家耳熟能详的数据湖四剑客中,Delta Lake和Iceberg的MOR都采用了DV设计(Iceberg有两种删除标记,DV方案称为Position Delete)。而Paimon则是在最新的0.8-SNAPSHOT版本中加入了DV支持,具体可以参考社区公众号的这篇文章。作为比较,Hudi MOR走了相对传统的Base File + Log Files的路线,很明显这种方案可以达到更高的写性能(Log是顺序写),但是Merge阶段要合并的Log Files数据较多,读性能有一定折扣。

    下面看看DV在StarRocks中是如何发挥作用的。

    DV Implementation in StarRocks

    StarRocks的主键模型表能够同时支持高效更新和查询,它的Tablet(即最小存储单元)的结构与传统的明细、聚合、更新模型有较大差别,示意图如下。

    其中,Rowset是列存文件,Meta是列存文件的元信息(版本、Delta等),DelVector就是DV。存算一体部署时,Meta和DV都持久化在RocksDB中;存算分离部署时,我们可以在文件系统中直接观察到扩展名为.delvec的文件,如下图。

    StarRocks代码中的DV数据结构名为DelVector,底层存储直接复用了C++ RoaringBitmap,部分操作代码如下,本质上是对Roaring容器的操作,简单易懂。

    void DelVector::_add_dels(const std::vector<uint32_t>& dels) {
        if (!_roaring) {
            _roaring = std::make_unique<Roaring>(dels.size(), dels.data());
        } else {
            _roaring->addMany(dels.size(), dels.data());
        }
        _update_stats();
    }
    
    void DelVector::add_dels_as_new_version(const std::vector<uint32_t>& dels, int64_t version,
                                            std::shared_ptr<DelVector>* pdelvec) const {
        CHECK(this != pdelvec->get());
        DelVectorPtr tmp(new DelVector());
        if (_roaring) {
            tmp->_roaring = std::make_unique<Roaring>(*_roaring);
        }
        tmp->_version = version;
        tmp->_loaded = true;
        tmp->_add_dels(dels);
        tmp.swap(*pdelvec);
    }
    
    Status DelVector::load(int64_t version, const char* data, size_t length) {
        if (length < 1) {
            return Status::Corruption("zero length");
        }
        if (*data != 0x01) {
            return Status::Corruption("invalid flag");
        }
        data += 1;
        length -= 1;
        _loaded = true;
        _version = version;
        if (length > 0) {
            _roaring = std::make_unique<Roaring>(Roaring::readSafe(data, length));
        }
        _update_stats();
        return Status::OK();
    }
    
    void DelVector::init(int64_t version, const uint32_t* data, size_t length) {
        _loaded = true;
        _version = version;
        if (length > 0) {
            _roaring = std::make_unique<Roaring>(length, data);
        }
        _update_stats();
    }
    
    string DelVector::save() const {
        string ret;
        auto roaring_size = _roaring ? _roaring->getSizeInBytes() : 0;
        ret.resize(roaring_size + 1);
        ret[0] = 0x01; // one byte flag.
        if (roaring_size > 0) {
            _roaring->write(ret.data() + 1);
        }
        return ret;
    }
    
    void DelVector::save_to(std::string* str) const {
        auto roaring_size = _roaring ? _roaring->getSizeInBytes() : 0;
        str->resize(roaring_size + 1);
        str->at(0) = 0x01; // one byte flag.
        if (roaring_size > 0) {
            _roaring->write(str->data() + 1);
        }
    }
    

    数据写入StarRocks主键模型分为Write和Commit两个阶段,DV在Commit阶段生成,如下图所示。

    对应的方法为TabletUpdates::_apply_normal_rowset_commit(),相关的部分逻辑节选如下。

        PrimaryIndex::DeletesMap new_deletes;
        // ............
        size_t ndelvec = new_deletes.size();
        vector<std::pair<uint32_t, DelVectorPtr>> new_del_vecs(ndelvec);
        size_t idx = 0;
        size_t old_total_del = 0;
        size_t new_del = 0;
        size_t total_del = 0;
        string delvec_change_info;
        for (auto& new_delete : new_deletes) {
            uint32_t rssid = new_delete.first;
            if (rssid >= rowset_id && rssid < rowset_id + rowset->num_segments()) {
                // it's newly added rowset's segment, do not have latest delvec yet
                new_del_vecs[idx].first = rssid;
                new_del_vecs[idx].second = std::make_shared<DelVector>();
                auto& del_ids = new_delete.second;
                new_del_vecs[idx].second->init(version.major_number(), del_ids.data(), del_ids.size());
                if (VLOG_IS_ON(1)) {
                    StringAppendF(&delvec_change_info, " %u:+%zu", rssid, del_ids.size());
                }
                new_del += del_ids.size();
                total_del += del_ids.size();
            } else {
                TabletSegmentId tsid;
                tsid.tablet_id = tablet_id;
                tsid.segment_id = rssid;
                DelVectorPtr old_del_vec;
                // TODO(cbl): should get the version before this apply version, to be safe
                st = manager->get_latest_del_vec(_tablet.data_dir()->get_meta(), tsid, &old_del_vec);
                if (!st.ok()) {
                    std::string msg = strings::Substitute("_apply_rowset_commit error: get_latest_del_vec failed: $0 $1",
                                                          st.to_string(), debug_string());
                    failure_handler(msg, false);
                    return;
                }
                new_del_vecs[idx].first = rssid;
                old_del_vec->add_dels_as_new_version(new_delete.second, version.major_number(),
                                                     &(new_del_vecs[idx].second));
                size_t cur_old = old_del_vec->cardinality();
                size_t cur_add = new_delete.second.size();
                size_t cur_new = new_del_vecs[idx].second->cardinality();
                if (cur_old + cur_add != cur_new) {
                    // should not happen, data inconsistent
                    LOG(FATAL) << strings::Substitute(
                            "delvec inconsistent tablet:$0 rssid:$1 #old:$2 #add:$3 #new:$4 old_v:$5 "
                            "v:$6",
                            _tablet.tablet_id(), rssid, cur_old, cur_add, cur_new, old_del_vec->version(),
                            version.major_number());
                }
                if (VLOG_IS_ON(1)) {
                    StringAppendF(&delvec_change_info, " %u:%zu(%ld)+%zu=%zu", rssid, cur_old, old_del_vec->version(),
                                  cur_add, cur_new);
                }
                old_total_del += cur_old;
                new_del += cur_add;
                total_del += cur_new;
            }
        // ............
        }
    

    在这段逻辑中,首先根据主键索引(PrimaryIndex)找到所有更新和删除数据的映射表,称为DeletesMap。接下来根据源RowSet ID rssid 确定这是否为一个新写入的Segment,如果是,只需直接生成新的DV,否则需要将旧的DV与更新数据的标记做合并,作为新版本DV,同时还要将新旧DV的基数做校验,防止数据损坏。

    至于Compaction过程,则是通过Compaction Policy和Score确定候选Rowsets并合并完成后,在Commit阶段生成新的DV并更新元数据,逻辑与写入基本相同,不再赘述。

    The End

    相关文章

      网友评论

          本文标题:浅谈Deletion Vector及其在StarRocks中的应

          本文链接:https://www.haomeiwen.com/subject/dvzdtjtx.html