美文网首页
mongo recordid src

mongo recordid src

作者: Vackine | 来源:发表于2020-03-08 16:58 被阅读0次

mongo recordid src

抽象类 RecordStore 封装了一系列与记录操作相关的接口,其定义原文如下:

/**
 * An abstraction used for storing documents in a collection or entries in an index.
 *
 * In storage engines implementing the KVEngine, record stores are also used for implementing
 * catalogs.
 *
 * Many methods take an OperationContext parameter. This contains the RecoveryUnit, with
 * all RecordStore specific transaction information, as well as the LockState. Methods that take
 * an OperationContext may throw a WriteConflictException.
 *
 * This class must be thread-safe for document-level locking storage engines. In addition, for
 * storage engines implementing the KVEngine some methods must be thread safe, see DurableCatalog.
 * Only for MMAPv1 is this class not thread-safe.
 */
class RecordStore

下面记录与插入(insert)一条记录相关的源码。

插入记录的接口定义

 /**
     * Inserts the specified records into this RecordStore by copying the passed-in record data and
     * updates 'inOutRecords' to contain the ids of the inserted records.
     *
     * Pure virtual: each concrete record store supplies the implementation.
     *
     * @param opCtx         operation context the insert runs under
     * @param inOutRecords  records to insert; on return their ids are those of the inserted records
     * @param timestamps    list of timestamps for the insert
     *                      (presumably one entry per record -- confirm against implementations)
     * @return a Status describing the outcome of the insert
     */
    virtual Status insertRecords(OperationContext* opCtx, // operation context
                                 std::vector<Record>* inOutRecords, // in/out record list
                                 const std::vector<Timestamp>& timestamps) = 0; // timestamp list

    /**
     * A thin wrapper around insertRecords() to simplify handling of single document inserts.
     *
     * @param opCtx      operation context forwarded to insertRecords()
     * @param data       pointer to the bytes of the record to insert
     * @param len        number of bytes at 'data'
     * @param timestamp  timestamp forwarded (as a one-element list) to insertRecords()
     * @return the id assigned to the inserted record, or the failing Status from insertRecords()
     */
    StatusWith<RecordId> insertRecord(OperationContext* opCtx,
                                      const char* data,
                                      int len,
                                      Timestamp timestamp) {
        // Build a one-element batch: a Record pairing a null RecordId with the caller's bytes.
        std::vector<Record> batch;
        batch.push_back(Record{RecordId(), RecordData(data, len)});

        // The batch API takes one timestamp list, so wrap the single timestamp.
        std::vector<Timestamp> stamps(1, timestamp);

        Status outcome = insertRecords(opCtx, &batch, stamps);
        if (outcome.isOK()) {
            // insertRecords() has filled in the id of the record it inserted.
            return batch.front().id;
        }
        return outcome;
    }

插入相关的数据结构

Record对象

/**
 * The data items stored in a RecordStore.
 *
 * A plain aggregate pairing a record's identifier with its payload.
 */
struct Record {
    RecordId id;      // identifier of this record
    RecordData data;  // the record's payload
};

RecordId对象

RecordId 对象是一个 collection 或 RecordStore 中记录的唯一标识,是一个有范围的值,
其取值分为几类:null 值、最小值、最大值以及保留区间的最小值;同时还有一个 Hasher 哈希对象,可以生成 RecordId 的哈希值。

/**
 * The key that uniquely identifies a Record in a Collection or RecordStore.
 *
 * The id is a single signed 64-bit representation. Zero is the null id, values strictly between
 * zero and kMinReservedRepr are "normal", values in [kMinReservedRepr, kMaxRepr) are "reserved",
 * and every other value is a sentinel.
 */
class RecordId {
public:
    // Boundary constants delimiting the 'normal' and 'reserved' id ranges.
    static constexpr int64_t kNullRepr = 0;
    static constexpr int64_t kMinRepr = LLONG_MIN;
    static constexpr int64_t kMaxRepr = LLONG_MAX;
    static constexpr int64_t kMinReservedRepr = kMaxRepr - (1024 * 1024);

