本文对于Mongodb上层事务中会让人困惑的几点进行源码层面的分析
- mongodb 的写操作(insert/update/delete)提供的“单行一致性”的具体含义,如何做到的?
- 为何db.coll.count()在宕机崩溃后经常就不准了。
- mongodb 查询操作的事务隔离级别。
写操作的事务性
Mongodb的数据组织
在了解写操作的事务性之前,需要先了解mongo层的每一个table,是如何与wiredtiger层的table(btree)对应的。mongo层一个最简单的table包含一个 ObjectId(_id) 索引。_id类似于Mysql中主键的概念
rs1:PRIMARY> db.abc.getIndexes()
[
{
"v" : 1,
"key" : {
"_id" : 1
},
"name" : "_id_",
"ns" : "test.abc"
}
]
但是mongo中并不会将_id索引与行内容存放在一起(即没有聚簇索引的概念)。取而代之的,mongodb将索引与数据分开存放,通过RecordId进行间接引用。 举例一张包含两个索引(_id 和 name)的表,在wt层将有三张表与其对应。通过name索引找到行记录的过程为:先通过name->Record的索引找到RecordId,再通过RecordId->RowData的索引找到记录内容。
此外,一个Mongodb实例还包含一张记录对每一行的写操作的表local.oplog.rs, 该表主要用于复制(primary-secondary replication)。每一次(对实例中任何一张表的任何一行的)更新操作,都会产生唯一的一条oplog,记录在local.oplog.rs表里。
理解单行事务
mongodb对某一行的写操作,会产生三个动作
- 对wt层的数据段btree(上图中的Data Ident)执行写操作
- 对wt层索引段的每个索引btree执行写操作
- 对oplog表执行写操作
mongodb的单行事务,说的是:对数据,索引,oplog这三者的更新是原子的。不存在索引段中的某个RecordId,在数据段中找不到,也不存在一条记录的更改被应用,但是没有记录到oplog中, 反之亦然。
从下面的代码可以看到,一个插入操作,更新数据,索引,以及Oplog的过程。
collection_impl.cpp
332 Status CollectionImpl::insertDocuments(OperationContext* opCtx)
370 Status status = _insertDocuments(opCtx, begin, end, enforceQuota, opDebug); // 更新数据和索引
375 getGlobalServiceContext()->getOpObserver()->onInserts(opCtx, ns(), uuid(), begin, end, fromMigrate); // 更新Oplog
380 return Status::OK();
381 }
452 Status CollectionImpl::_insertDocuments(OperationContext* opCtx)
489 _recordStore->insertRecords(opCtx, &records, ×tamps, _enforceQuota(enforceQuota)); // 更新数据
493 std::vector<BsonRecord> bsonRecords;
495 int recordIndex = 0;
496 for (auto it = begin; it != end; it++) {
497 RecordId loc = records[recordIndex++].id;
501 BsonRecord bsonRecord = {loc, &(it->doc)};
502 bsonRecords.push_back(bsonRecord);
503 }
505 int64_t keysInserted;
506 status = _indexCatalog.indexRecords(opCtx, bsonRecords, &keysInserted); // 更新所有索引
511 return status;
512 }
单行事务的实现
OperationContext与RecoveryUnit
客户端的每个请求(insert/update/delete/find/getmore),会生成一个唯一的OperationContext记录执行的上下文,OperationContext从请求解析时创建,到请求执行完成时释放。一般情况下,其生命周期等同于一个操作执行的生命周期。OperationContext创建时,会初始化RecoveryUnit。
service_context_d.cpp:288
288 std::unique_ptr<OperationContext> ServiceContextMongoD::_newOpCtx(Client* client, unsigned opId) {
290 auto opCtx = stdx::make_unique<OperationContext>(client, opId);
298 opCtx->setRecoveryUnit(getGlobalStorageEngine()->newRecoveryUnit(),
299 OperationContext::kNotInUnitOfWork);
300 return opCtx;
301 }
RecoveryUnit封装了wiredTiger层的事务。RecoveryUnit::_txnOpen 对应于WT层的beginTransaction。 RecoveryUnit::_txnClose封装了WT层的commit_transaction和rollback_transaction。
- beginTransaction
wiredtiger_recovery_unit.cpp
253 void WiredTigerRecoveryUnit::_txnOpen() {
283 invariantWTOK(session->begin_transaction(session, NULL));
287 _active = true;
288 }
- commit/rollback
wiredtiger_recovery_unit.cpp
192 void WiredTigerRecoveryUnit::_txnClose(bool commit) {
204 if (commit) {
211 wtRet = s->commit_transaction(s, NULL);
213 } else {
214 wtRet = s->rollback_transaction(s, NULL);
215 invariant(!wtRet);
217 }
225 _active = false;
228 }
WriteUnitOfWork
WriteUnitOfWork 是事务框架提供给server层,方便执行事务的API。它是对OperationContext和RecoveryUnit的封装。
class WriteUnitOfWork {
WriteUnitOfWork(OperationContext* opCtx) {
_opCtx->recoveryUnit()->beginUnitOfWork(_opCtx);
}
~WriteUnitOfWork() {
_opCtx->recoveryUnit()->abortUnitOfWork();
}
}
server层执行一个写操作的事务
mongo/db/exec/update.cpp
469 WriteUnitOfWork wunit(getOpCtx());
472 uassertStatusOK(_collection->insertDocument(getOpCtx(),
473 InsertStatement(request->getStmtId(), newObj),
474 _params.opDebug,
475 enforceQuota,
476 request->isFromMigration()));
480 wunit.commit();
总结
简而言之,对一行记录的更改,涉及到数据,索引,和Oplog三者,在wiredTiger层,这样的更改对应于对多张表的更改。Mongodb通过实现事务框架(RecoveryUnit,OperationContext, WriteUnitOfWork)将细节封装。但归根结底非常简单,依然是教科书般的
- begin_transaction
- do writes
- end_transaction(commit/rollback)
这一套。
下图是对上面的代码分析整理的调用层次关系。
事务层次表记录数的更新
对于数据库,我们希望,插入一条数据,记录数加一,删除一条记录,记录数减一。因为这是极其自然的事情。Mongodb在大多数情况下的确可以保证记录数的一致性。但是在异常恢复(死机重启,OOM重启,kill -9)下,很容易产生 db.coll.count()和表真实记录数不一致的情况。
其原因在于表记录数的更新独立于数据更新的事务之外。这么做的主要目的在于:减少写放大。
减少写放大
所有表的辅助信息(count和storageSize)都是以key-value的形式,存储在wiredTiger中的。对任何一行的更改,都会引起表的size和count的变化,size和count如果要做到和DML操作完全一致,只能将其放到上文中提到的begin_transaction/commit 事务中,造成显著的写放大。因此,不仅仅是mongodb,很多数据库引擎(比如mysql的myisam引擎)都选择将辅助信息额外存放,额外更新。
WiredTigerSizeStorer
mongodb使用WiredTigerSizeStorer做表的辅助信息的内存缓存。DML操作引起的辅助信息变化,不会直接反馈到WiredTiger层。而是cache在内存里,标记为dirty。db.coll.count()操作也只是读内存数据。
121 void WiredTigerSizeStorer::storeToCache(StringData uri, long long numRecords, long long dataSize) {
123 stdx::lock_guard<stdx::mutex> lk(_entriesMutex);
124 Entry& entry = _entries[uri.toString()];
125 entry.numRecords = numRecords;
126 entry.dataSize = dataSize;
127 entry.dirty = true;
128 }
129
130 void WiredTigerSizeStorer::loadFromCache(StringData uri,
131 long long* numRecords,
132 long long* dataSize) const {
134 stdx::lock_guard<stdx::mutex> lk(_entriesMutex);
135 Map::const_iterator it = _entries.find(uri.toString());
136 if (it == _entries.end()) {
137 *numRecords = 0;
138 *dataSize = 0;
139 return;
140 }
141 *numRecords = it->second.numRecords;
142 *dataSize = it->second.dataSize;
143 }
内存数据的落地通过_sizeStorerSyncTracker(cs, 100000, Seconds(60))定时器触发。每隔60秒同步一次。
将dirty entry更新到wt层,并根据参数决定是否进行journal flush。更新完毕后将所有entry的diry flag 标记为false。代码在WiredTigerSizeStorer::syncCache方法中。
Mongodb查询操作的一致性
Mongodb的查询操作,由最基本的find command 和 getmore command 组成。 find command返回firstBatch和cursorId。若是范围查循,则客户端driver通过cursorId组装getmore 命令,对游标进行迭代。
此外,mongodb还提供readConcern,若对某个节点执行readConcern=majority的读操作,可以读到同步到集群大多数节点的数据(潜在含义是:可以读到某个节点上的某一行数据的历史版本)。
再其次,find command本身读到的数据,是否是PIT(point in time) Consistent的? 多个getmore读到的数据之间,是否是PIT Consistent**的?
带着上面的问题,我们尝试从代码里找到答案。
事务隔离级别简介
首先,我们需要知道,数据库常用的隔离级别有
- Read UnCommitted
- Read Committed
- Repeatable Read
- Serializable
这个概念在很多数据库教材(特别是讲Mysql的书)中都有提到。一般书中很少提到一种介于Repeatable Read与Serializable 之间的隔离级别,即Snapshot Isolation。这部分内容很宏大,请参考wiki(Snapshot Isolation)。WiredTiger 提供Snapshot Isolation 隔离级别。Mongodb的事务,特别是readConcern=majority情况下,须要读某行数据的历史版本的能力,依赖该隔离级别。
committedSnapshot 与 majority ReadConcern
mongodb每次成功对wiredTiger层进行commit_transaction,都使得数据库生成一个Snapshot,得益于WiredTiger的Project Timestamp, Mongodb抛弃了[3.0--3.4]中使用WT 的named snapshot的方式,而直接使用Mongo层的OpTime,一个64位整数,通过WT_CONNECTION.set_timestamp 接口,更自然的将Snapshot和OplogTime对应。
CommittedSnapshot的含义:在某个节点自身的POV(point of view)下,被大多数节点同步到的oplog中,opTime最大的Oplog,其对应的Snapshot被称为 CommittedSnapshot。如下图:
在C节点自身的POV下,A,B,C 三个节点的复制进度如下:
committedSnapshot
- OpTime=3的Snapshot,是A节点的CommittedSnapshot,该Snapshot被大多数节点复制。
- OpTime=2的Snapshot,不是A节点的CommittedSnapshot,它虽然被大多数节点复制,但是它的OpTime不是最大的。
- OpTime=4的Snapshot,不是A节点的CommittedSnapshot,它没有被大多数节点复制。
在该时刻,从C节点,以readConcern=majority的方式,读到y=3
由于分布式的特性,每个节点(Primary or Secondary)的复制快慢不一。而某个节点的POV下,其他节点的复制进度,是通过节点之间的心跳广播lastOpTime得到的。
bool TopologyCoordinator::updateLastCommittedOpTime() {
std::vector<OpTime> votingNodesOpTimes;
for (const auto& memberData : _memberData) {
int memberIndex = memberData.getConfigIndex();
invariant(memberIndex >= 0);
const auto& memberConfig = _rsConfig.getMemberAt(memberIndex);
if (memberConfig.isVoter()) {
const auto opTime = useDurableOpTime ? memberData.getLastDurableOpTime()
: memberData.getLastAppliedOpTime();
votingNodesOpTimes.push_back(opTime);
}
}
std::sort(votingNodesOpTimes.begin(), votingNodesOpTimes.end());
// need the majority to have this OpTime
OpTime committedOpTime =
votingNodesOpTimes[votingNodesOpTimes.size() - _rsConfig.getWriteMajority()];
return advanceLastCommittedOpTime(committedOpTime);
}
find和getmore的PIT consistent保证
Mongodb默认使用WiredTiger提供的SnapshotIsolation隔离级别。证据如下:
wiredtiger_session_cache.cpp
81 WiredTigerSession::WiredTigerSession(WT_CONNECTION* conn)
91 invariantWTOK(conn->open_session(conn, NULL, "isolation=snapshot", &_session));
92 }
根据先前的分析,我们知道,每一个查询都会attach在一个WiredTiger层面的Snapshot上,如果一个查询开始后没有释放Snapshot重新申请,那么它就能保证Snapshot Isolation的读事务隔离级别,能保证PIT(of query begin time)的一致性。如果做不到这点,就只能保证Read Committed的隔离性。
带着上面的理论指导,我们继续探讨。Mongodb在查询的执行过程中,何时有可能释放掉Snapshot呢,这里需要牵扯到YieldPolicy。
YieldPolicy
我们知道,mongodb 在执行一个耗时较长的查询时,可以通过db.killOp()命令结束。 它是通过YieldPolicy做到这点的。具体到查询而言,查询使用YieldAuto Policy。所谓YieldAuto,是指查询请求会运行一段时间(可配置)后让出CPU时间片,并检测自己是否被killOp命令kill掉。这是一种典型的协作式调度策略。
59 PlanYieldPolicy::PlanYieldPolicy(PlanExecutor::YieldPolicy policy, ClockSource* cs)
62 :_elapsedTracker(cs,
63 internalQueryExecYieldIterations.load(),
64 Milliseconds(internalQueryExecYieldPeriodMS.load())),
65 _planYielding(nullptr) {}
66
67 bool PlanYieldPolicy::shouldYieldOrInterrupt() {
71 return shouldYield();
72 }
73
74 bool PlanYieldPolicy::shouldYield() {
75 if (!canAutoYield())
76 return false;
80 return _elapsedTracker.intervalHasElapsed();
81 }
上面的代码中,internalQueryExecYieldPeriodMS 可以通过setParameter 命令配置,控制查询命令的AutoYield的间隔,默认10毫秒。
Mongodb在一个执行计划被Yield出去之后,执行清理工作。 首先检查是否被killOp命令杀掉了,如果没有被杀掉,会通过yieldAllLocks暂时让出锁资源。
Status PlanYieldPolicy::yield(stdx::function<void()> beforeYieldingFn,
stdx::function<void()> whileYieldingFn) {
// 检查是否被kill掉了
if (_policy == PlanExecutor::YIELD_AUTO) {
auto interruptStatus = opCtx->checkForInterruptNoAssert();
if (!interruptStatus.isOK()) {
return interruptStatus;
}
}
planYielding->saveState();
QueryYield::yieldAllLocks(opCtx, whileYieldingFn, _planYielding->nss());
}
yieldAllLocks
yieldAllLocks函数就是最终的答案,AutoYield的过程中,会通过opCtx->recoveryUnit()->abandonSnapshot()释放掉执行计划中的Snapshot。
void QueryYield::yieldAllLocks(OperationContext* opCtx,
stdx::function<void()> whileYieldingFn,
const NamespaceString& planExecNS) {
// Things have to happen here in a specific order:
// * Release lock mgr locks
// * Go to sleep
// * Call the whileYieldingFn
// * Reacquire lock mgr locks
Locker* locker = opCtx->lockState();
Locker::LockSnapshot snapshot;
// Nothing was unlocked, just return, yielding is pointless.
if (!locker->saveLockStateAndUnlock(&snapshot)) {
return;
}
// Top-level locks are freed, release any potential low-level (storage engine-specific
// locks). If we are yielding, we are at a safe place to do so.
opCtx->recoveryUnit()->abandonSnapshot();
// Track the number of yields in CurOp.
CurOp::get(opCtx)->yielded();
MONGO_FAIL_POINT_PAUSE_WHILE_SET(setYieldAllLocksHang);
MONGO_FAIL_POINT_BLOCK(setYieldAllLocksWait, customWait) {
const BSONObj& data = customWait.getData();
BSONElement customWaitNS = data["namespace"];
if (!customWaitNS || planExecNS.ns() == customWaitNS.str()) {
sleepFor(Milliseconds(data["waitForMillis"].numberInt()));
}
}
if (whileYieldingFn) {
whileYieldingFn();
}
locker->restoreLockState(snapshot);
}
总结
Mongodb使用WiredTiger提供的SnapshotIsolation 隔离级别。但不代表Mongodb的查询是该隔离级别。Mongodb的查询过程中,会阶段性的将过程Yield出去,一方面是为了检测过程是否已经被终止,一方面是为了让出时间片给其它线程执行。而Yield出去的查询,会连带释放掉WiredTiger层的Snapshot。因此,Mongodb的查询操作的事务隔离级别是Read-Committed隔离级别的。
网友评论