mongo recordid src
抽象类RecordStore封装了一系列与记录操作相关的接口,其定义原文如下:
/**
* An abstraction used for storing documents in a collection or entries in an index.
*
* In storage engines implementing the KVEngine, record stores are also used for implementing
* catalogs.
*
* Many methods take an OperationContext parameter. This contains the RecoveryUnit, with
* all RecordStore specific transaction information, as well as the LockState. Methods that take
* an OperationContext may throw a WriteConflictException.
*
* This class must be thread-safe for document-level locking storage engines. In addition, for
* storage engines implementing the KVEngine some methods must be thread safe, see DurableCatalog.
* Only for MMAPv1 is this class not thread-safe.
*/
class RecordStore
insert 一条记录相关的源码记录。
插入记录的接口定义
/**
* Inserts the specified records into this RecordStore by copying the passed-in record data and
* updates 'inOutRecords' to contain the ids of the inserted records.
*
* Pure virtual: each concrete record store supplies the implementation. The outcome is reported
* through the returned Status.
*/
virtual Status insertRecords(OperationContext* opCtx, // operation context for this call
std::vector<Record>* inOutRecords, // in: records to insert; out: the same records with their ids filled in
const std::vector<Timestamp>& timestamps) = 0; // list of timestamps; NOTE(review): presumably one entry per record - confirm
/**
 * A thin wrapper around insertRecords() to simplify handling of single document inserts.
 *
 * Wraps the 'len' bytes at 'data' in a one-element Record vector whose id starts out null,
 * passes it to insertRecords() together with a one-element timestamp vector, and on success
 * returns the id found in that element afterwards. A non-OK Status from insertRecords() is
 * returned unchanged.
 */
StatusWith<RecordId> insertRecord(OperationContext* opCtx,
                                  const char* data,
                                  int len,
                                  Timestamp timestamp) {
    // Exactly one record, built from a null RecordId and a RecordData over (data, len).
    std::vector<Record> singleRecord{Record{RecordId(), RecordData(data, len)}};
    // Exactly one timestamp, matching the single record above.
    const std::vector<Timestamp> singleTimestamp{timestamp};

    Status insertStatus = insertRecords(opCtx, &singleRecord, singleTimestamp);
    if (insertStatus.isOK()) {
        // insertRecords() updates the vector in place with the id of the inserted record.
        return singleRecord.front().id;
    }
    return insertStatus;
}
插入相关的数据结构
Record对象
/**
* The data items stored in a RecordStore.
*
* A plain aggregate that pairs a record's key with its payload.
*/
struct Record {
RecordId id; // key identifying this record
RecordData data; // the record's data
};
RecordId对象
RecordId对象是一个collection或RecordStore中记录的唯一标识,内部用一个int64_t值(_repr)表示,并按取值范围分为几类:
null值(0)、普通值(大于0且小于kMinReservedRepr)、保留值(从kMinReservedRepr起且小于kMaxRepr),以及作为哨兵的最小值/最大值;同时还有一个Hasher哈希对象,可以生成RecordId的哈希值
/**
 * The key that uniquely identifies a Record in a Collection or RecordStore.
 *
 * The id is a thin wrapper around one signed 64-bit value. Zero is the null id, values above
 * zero and below kMinReservedRepr are "normal", values from kMinReservedRepr up to (but not
 * including) kMaxRepr are "reserved", and everything else is a sentinel.
 */
class RecordId {
public:
    // Boundaries of the 'normal' and 'reserved' id ranges.
    static constexpr int64_t kNullRepr = 0;
    static constexpr int64_t kMinRepr = LLONG_MIN;
    static constexpr int64_t kMaxRepr = LLONG_MAX;
    static constexpr int64_t kMinReservedRepr = kMaxRepr - (1024 * 1024);

    /**
     * Every id in the reserved range that has been handed out for a specific purpose.
     */
    enum class ReservedId : int64_t { kWildcardMultikeyMetadataId = kMinReservedRepr };