    /**
     * Ids inside the reserved range that have been handed out for a specific purpose.
     */
    enum class ReservedId : int64_t { kWildcardMultikeyMetadataId = kMinReservedRepr };

    /**
     * Builds the Null RecordId.
     */
    RecordId() : _repr(kNullRepr) {}

    /**
     * Builds a RecordId directly from its 64-bit representation.
     */
    explicit RecordId(int64_t repr) : _repr(repr) {}

    /**
     * Builds a RecordId from one of the named reserved ids.
     */
    explicit RecordId(ReservedId repr) : RecordId(static_cast<int64_t>(repr)) {}

    /**
     * Builds a RecordId out of two 32-bit halves: 'high' fills the upper 32 bits and 'low' the
     * lower 32 bits.
     * TODO consider removing.
     */
    RecordId(int high, int low)
        : _repr((static_cast<uint64_t>(high) << 32) | static_cast<uint32_t>(low)) {}

    /**
     * A RecordId ordered before every id that represents a document in a collection.
     */
    static RecordId min() {
        return RecordId(kMinRepr);
    }

    /**
     * A RecordId ordered after every id that represents a document in a collection.
     */
    static RecordId max() {
        return RecordId(kMaxRepr);
    }

    /**
     * The lowest id of the reserved range that sits at the top of the RecordId space.
     */
    static RecordId minReserved() {
        return RecordId(kMinReservedRepr);
    }

    /**
     * True when this is the Null RecordId.
     */
    bool isNull() const {
        return kNullRepr == _repr;
    }

    /**
     * The raw 64-bit representation.
     */
    int64_t repr() const {
        return _repr;
    }

    /**
     * Only valid RecordIds may be used to represent Records. Valid ids are either "normal" ids,
     * which refer to user data, or "reserved" ids, which are used internally. Anything else is a
     * sentinel value.
     */
    bool isValid() const {
        if (isNormal()) {
            return true;
        }
        return isReserved();
    }

    /**
     * True for ids in the range used for normal user data, i.e. positive and below the reserved
     * range at the top of the RecordId space.
     */
    bool isNormal() const {
        return kNullRepr < _repr && _repr < kMinReservedRepr;
    }

    /**
     * True for ids inside the reserved range at the top of the record space.
     */
    bool isReserved() const {
        return kMinReservedRepr <= _repr && _repr < kMaxRepr;
    }

    /**
     * Three-way comparison on the representation: -1, 0 or 1 when this id is respectively less
     * than, equal to, or greater than 'rhs'.
     */
    int compare(RecordId rhs) const {
        if (_repr < rhs._repr) {
            return -1;
        }
        if (_repr > rhs._repr) {
            return 1;
        }
        return 0;
    }

    /**
     * Hash functor for RecordId. The implementation may change and may behave differently across
     * platforms, so hash values should not be persisted.
     */
    struct Hasher {
        size_t operator()(RecordId rid) const {
            // TODO consider better hashes
            size_t seed = 0;
            boost::hash_combine(seed, rid.repr());
            return seed;
        }
    };

    /// members for Sorter
    struct SorterDeserializeSettings {};  // unused

    // Appends the representation to 'buf' as a long long.
    void serializeForSorter(BufBuilder& buf) const {
        buf.appendNum(static_cast<long long>(_repr));
    }

    // Reads a little-endian int64_t from 'buf' and wraps it in a RecordId.
    static RecordId deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&) {
        return RecordId(buf.read<LittleEndian<int64_t>>());
    }

    // Reports the size of a RecordId object.
    int memUsageForSorter() const {
        return sizeof(RecordId);
    }

    // A RecordId holds only its value, so an "owned" version is simply a copy.
    RecordId getOwned() const {
        return *this;
    }

private:
    int64_t _repr;  // the 64-bit representation of this id
};

RecordData 对象

/**
 * A replacement for the Record class. This class represents data in a record store.
 * The _ownedData attribute is used to manage memory ownership.
 *
 * An instance either merely points at bytes owned by someone else (unowned), or additionally
 * holds a SharedBuffer for those bytes (owned). isOwned() tells the two apart.
 */
