美文网首页我爱编程
Mongodb事务模型分析

Mongodb事务模型分析

作者: 729f645884b1 | 来源:发表于2018-05-13 00:41 被阅读0次

本文对于Mongodb上层事务中会让人困惑的几点进行源码层面的分析

  1. mongodb 的写操作(insert/update/delete)提供的“单行一致性”的具体含义,如何做到的?
  2. 为何db.coll.count()在宕机崩溃后经常就不准了。
  3. mongodb 查询操作的事务隔离级别。

写操作的事务性

Mongodb的数据组织

在了解写操作的事务性之前,需要先了解mongo层的每一个table,是如何与wiredtiger层的table(btree)对应的。mongo层一个最简单的table包含一个 ObjectId(_id) 索引。_id类似于Mysql中主键的概念

rs1:PRIMARY> db.abc.getIndexes()
[
        {
                "v" : 1,
                "key" : {
                        "_id" : 1
                },
                "name" : "_id_",
                "ns" : "test.abc"
        }
]

但是mongo中并不会将_id索引与行内容存放在一起(即没有聚簇索引的概念)。取而代之的,mongodb将索引与数据分开存放,通过RecordId进行间接引用。 举例一张包含两个索引(_id 和 name)的表,在wt层将有三张表与其对应。通过name索引找到行记录的过程为:先通过name->Record的索引找到RecordId,再通过RecordId->RowData的索引找到记录内容。

数据组织

此外,一个Mongodb实例还包含一张记录对每一行的写操作的表local.oplog.rs, 该表主要用于复制(primary-secondary replication)。每一次(对实例中任何一张表的任何一行的)更新操作,都会产生唯一的一条oplog,记录在local.oplog.rs表里。

理解单行事务

mongodb对某一行的写操作,会产生三个动作

  • 对wt层的数据段btree(上图中的Data Ident)执行写操作
  • 对wt层索引段的每个索引btree执行写操作
  • 对oplog表执行写操作
    mongodb的单行事务,说的是:对数据,索引,oplog这三者的更新是原子的。不存在索引段中的某个RecordId,在数据段中找不到,也不存在一条记录的更改被应用,但是没有记录到oplog中, 反之亦然。
    从下面的代码可以看到,一个插入操作,更新数据,索引,以及Oplog的过程。
collection_impl.cpp
 332 Status CollectionImpl::insertDocuments(OperationContext* opCtx)
 370     Status status = _insertDocuments(opCtx, begin, end, enforceQuota, opDebug); // 更新数据和索引
 375     getGlobalServiceContext()->getOpObserver()->onInserts(opCtx, ns(), uuid(), begin, end, fromMigrate);    // 更新Oplog
 380     return Status::OK();
 381 }
 
 452 Status CollectionImpl::_insertDocuments(OperationContext* opCtx)
 489     _recordStore->insertRecords(opCtx, &records, &timestamps, _enforceQuota(enforceQuota)); // 更新数据
 493     std::vector<BsonRecord> bsonRecords;
 495     int recordIndex = 0;
 496     for (auto it = begin; it != end; it++) {
 497         RecordId loc = records[recordIndex++].id;
 501         BsonRecord bsonRecord = {loc, &(it->doc)};
 502         bsonRecords.push_back(bsonRecord);
 503     }
 505     int64_t keysInserted;
 506     status = _indexCatalog.indexRecords(opCtx, bsonRecords, &keysInserted);  // 更新所有索引
 511     return status;
 512 }

单行事务的实现

OperationContext与RecoveryUnit

客户端的每个请求(insert/update/delete/find/getmore),会生成一个唯一的OperationContext记录执行的上下文,OperationContext从请求解析时创建,到请求执行完成时释放。一般情况下,其生命周期等同于一个操作执行的生命周期。OperationContext创建时,会初始化RecoveryUnit。

service_context_d.cpp:288
288 std::unique_ptr<OperationContext> ServiceContextMongoD::_newOpCtx(Client* client, unsigned opId) {
290     auto opCtx = stdx::make_unique<OperationContext>(client, opId);
298     opCtx->setRecoveryUnit(getGlobalStorageEngine()->newRecoveryUnit(),
299                            OperationContext::kNotInUnitOfWork);
300     return opCtx;
301 }