    /**
     * Builds the null RecordId.
     */
    RecordId() : _repr(kNullRepr) {}

    /**
     * Builds a RecordId directly from its 64-bit representation.
     */
    explicit RecordId(int64_t repr) : _repr(repr) {}

    /**
     * Builds a RecordId from one of the named reserved ids.
     */
    explicit RecordId(ReservedId repr) : RecordId(static_cast<int64_t>(repr)) {}

    /**
     * Builds a RecordId from two halves: 'high' lands in the upper 32 bits, 'low' in the lower.
     * TODO consider removing.
     */
    RecordId(int high, int low)
        : _repr((static_cast<uint64_t>(high) << 32) | static_cast<uint32_t>(low)) {}

    /**
     * A RecordId that compares less than all ids that represent documents in a collection.
     */
    static RecordId min() {
        return RecordId(kMinRepr);
    }

    /**
     * A RecordId that compares greater than all ids that represent documents in a collection.
     */
    static RecordId max() {
        return RecordId(kMaxRepr);
    }

    /**
     * The first id of the reserved range at the top of the RecordId space.
     */
    static RecordId minReserved() {
        return RecordId(kMinReservedRepr);
    }

    /**
     * True when this is the null id.
     */
    bool isNull() const {
        return _repr == kNullRepr;
    }

    /**
     * The raw 64-bit representation.
     */
    int64_t repr() const {
        return _repr;
    }

    /**
     * Valid RecordIds are the only ones which may be used to represent Records: either "normal"
     * ids that refer to user data or "reserved" ids used internally. Anything outside those two
     * ranges is a sentinel value.
     */
    bool isValid() const {
        if (isNormal())
            return true;
        return isReserved();
    }

    /**
     * True for ids inside the range used for normal user data, i.e. strictly above the null id
     * and strictly below the reserved range at the top of the RecordId space.
     */
    bool isNormal() const {
        const bool aboveNull = _repr > kNullRepr;
        const bool belowReserved = _repr < kMinReservedRepr;
        return aboveNull && belowReserved;
    }

    /**
     * True for ids inside the reserved range at the top of the record space (kMaxRepr itself
     * is excluded).
     */
    bool isReserved() const {
        const bool atOrAboveReservedStart = _repr >= kMinReservedRepr;
        const bool belowMax = _repr < kMaxRepr;
        return atOrAboveReservedStart && belowMax;
    }

    /**
     * Three-way comparison on the raw representation: -1, 0 or 1.
     */
    int compare(RecordId rhs) const {
        if (_repr < rhs._repr)
            return -1;
        if (_repr > rhs._repr)
            return 1;
        return 0;
    }

    /**
     * Hash value for this RecordId. The hash implementation may be modified, and its behavior
     * may differ across platforms. Hash values should not be persisted.
     */
    struct Hasher {
        size_t operator()(RecordId rid) const {
            // TODO consider better hashes
            size_t seed = 0;
            boost::hash_combine(seed, rid.repr());
            return seed;
        }
    };

    /// members for Sorter
    struct SorterDeserializeSettings {};  // unused

    /**
     * Appends the representation to 'buf' as a long long.
     */
    void serializeForSorter(BufBuilder& buf) const {
        buf.appendNum(static_cast<long long>(_repr));
    }

    /**
     * Reads a little-endian int64_t from 'buf' and wraps it in a RecordId.
     */
    static RecordId deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&) {
        return RecordId(buf.read<LittleEndian<int64_t>>());
    }

    int memUsageForSorter() const {
        return sizeof(RecordId);
    }

    /**
     * A RecordId holds no external memory, so the "owned" version is simply a copy.
     */
    RecordId getOwned() const {
        return *this;
    }

private:
    int64_t _repr;
};
RecordData 对象
/**
 * A replacement for the Record class. This class represents data in a record store.
 * The _ownedData attribute is used to manage memory ownership.
 *
 * An instance either borrows the bytes it points at (constructed from a raw pointer) or owns
 * them through a SharedBuffer (constructed from one, or produced by getOwned()/makeOwned()).
 */