class RecordData {
public:
    /**
     * Constructs an empty, unowned RecordData: null data pointer and zero size.
     */
    RecordData() : _data(nullptr), _size(0) {}

    /**
     * Constructs an unowned view over 'size' bytes at 'data'. No copy is made here.
     */
    RecordData(const char* data, int size) : _data(data), _size(size) {}

    /**
     * Constructs an owned RecordData that holds 'ownedData' and points at its contents.
     */
    RecordData(SharedBuffer ownedData, int size)
        : _data(ownedData.get()), _size(size), _ownedData(std::move(ownedData)) {}

    /**
     * Pointer to the first byte of the record.
     */
    const char* data() const {
        return _data;
    }

    /**
     * Size of the record in bytes.
     */
    int size() const {
        return _size;
    }

    /**
     * Returns true if this owns its own memory, and false otherwise
     */
    bool isOwned() const {
        return _ownedData.get();
    }

    /**
     * Moves the owned buffer out of this object and returns it.
     */
    SharedBuffer releaseBuffer() {
        return std::move(_ownedData);
    }

    /**
     * Builds a BSONObj from this record: from the owned buffer when there is one, otherwise from
     * the raw data pointer.
     */
    BSONObj toBson() const& {
        return isOwned() ? BSONObj(_ownedData) : BSONObj(_data);
    }

    /**
     * Like toBson(), but when the data is owned the buffer is released into the returned BSONObj.
     */
    BSONObj releaseToBson() {
        return isOwned() ? BSONObj(releaseBuffer()) : BSONObj(_data);
    }

    /**
     * Rvalue overload: hands the buffer over via releaseToBson().
     */
    BSONObj toBson() && {
        return releaseToBson();
    }

    /**
     * Returns a RecordData that owns its bytes: a copy of this object when it is already owned,
     * otherwise a new instance backed by a freshly allocated buffer holding a copy of the data.
     */
    RecordData getOwned() const {
        if (isOwned())
            return *this;
        auto buffer = SharedBuffer::allocate(_size);
        // A default-constructed RecordData has a null _data and zero _size; memcpy must not be
        // called with a null source pointer even when the count is zero, so skip empty copies.
        if (_size > 0) {
            memcpy(buffer.get(), _data, _size);
        }
        // Hand the local buffer over instead of copying it into the result.
        return RecordData(std::move(buffer), _size);
    }

    /**
     * Converts this object in place to an owned RecordData; a no-op when already owned.
     */
    void makeOwned() {
        if (isOwned())
            return;
        *this = getOwned();
    }

private:
    const char* _data;        // first byte of the record
    int _size;                // number of bytes at _data
    SharedBuffer _ownedData;  // holds the bytes when this object owns them
};

wiredTiger 中对 insert 接口的实现

virtual Status insertRecords

// Vector-based entry point: unpacks both containers into raw pointers plus a count and defers
// to _insertRecords(), whose Status is returned unchanged.
Status WiredTigerRecordStore::insertRecords(OperationContext* opCtx,
                                            std::vector<Record>* records,
                                            const std::vector<Timestamp>& timestamps) {
    Record* const firstRecord = records->data();
    const Timestamp* const firstTimestamp = timestamps.data();
    const auto recordCount = records->size();

    return _insertRecords(opCtx, firstRecord, firstTimestamp, recordCount);
}


