MongoDB副本集是一种有效的提高服务可用性和数据可靠性的手段。多个数据副本节点,提供了故障切换和数据冗余能力。MongoDB副本集的选举选法是基于大多数共识,因此2n+2个节点和2n+1个节点,都可以承受最多n个节点不可用的场景。因而通常副本集的节点数都是奇数个。副本节点的数据通过同步的方式进行复制,具体的复制方式可以分为增量同步(background sync)和全量同步(initial sync)。一个全新的数据节点数据同步通常首先经过全量同步阶段进行已有数据复制,最后进入稳定的增量同步阶段。
全量同步(initla sync)
全量同步全量同步可以大致可以分为同步前准备和数据同步2个步骤,数据同步过程会启动2个同时执行的数据库克隆&oplog回放和oplog复制异步任务。需要指明的是,oplog拉取的开始时间戳是就是克隆数据库开始时间。在克隆数据库过程中,由于MongoDB3.6尚不支持跨文档事务,得到的数据是一个不一致的数据镜像,需要依赖oplog的幂等性进行回放,从而得到一致的数据库数据。
准备阶段
同步前准备步骤,主要逻辑用伪代码大致简单描述如下:
void prepareInitialSync() {
//重置optime、sync source、lastApplied、lastFetched,清空oplog buffer
reset();
// 选择同步源: 固定配置节点、主节点、链式复制最优节点
chooseSyncSource();
//清空oplog集合,删除所有用户数据库(保留local)
truncateOplogAndDropReplicatedDatabases();
//检查版本兼容性,3.6版本secondary节点,可兼容3.4/3.6版本的primary节点
checkFeatureCompatibilityVersion();
}
数据库克隆&日志回放
克隆数据库需要进行数据库和集合两次迭代,既遍历所有需要遍历的数据库下所有需要遍历的集合。在遍历过程中,部分数据库(如local数据库)和部分集合(如system集合)需要略过,admin数据库会放在第一个同步以使得在同步过程中用户即可登录数据库。在3.6版本的MongoDB中,针对每个待克隆的db会开启1个thread执行相应的CloneDatabase任务;针对每个db下的collection,也会开启1个thread执行相应的CloneCollection任务。在collection内部,通过find操作顺序遍历cursor来读取collection的所有数据。
在回放日志过程中,通常会采取批量读取批量回放的方式提高回放速度,单一批次的限制为oplog日志数据量和oplog日志条数。需要说明的是,部分日志类型(如kCommand类型日志、ns为system.index的日志)不能进行批量回放。
- 从源端获取db列表
- 获取每个db的collection列表
- 获取每个collection的index并在本地创建collection和相应的index
- 通过cursor读取collection的所有doc,并在本地执行insert操作插入到相应的collection中
- 从oplogBuffer读取所有日志,将oplog写入到本地oplog集合,并apply相应的oplog日志
- 若在回放期间有新日志产生,则回到5,否则initial sync阶段结束。
增量同步(background sync)
增量同步过程,主要包括选择同步源、开始同步等逻辑,在同步失败后,会根据错误的不同类型进行重试。通常,在重试前会主动sleep一定的时间。
void BackgroundSync(){
auto status = _syncSourceResolver->startup();
if (syncSourceResp.syncSourceStatus == ErrorCodes::OplogStartMissing) {
// 如果本地需要复制的oplog start在source中无法找到,则进入recovering状态
auto status = _replCoord->setMaintenanceMode(true);
status = _replCoord->setFollowerMode(MemberState::RS_RECOVERING);
}
//开始日志复制
auto scheduleStatus = oplogFetcher->startup();
if (fetcherReturnStatus.code() == ErrorCodes::OplogStartMissing) {
// 若在同步源中找不到接下来需要同步的oplog,则进行rollback
_runRollback(opCtx.get(), fetcherReturnStatus, source,
syncSourceResp.rbid, storageInterface);
}
}
日志同步的一些关键点
在阅读代码过程中,发现了一些比较有借鉴参考意义的设计关键点,接下来将重点进行讨论。
同步源选择
HostAndPort chooseNewSyncSource(const OpTime& lastOpTimeFetched) {
// 使用强制同步源,在强制设置member同步源时生效,当次有效
if (_forceSyncSourceIndex != -1) {
_syncSource =
_rsConfig.getMemberAt(_forceSyncSourceIndex).getHostAndPort();
_forceSyncSourceIndex = -1;
return _syncSource;
}
// 不允许链式复制,以primary节点作为复制源
if (!_rsConfig.isChainingAllowed()) {
_syncSource = _currentPrimaryMember()->getHostAndPort();
return _syncSource;
}
for (auto attempt = 0; attempt < 2; attempt++) {
for (auto member : _members) {
// 必须是primary或者secondary节点
if (member.getState().readable()) {
continue;
}
if (attempt == 0) {
//若本身为vote,则同步源也需要为voter
if (_selfConfig().isVoter() && !member.isVoter()) {
continue;
}
// 同步源的optime不能落后primary节点太多
if (member.getHeartbeatAppliedOpTime() < oldestSyncOpTime) {
continue;
}
// 同步源的延迟时间不能大于自身的延迟时间
if (_selfConfig().getSlaveDelay() < member.getSlaveDelay()) {
continue;
}
}
// 同步源的optime必须领先自身的optime
if (member.getHeartbeatAppliedOpTime() <= lastOpTimeFetched) {
continue
}
// 选择网络延迟最低的节点
if (_getPing(member.getHostAndPort()) >
_getPing(_rsConfig.getMemberAt(closestIndex).getHostAndPort()) {
_closestIndex = member.Index
}
}
_syncSource = _rsConfig.getMemberAt(closestIndex).getHostAndPort();
return _syncSource;
}
}
在全量和增量同步启动前,都会涉及同步源的选择。选择的依据主要考虑同步源的日志足够新,且同步源的节点网络延迟最低。同时,在第一次尝试无法选择到最优的同步源时,会考虑进行降级,降低选择的标准。
rollback
由于网络存在时延,在主节点因为异常宕机时,存在部分日志只同步到少数节点且并未提交。新选举的主节点可能并未持有这些日志(选举共识算法只会保证新的主节点的lastAppliedOptime比大多数节点更新),从而产生日志冲突。rollback操作是MongoDB用来解决日志冲突的手段。
void BackgroundSync::_runRollback(OperationContext* opCtx,
const Status& fetcherReturnStatus,
const HostAndPort& source, int requiredRBID,
StorageInterface* storageInterface) {
// 等待本地日志全部被回放
if (lastApplied != lastOpTimeFetched) {
while (lastOpTimeFetched > _replCoord->getMyLastAppliedOpTime())) {
sleepmillis(10);
}
}
// 设置ROLLBACK状态
auto status = replCoord->setFollowerMode(MemberState::RS_ROLLBACK);
FixUpInfo how;
// 遍历远端的oplog集合,在本地oplog中查找该条记录是否存在
while (remoteResult.isOK()) {
auto result = finder.onRemoteOperation(theirObj);
if (result.isOK()) {
//找到了commonPoint
how.commonPoint =
result.getValue().first; // OpTime result.getValue();
} else if (result.getStatus().code() != ErrorCodes::NoSuchKey) {
return result;
}
remoteResult = remoteIterator->next();
}
// rollback createCollections, renameCollection, dropIndexes...
// 生成rollback archive文件
syncFixUp(opCtx, how, rollbackSource, replCoord, replicationProcess);
// truncate oplog集合
oplogCollection->cappedTruncateAfter(opCtx, fixUpInfo.commonPointOurDiskloc, false);
// 重置oplog时间
replCoord->resetLastOpTimesFromOplog(
opCtx, ReplicationCoordinator::DataConsistency::Inconsistent);
}
在rollback过程中,需要找到本地和同步源oplog的common point,并回滚从当前oplog到common point的所有操作。一个典型update操作对应的oplog如下:
{
"ts" : Timestamp(1585487134, 1),
"t" : NumberLong(7),
"h" : NumberLong("4637469566660292008"),
"v" : 2,
"op" : "u",
"ns" : "db_test.t_test",
"ui" : UUID("afa974c0-b640-4280-a2f8-e769427e0906"),
"o2" : {
"_id" : ObjectId("5e809c1a9d1cdab0f2c6bed6")
},
"wall" : ISODate("2020-03-29T13:05:34.117Z"),
"o" : {
"$v" : 1,
"$set" : {
"name" : "ricky1"
}
}
}
Write Concern
MongoDB由secondary主动发起复制日志请求,在一定程度上可以将其日志复制机制归结为异步复制。为了提供更强的灵活性,允许用户在不同场景下选择适当的系统配置,MongoDB为客户端提供Write Concern设置Write Ackknowledgement(写确认)的参数。具体参数如下:
{ w: <value>, j: <boolean>, wtimeout: <number> }
- w用于设置写入需要返回确认的节点数量,可以为特定的数字或者majority(写大多数)。
- j为bool类型,设置为true时,表示w设置节点数的节点journal写入成功则返回。
- wtimeout设置写操作的超时时间。
可以看到,w参数可以影响MongoDB的写入同步返回时间。在需要高一致性的场景下,选择majority或者较大的w参数;在需要较高写入性能的情况下,可以将其w设置为0或者1。
Status waitForWriteConcern(OperationContext* opCtx, const OpTime& replOpTime,
const WriteConcernOptions& writeConcern,
WriteConcernResult* result) {
switch (writeConcernWithPopulatedSyncMode.syncMode) {
case WriteConcernOptions::SyncMode::UNSET:
case WriteConcernOptions::SyncMode::NONE:
break;
case WriteConcernOptions::SyncMode::FSYNC: {
// sync所有数据到磁盘后返回
StorageEngine* storageEngine =
result->fsyncFiles = storageEngine->flushAllFiles(opCtx, true);
break;
}
case WriteConcernOptions::SyncMode::JOURNAL:
// journal写入成功后返回
OpTime appliedOpTime = replCoord->getMyLastAppliedOpTime();
opCtx->recoveryUnit()->waitUntilDurable();
replCoord->setMyLastDurableOpTimeForward(appliedOpTime);
break;
}
if (replMode == modeMasterSlave && writeConcern.wMode == WriteConcernOptions::kMajority) {
// master/slave 模式的majority相当于w=1
return Status::OK();
}
while (!_doneWaitingForReplication_inlock(opTime, minSnapshot, writeConcern)) {}
}
网友评论