class RecordData {
public:
    /**
     * Builds an empty, unowned RecordData: null data pointer and size 0.
     */
    RecordData() : _data(nullptr), _size(0) {}

    /**
     * Builds an unowned view over 'size' bytes at 'data'; no copy is made.
     */
    RecordData(const char* data, int size) : _data(data), _size(size) {}

    /**
     * Builds an owned RecordData that keeps 'ownedData' and points at its contents.
     * _data is initialised from 'ownedData' before the buffer is moved into _ownedData
     * (members are initialised in declaration order).
     */
    RecordData(SharedBuffer ownedData, int size)
        : _data(ownedData.get()), _size(size), _ownedData(std::move(ownedData)) {}

    const char* data() const {
        return _data;
    }

    int size() const {
        return _size;
    }

    /**
     * Returns true if this owns its own memory, and false otherwise
     */
    bool isOwned() const {
        return _ownedData.get() != nullptr;
    }

    /**
     * Hands the owned buffer to the caller by moving it out of this object.
     */
    SharedBuffer releaseBuffer() {
        return std::move(_ownedData);
    }

    /**
     * Builds a BSONObj over this data: from the shared buffer when owned, otherwise from the
     * raw pointer.
     */
    BSONObj toBson() const& {
        return isOwned() ? BSONObj(_ownedData) : BSONObj(_data);
    }

    /**
     * Like toBson(), but gives the owned buffer away (via releaseBuffer()) instead of
     * sharing it.
     */
    BSONObj releaseToBson() {
        return isOwned() ? BSONObj(releaseBuffer()) : BSONObj(_data);
    }

    /**
     * Rvalue overload: the object is going away, so release the buffer rather than share it.
     */
    BSONObj toBson() && {
        return releaseToBson();
    }

    /**
     * Returns a RecordData that owns its memory: a copy of this object when it is already
     * owned, otherwise a new object backed by a freshly allocated copy of the bytes.
     */
    RecordData getOwned() const {
        if (isOwned())
            return *this;
        auto buffer = SharedBuffer::allocate(_size);
        // A default-constructed RecordData has a null _data and size 0. Calling memcpy() with
        // a null source is undefined behaviour even when the length is zero, so the copy is
        // skipped when there is nothing to copy.
        if (_size != 0)
            memcpy(buffer.get(), _data, _size);
        // Move the buffer into the result instead of copying it.
        return RecordData(std::move(buffer), _size);
    }

    /**
     * Converts this object in place into one that owns its memory; no-op when already owned.
     */
    void makeOwned() {
        if (isOwned())
            return;
        *this = getOwned();
    }

private:
    const char* _data;
    int _size;
    SharedBuffer _ownedData;
};
wiredTiger 中对 insert 接口的实现
virtual Status insertRecords
/**
 * Vector-based entry point: unpacks both vectors into raw pointers plus a count and forwards
 * to _insertRecords(). The count is taken from 'records' only.
 */