// Inserts 'nRecords' records through a WiredTiger cursor.
//
// Steps, in order:
//   1. sum the payload sizes and reject the batch if this is a capped store and the total
//      exceeds _cappedMaxSize;
//   2. assign a RecordId to every record (extracted from the payload for the oplog, taken from
//      _nextId() otherwise);
//   3. for each record, set the timestamp on the recovery unit when there is one, then set the
//      cursor key/value and insert;
//   4. update the record-count and data-size bookkeeping, then either notify the oplog stones
//      or run the capped-deletion hook.
//
// 'records' and 'timestamps' are both indexed from 0 to nRecords - 1. The ids written into
// 'records' are visible to the caller. Returns Status::OK() on success, BadValue when the capped
// size check fails, the key-extraction error for a bad oplog record, or the converted WiredTiger
// error when an insert fails.
Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
                                             Record* records,
                                             const Timestamp* timestamps,
                                             size_t nRecords) {
    dassert(opCtx->lockState()->isWriteLocked()); // debug-check: lock state must report a write lock

    // We are kind of cheating on capped collections since we write all of them at once ....
    // Simplest way out would be to just block vector writes for everything except oplog ?
    int64_t totalLength = 0;
    for (size_t i = 0; i < nRecords; i++)
        totalLength += records[i].data.size();

    // caller will retry one element at a time
    // Reject the whole batch when this is a capped store and the combined payload size is larger
    // than _cappedMaxSize.
    /*
     * Note from the original write-up: _isCapped is declared as `const bool _isCapped;`, with the
     * remark that the capped settings should not be updated once operations have started.
     */
    if (_isCapped && totalLength > _cappedMaxSize)
        return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");

    WiredTigerCursor curwrap(_uri, _tableId, true, opCtx); // cursor wrapper built from the uri, table id and opCtx
    curwrap.assertInActiveTxn();
    WT_CURSOR* c = curwrap.get(); // raw WiredTiger cursor used for the inserts below
    invariant(c);

    RecordId highestId = RecordId(); // starts as the default (null) RecordId
    dassert(nRecords != 0);
    for (size_t i = 0; i < nRecords; i++) {
        auto& record = records[i];
        if (_isOplog) {// oplog store: the RecordId is extracted from the record's own bytes
            StatusWith<RecordId> status =
                oploghack::extractKey(record.data.data(), record.data.size());
            if (!status.isOK())
                return status.getStatus();
            record.id = status.getValue();
        } else {// any other store: ask _nextId() for the id
            record.id = _nextId(opCtx);
            // NOTE(review): _nextId() is not visible here. The original write-up says it takes
            // the most recently used id and adds one -- confirm against its definition.

        }
        // Debug-check that ids are strictly increasing across the batch, so the last one
        // assigned is also the highest.
        dassert(record.id > highestId);
        highestId = record.id;
    }

    for (size_t i = 0; i < nRecords; i++) {
        auto& record = records[i];
        Timestamp ts;
        if (timestamps[i].isNull() && _isOplog) {
            // If the timestamp is 0, that probably means someone inserted a document directly
            // into the oplog.  In this case, use the RecordId as the timestamp, since they are
            // one and the same. Setting this transaction to be unordered will trigger a journal
            // flush. Because these are direct writes into the oplog, the machinery to trigger a
            // journal flush is bypassed. A followup oplog read will require a fresh visibility
            // value to make progress.
            ts = Timestamp(record.id.repr());
            opCtx->recoveryUnit()->setOrderedCommit(false);
        } else {
            ts = timestamps[i];
        }
        // Only a non-null timestamp is handed to the recovery unit; a failure there is fatal
        // (fassert 39001).
        if (!ts.isNull()) {
            LOG(4) << "inserting record with timestamp " << ts;
            fassert(39001, opCtx->recoveryUnit()->setTimestamp(ts));
        }
        // Key is the RecordId, value is the record's bytes; a non-zero return from the insert is
        // converted to a Status and returned immediately.
        setKey(c, record.id);
        WiredTigerItem value(record.data.data(), record.data.size());
        c->set_value(c, value.Get());
        int ret = WT_OP_CHECK(c->insert(c));
        if (ret)
            return wtRCToStatus(ret, "WiredTigerRecordStore::insertRecord");
    }

    // Every insert succeeded: account for the new records and their total size.
    _changeNumRecords(opCtx, nRecords);
    _increaseDataSize(opCtx, totalLength);

    // With oplog stones present, report the inserted bytes, highest id and record count to them;
    // otherwise call the capped-deletion hook with the highest id just written.
    if (_oplogStones) {
        _oplogStones->updateCurrentStoneAfterInsertOnCommit(
            opCtx, totalLength, highestId, nRecords);
    } else {
        _cappedDeleteAsNeeded(opCtx, highestId);
    }

    return Status::OK();
}



相关文章

网友评论

      本文标题:mongo recordid src

      本文链接:https://www.haomeiwen.com/subject/lofjdhtx.html