RecoveryUnit封装了wiredTiger层的事务。RecoveryUnit::_txnOpen 对应于WT层的beginTransaction。 RecoveryUnit::_txnClose封装了WT层的commit_transaction和rollback_transaction。

  • beginTransaction
wiredtiger_recovery_unit.cpp
253 void WiredTigerRecoveryUnit::_txnOpen() {
283         invariantWTOK(session->begin_transaction(session, NULL));
287     _active = true;
288 }
  • commit/rollback
wiredtiger_recovery_unit.cpp
192 void WiredTigerRecoveryUnit::_txnClose(bool commit) {
204     if (commit) {
211         wtRet = s->commit_transaction(s, NULL);
213     } else {
214         wtRet = s->rollback_transaction(s, NULL);
215         invariant(!wtRet);
217     }
225     _active = false;
228 }

WriteUnitOfWork

WriteUnitOfWork 是事务框架提供给server层,方便执行事务的API。它是对OperationContext和RecoveryUnit的封装。

class WriteUnitOfWork {
    WriteUnitOfWork(OperationContext* opCtx) {
            _opCtx->recoveryUnit()->beginUnitOfWork(_opCtx);
    }   

    ~WriteUnitOfWork() {
            _opCtx->recoveryUnit()->abortUnitOfWork();
    }   
}
server层执行一个写操作的事务
mongo/db/exec/update.cpp
469         WriteUnitOfWork wunit(getOpCtx());
472         uassertStatusOK(_collection->insertDocument(getOpCtx(),
473                                                     InsertStatement(request->getStmtId(), newObj),
474                                                     _params.opDebug,
475                                                     enforceQuota,
476                                                     request->isFromMigration()));
480         wunit.commit();

总结

简而言之,对一行记录的更改,涉及到数据,索引,和Oplog三者,在wiredTiger层,这样的更改对应于对多张表的更改。Mongodb通过实现事务框架(RecoveryUnit,OperationContext, WriteUnitOfWork)将细节封装。但归根结底非常简单,依然是教科书般的

  • begin_transaction
  • do writes
  • end_transaction(commit/rollback)

这一套。

下图是对上面的代码分析整理的调用层次关系。

事务层次

表记录数的更新

对于数据库,我们希望,插入一条数据,记录数加一,删除一条记录,记录数减一。因为这是极其自然的事情。Mongodb在大多数情况下的确可以保证记录数的一致性。但是在异常恢复(死机重启,OOM重启,kill -9)下,很容易产生 db.coll.count()和表真实记录数不一致的情况。
其原因在于表记录数的更新独立于数据更新的事务之外。这么做的主要目的在于:减少写放大

减少写放大

所有表的辅助信息(count和storageSize)都是以key-value的形式,存储在wiredTiger中的。对任何一行的更改,都会引起表的size和count的变化,size和count如果要做到和DML操作完全一致,只能将其放到上文中提到的begin_transaction/commit 事务中,造成显著的写放大。因此,不仅仅是mongodb,很多数据库引擎(比如mysql的myisam引擎)都选择将辅助信息额外存放,额外更新。

WiredTigerSizeStorer

mongodb使用WiredTigerSizeStorer做表的辅助信息的内存缓存。DML操作引起的辅助信息变化,不会直接反馈到WiredTiger层。而是cache在内存里,标记为dirty。db.coll.count()操作也只是读内存数据。

121 void WiredTigerSizeStorer::storeToCache(StringData uri, long long numRecords, long long dataSize) {
123     stdx::lock_guard<stdx::mutex> lk(_entriesMutex);
124     Entry& entry = _entries[uri.toString()];
125     entry.numRecords = numRecords;
126     entry.dataSize = dataSize;
127     entry.dirty = true;
128 }
129 
130 void WiredTigerSizeStorer::loadFromCache(StringData uri,
131                                          long long* numRecords,
132                                          long long* dataSize) const {
134     stdx::lock_guard<stdx::mutex> lk(_entriesMutex);
135     Map::const_iterator it = _entries.find(uri.toString());
136     if (it == _entries.end()) {
137         *numRecords = 0;
138         *dataSize = 0;
139         return;
140     }
141     *numRecords = it->second.numRecords;
142     *dataSize = it->second.dataSize;
143 }