Status WiredTigerRecordStore::insertRecords(OperationContext* opCtx,
                                            std::vector<Record>* records,
                                            const std::vector<Timestamp>& timestamps) {
    Record* const firstRecord = records->data();
    const Timestamp* const firstTimestamp = timestamps.data();
    const size_t recordCount = records->size();
    return _insertRecords(opCtx, firstRecord, firstTimestamp, recordCount);
}
/**
* Inserts the 'nRecords' entries of the 'records' array through a WiredTiger cursor.
*
* Works in two passes: the first assigns a RecordId to every record, the second sets the
* timestamp (if any) and writes each key/value pair with the cursor. Afterwards the record
* count and data size are adjusted, and either the oplog stones are updated or capped deletion
* is triggered.
*
* 'timestamps' is indexed in parallel with 'records'. Returns BadValue when the store is capped
* and the batch is larger than _cappedMaxSize, the converted WiredTiger error when an insert
* fails, the error from oploghack::extractKey() when that fails, and Status::OK() otherwise.
*/
Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
Record* records,
const Timestamp* timestamps,
size_t nRecords) {
dassert(opCtx->lockState()->isWriteLocked()); // asserts that a write lock is already held; nothing is acquired here
// We are kind of cheating on capped collections since we write all of them at once ....
// Simplest way out would be to just block vector writes for everything except oplog ?
// Sum the data sizes of all records in the batch.
int64_t totalLength = 0;
for (size_t i = 0; i < nRecords; i++)
totalLength += records[i].data.size();
// caller will retry one element at a time
// For a capped store, reject the whole batch when its combined size exceeds _cappedMaxSize.
/*
* For reference, the member checked below is declared as:
*     // The capped settings should not be updated once operations have started
*     const bool _isCapped;
* NOTE(review): the original note also quoted "True if the storage engine is an in-memory
* storage engine", which appears to describe a different member that this function does not use.
*/
if (_isCapped && totalLength > _cappedMaxSize)
return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");
WiredTigerCursor curwrap(_uri, _tableId, true, opCtx); // cursor wrapper built from the URI and table id
curwrap.assertInActiveTxn();
WT_CURSOR* c = curwrap.get();
invariant(c);
RecordId highestId = RecordId(); // starts as the null RecordId
dassert(nRecords != 0);
// First pass: assign an id to every record and remember the highest one.
for (size_t i = 0; i < nRecords; i++) {
auto& record = records[i];
if (_isOplog) {// oplog store: the id is extracted from the record's own data
StatusWith<RecordId> status =
oploghack::extractKey(record.data.data(), record.data.size());
if (!status.isOK())
return status.getStatus();
record.id = status.getValue();
} else {// any other store: take the id from _nextId()
record.id = _nextId(opCtx);
// NOTE(review): _nextId() is not shown here; presumably it returns the most recently
// used id plus one - confirm.
}
// Ids within a batch must be strictly increasing.
dassert(record.id > highestId);
highestId = record.id;
}
// Second pass: choose the timestamp for each record and write it through the cursor.
for (size_t i = 0; i < nRecords; i++) {
auto& record = records[i];
Timestamp ts;
if (timestamps[i].isNull() && _isOplog) {
// If the timestamp is 0, that probably means someone inserted a document directly
// into the oplog. In this case, use the RecordId as the timestamp, since they are
// one and the same. Setting this transaction to be unordered will trigger a journal
// flush. Because these are direct writes into the oplog, the machinery to trigger a
// journal flush is bypassed. A followup oplog read will require a fresh visibility
// value to make progress.
ts = Timestamp(record.id.repr());
opCtx->recoveryUnit()->setOrderedCommit(false);
} else {
ts = timestamps[i];
}
// Only a non-null timestamp is handed to the recovery unit; failure there is fatal (fassert).
if (!ts.isNull()) {
LOG(4) << "inserting record with timestamp " << ts;
fassert(39001, opCtx->recoveryUnit()->setTimestamp(ts));
}
// Key is the record id, value is the record's raw bytes.
setKey(c, record.id);
WiredTigerItem value(record.data.data(), record.data.size());
c->set_value(c, value.Get());
int ret = WT_OP_CHECK(c->insert(c));
// A non-zero return code aborts the batch and is converted to a Status.
if (ret)
return wtRCToStatus(ret, "WiredTigerRecordStore::insertRecord");
}
// Account for the newly inserted records and bytes.
_changeNumRecords(opCtx, nRecords);
_increaseDataSize(opCtx, totalLength);
// With oplog stones present they are updated; otherwise capped deletion is invoked.
if (_oplogStones) {
_oplogStones->updateCurrentStoneAfterInsertOnCommit(
opCtx, totalLength, highestId, nRecords);
} else {
_cappedDeleteAsNeeded(opCtx, highestId);
}
return Status::OK();
}
网友评论