内存数据的落地通过_sizeStorerSyncTracker(cs, 100000, Seconds(60))定时器触发。每隔60秒同步一次。
将dirty entry更新到wt层,并根据参数决定是否进行journal flush。更新完毕后将所有entry的diry flag 标记为false。代码在WiredTigerSizeStorer::syncCache方法中。

Mongodb查询操作的一致性

Mongodb的查询操作,由最基本的find command getmore command 组成。 find command返回firstBatch和cursorId。若是范围查循,则客户端driver通过cursorId组装getmore 命令,对游标进行迭代。
此外,mongodb还提供readConcern,若对某个节点执行
readConcern=majority
的读操作,可以读到同步到集群大多数节点的数据(
潜在含义是:可以读到某个节点上的某一行数据的历史版本
)。
再其次,find command本身读到的数据,是否是
PIT(point in time) Consistent
的? 多个getmore读到的数据之间,是否是PIT Consistent**的?
带着上面的问题,我们尝试从代码里找到答案。

事务隔离级别简介

首先,我们需要知道,数据库常用的隔离级别有

  • Read UnCommitted
  • Read Committed
  • Repeatable Read
  • Serializable

这个概念在很多数据库教材(特别是讲Mysql的书)中都有提到。一般书中很少提到一种介于Repeatable Read与Serializable 之间的隔离级别,即Snapshot Isolation。这部分内容很宏大,请参考wiki(Snapshot Isolation)。WiredTiger 提供Snapshot Isolation 隔离级别。Mongodb的事务,特别是readConcern=majority情况下,须要读某行数据的历史版本的能力,依赖该隔离级别。

committedSnapshot 与 majority ReadConcern

mongodb每次成功对wiredTiger层进行commit_transaction,都使得数据库生成一个Snapshot,得益于WiredTiger的Project Timestamp, Mongodb抛弃了[3.0--3.4]中使用WT 的named snapshot的方式,而直接使用Mongo层的OpTime,一个64位整数,通过WT_CONNECTION.set_timestamp 接口,更自然的将Snapshot和OplogTime对应。

CommittedSnapshot的含义:在某个节点自身的POV(point of view)下,被大多数节点同步到的oplog中,opTime最大的Oplog,其对应的Snapshot被称为 CommittedSnapshot。如下图:
在C节点自身的POV下,A,B,C 三个节点的复制进度如下:


committedSnapshot
  • OpTime=3的Snapshot,是A节点的CommittedSnapshot,该Snapshot被大多数节点复制。
  • OpTime=2的Snapshot,不是A节点的CommittedSnapshot,它虽然被大多数节点复制,但是它的OpTime不是最大的。
  • OpTime=4的Snapshot,不是A节点的CommittedSnapshot,它没有被大多数节点复制。

在该时刻,从C节点,以readConcern=majority的方式,读到y=3

由于分布式的特性,每个节点(Primary or Secondary)的复制快慢不一。而某个节点的POV下,其他节点的复制进度,是通过节点之间的心跳广播lastOpTime得到的。

bool TopologyCoordinator::updateLastCommittedOpTime() {
    std::vector<OpTime> votingNodesOpTimes;
    for (const auto& memberData : _memberData) {
        int memberIndex = memberData.getConfigIndex();
        invariant(memberIndex >= 0);
        const auto& memberConfig = _rsConfig.getMemberAt(memberIndex);
        if (memberConfig.isVoter()) {
            const auto opTime = useDurableOpTime ? memberData.getLastDurableOpTime()
                                                 : memberData.getLastAppliedOpTime();
            votingNodesOpTimes.push_back(opTime);
        }    
    }    
    std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end());
    // need the majority to have this OpTime
    OpTime committedOpTime =
        votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()];
    return advanceLastCommittedOpTime(committedOpTime);
}

find和getmore的PIT consistent保证

Mongodb默认使用WiredTiger提供的SnapshotIsolation隔离级别。证据如下:

wiredtiger_session_cache.cpp
 81 WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn)
 91     invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session));
 92 }

根据先前的分析,我们知道,每一个查询都会attach在一个WiredTiger层面的Snapshot上,如果一个查询开始后没有释放Snapshot重新申请,那么它就能保证Snapshot Isolation的读事务隔离级别,能保证PIT(of query begin time)的一致性。如果做不到这点,就只能保证Read Committed的隔离性。
带着上面的理论指导,我们继续探讨。Mongodb在查询的执行过程中,何时有可能释放掉Snapshot呢,这里需要牵扯到YieldPolicy。

YieldPolicy

我们知道,mongodb 在执行一个耗时较长的查询时,可以通过db.killOp()命令结束。 它是通过YieldPolicy做到这点的。具体到查询而言,查询使用YieldAuto Policy。所谓YieldAuto,是指查询请求会运行一段时间(可配置)后让出CPU时间片,并检测自己是否被killOp命令kill掉。这是一种典型的协作式调度策略。

 59 PlanYieldPolicy::PlanYieldPolicy(PlanExecutor::YieldPolicy policy, ClockSource* cs)
 62       :_elapsedTracker(cs,
 63                       internalQueryExecYieldIterations.load(),
 64                       Milliseconds(internalQueryExecYieldPeriodMS.load())),
 65       _planYielding(nullptr) {}
 66 
 67 bool PlanYieldPolicy::shouldYieldOrInterrupt() {
 71     return shouldYield();
 72 }
 73 
 74 bool PlanYieldPolicy::shouldYield() {
 75     if (!canAutoYield())
 76         return false;
 80     return _elapsedTracker.intervalHasElapsed();
 81 }

上面的代码中,internalQueryExecYieldPeriodMS 可以通过setParameter 命令配置,控制查询命令的AutoYield的间隔,默认10毫秒。
Mongodb在一个执行计划被Yield出去之后,执行清理工作。 首先检查是否被killOp命令杀掉了,如果没有被杀掉,会通过yieldAllLocks暂时让出锁资源。

Status PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn,
                              stdx::function<void()> whileYieldingFn) {
            // 检查是否被kill掉了
            if (_policy == PlanExecutor::YIELD_AUTO) {
                auto interruptStatus = opCtx->checkForInterruptNoAssert();
                if (!interruptStatus.isOK()) {
                    return interruptStatus;
                }   
            }   
            planYielding->saveState();
            QueryYield::yieldAllLocks(opCtx, whileYieldingFn, _planYielding->nss());
    }   

yieldAllLocks

yieldAllLocks函数就是最终的答案,AutoYield的过程中,会通过opCtx->recoveryUnit()->abandonSnapshot()释放掉执行计划中的Snapshot。

void QueryYield::yieldAllLocks(OperationContext* opCtx,
                               stdx::function<void()> whileYieldingFn,
                               const NamespaceString& planExecNS) {
    // Things have to happen here in a specific order:
    //   * Release lock mgr locks
    //   * Go to sleep
    //   * Call the whileYieldingFn
    //   * Reacquire lock mgr locks

    Locker* locker = opCtx->lockState();
    
    Locker::LockSnapshot snapshot;
    
    // Nothing was unlocked, just return, yielding is pointless.
    if (!locker->saveLockStateAndUnlock(&snapshot)) {
        return;
    }

    // Top-level locks are freed, release any potential low-level (storage engine-specific
    // locks). If we are yielding, we are at a safe place to do so.
    opCtx->recoveryUnit()->abandonSnapshot();
    
    // Track the number of yields in CurOp.
    CurOp::get(opCtx)->yielded();
                           
    MONGO_FAIL_POINT_PAUSE_WHILE_SET(setYieldAllLocksHang);
    
    MONGO_FAIL_POINT_BLOCK(setYieldAllLocksWait, customWait) {
        const BSONObj& data = customWait.getData();
        BSONElement customWaitNS = data["namespace"];
        if (!customWaitNS || planExecNS.ns() == customWaitNS.str()) {
            sleepFor(Milliseconds(data["waitForMillis"].numberInt()));
        }   
    }       
            
    if (whileYieldingFn) {
        whileYieldingFn();
    }   
        
    locker->restoreLockState(snapshot);
}   

总结

Mongodb使用WiredTiger提供的SnapshotIsolation 隔离级别。但不代表Mongodb的查询是该隔离级别。Mongodb的查询过程中,会阶段性的将过程Yield出去,一方面是为了检测过程是否已经被终止,一方面是为了让出时间片给其它线程执行。而Yield出去的查询,会连带释放掉WiredTiger层的Snapshot。因此,Mongodb的查询操作的事务隔离级别是Read-Committed隔离级别的。

相关文章

网友评论

    本文标题:Mongodb事务模型分析

    本文链接:https://www.haomeiwen.com/subject/qejkdftx.html