美文网首页
mongo recordid src

mongo recordid src

作者: Vackine | 来源:发表于2020-03-08 16:58 被阅读0次

    mongo recordid src

    抽象类RecordStore,封装了一系列,记录操作相关的接口,定义原文如下:

    /**
     * An abstraction used for storing documents in a collection or entries in an index.
     *
     * In storage engines implementing the KVEngine, record stores are also used for implementing
     * catalogs.
     *
     * Many methods take an OperationContext parameter. This contains the RecoveryUnit, with
     * all RecordStore specific transaction information, as well as the LockState. Methods that take
     * an OperationContext may throw a WriteConflictException.
     *
     * This class must be thread-safe for document-level locking storage engines. In addition, for
     * storage engines implementing the KVEngine some methods must be thread safe, see DurableCatalog.
     * Only for MMAPv1 is this class not thread-safe.
     */
    class RecordStore
    

    insert 一条记录相关的源码记录。

    插入记录的接口定义

     /**通过复制传递的record data 将指定的记录插入到RecordStore中,并更新'inOutRecords'的值让其包含
     * 插入数据的id
         * Inserts the specified records into this RecordStore by copying the passed-in record data and
         * updates 'inOutRecords' to contain the ids of the inserted records.
         */
        virtual Status insertRecords(OperationContext* opCtx, // 操作的上下文
                                     std::vector<Record>* inOutRecords, // inOutRecords对象
                                     const std::vector<Timestamp>& timestamps) = 0; // 时戳列表
    
        /**插入单条记录的简单封装
         * A thin wrapper around insertRecords() to simplify handling of single document inserts.
         */
        StatusWith<RecordId> insertRecord(OperationContext* opCtx,
                                          const char* data,
                                          int len,
                                          Timestamp timestamp) {
            std::vector<Record> inOutRecords{Record{RecordId(), RecordData(data, len)}}; //
            // Record{RecordId(), RecordData(data, len)} -> 通过构建recordid 和data构建Record 对象
            //然后构建inOutRecords数组,只包含一个Record对象
            Status status = insertRecords(opCtx, &inOutRecords, std::vector<Timestamp>{timestamp});
            //传入时戳,调用接口进行插入
            if (!status.isOK())
                return status;
            return inOutRecords.front().id;
        }
    

    插入相关的数据结构

    Record对象

    /**
     * The data items stored in a RecordStore.
     */
    struct Record {
        RecordId id;
        RecordData data;
    };
    
    

    RecordId对象

    RecordId对象 是一个collection或RecordStore中记录的唯一标识,是一个有范围的值,
    有几种取值分类,null类型,最小保留值,最大大保留值,同时还有一个hasher哈希值对象,可以生成recordid的哈希值

    /**
     * The key that uniquely identifies a Record in a Collection or RecordStore.
     */
    class RecordId {
    public:
        // This set of constants define the boundaries of the 'normal' and 'reserved' id ranges.
        static constexpr int64_t kNullRepr = 0;
        static constexpr int64_t kMinRepr = LLONG_MIN;
        static constexpr int64_t kMaxRepr = LLONG_MAX;
        static constexpr int64_t kMinReservedRepr = kMaxRepr - (1024 * 1024);
    
        /**
         * Enumerates all ids in the reserved range that have been allocated for a specific purpose.
         */
        enum class ReservedId : int64_t { kWildcardMultikeyMetadataId = kMinReservedRepr };
    
        /**
         * Constructs a Null RecordId.
         */
        RecordId() : _repr(kNullRepr) {}
    
        explicit RecordId(int64_t repr) : _repr(repr) {}
    
        explicit RecordId(ReservedId repr) : RecordId(static_cast<int64_t>(repr)) {}
    
        /**
         * Construct a RecordId from two halves.
         * TODO consider removing.
         */
        RecordId(int high, int low) : _repr((uint64_t(high) << 32) | uint32_t(low)) {}
    
        /**
         * A RecordId that compares less than all ids that represent documents in a collection.
         */
        static RecordId min() {
            return RecordId(kMinRepr);
        }
    
        /**
         * A RecordId that compares greater than all ids that represent documents in a collection.
         */
        static RecordId max() {
            return RecordId(kMaxRepr);
        }
    
        /**
         * Returns the first record in the reserved id range at the top of the RecordId space.
         */
        static RecordId minReserved() {
            return RecordId(kMinReservedRepr);
        }
    
        bool isNull() const {
            return _repr == 0;
        }
    
        int64_t repr() const {
            return _repr;
        }
    
        /**
         * Valid RecordIds are the only ones which may be used to represent Records. The range of valid
         * RecordIds includes both "normal" ids that refer to user data, and "reserved" ids that are
         * used internally. All RecordIds outside of the valid range are sentinel values.
         */
        bool isValid() const {
            return isNormal() || isReserved();
        }
    
        /**
         * Normal RecordIds are those which fall within the range used to represent normal user data,
         * excluding the reserved range at the top of the RecordId space.
         */
        bool isNormal() const {
            return _repr > 0 && _repr < kMinReservedRepr;
        }
    
        /**
         * Returns true if this RecordId falls within the reserved range at the top of the record space.
         */
        bool isReserved() const {
            return _repr >= kMinReservedRepr && _repr < kMaxRepr;
        }
    
        int compare(RecordId rhs) const {
            return _repr == rhs._repr ? 0 : _repr < rhs._repr ? -1 : 1;
        }
    
        /**
         * Hash value for this RecordId. The hash implementation may be modified, and its behavior
         * may differ across platforms. Hash values should not be persisted.
         */
        struct Hasher {
            size_t operator()(RecordId rid) const {
                size_t hash = 0;
                // TODO consider better hashes
                boost::hash_combine(hash, rid.repr());
                return hash;
            }
        };
    
        /// members for Sorter
        struct SorterDeserializeSettings {};  // unused
        void serializeForSorter(BufBuilder& buf) const {
            buf.appendNum(static_cast<long long>(_repr));
        }
        static RecordId deserializeForSorter(BufReader& buf, const SorterDeserializeSettings&) {
            return RecordId(buf.read<LittleEndian<int64_t>>());
        }
        int memUsageForSorter() const {
            return sizeof(RecordId);
        }
        RecordId getOwned() const {
            return *this;
        }
    
    private:
        int64_t _repr;
    };
    
    

    RecordData 对象

    /**
     * A replacement for the Record class. This class represents data in a record store.
     * The _ownedData attribute is used to manage memory ownership.
     */
    class RecordData {
    public:
        RecordData() : _data(NULL), _size(0) {}
        RecordData(const char* data, int size) : _data(data), _size(size) {}
    
        RecordData(SharedBuffer ownedData, int size)
            : _data(ownedData.get()), _size(size), _ownedData(std::move(ownedData)) {}
    
        const char* data() const {
            return _data;
        }
    
        int size() const {
            return _size;
        }
    
        /**
         * Returns true if this owns its own memory, and false otherwise
         */
        bool isOwned() const {
            return _ownedData.get();
        }
    
        SharedBuffer releaseBuffer() {
            return std::move(_ownedData);
        }
    
        BSONObj toBson() const& {
            return isOwned() ? BSONObj(_ownedData) : BSONObj(_data);
        }
    
        BSONObj releaseToBson() {
            return isOwned() ? BSONObj(releaseBuffer()) : BSONObj(_data);
        }
    
        BSONObj toBson() && {
            return releaseToBson();
        }
    
        RecordData getOwned() const {
            if (isOwned())
                return *this;
            auto buffer = SharedBuffer::allocate(_size);
            memcpy(buffer.get(), _data, _size);
            return RecordData(buffer, _size);
        }
    
        void makeOwned() {
            if (isOwned())
                return;
            *this = getOwned();
        }
    
    private:
        const char* _data;
        int _size;
        SharedBuffer _ownedData;
    };
    
    

    wiredTiger 中对 insert 接口的实现

    virtual Status insertRecords

    Status WiredTigerRecordStore::insertRecords(OperationContext* opCtx,
                                                std::vector<Record>* records,
                                                const std::vector<Timestamp>& timestamps) {
        return _insertRecords(opCtx, records->data(), timestamps.data(), records->size());
    }
    
    
    Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
                                                 Record* records,
                                                 const Timestamp* timestamps,
                                                 size_t nRecords) {
        dassert(opCtx->lockState()->isWriteLocked()); // 获取到写锁
    
        // We are kind of cheating on capped collections since we write all of them at once ....
        // Simplest way out would be to just block vector writes for everything except oplog ?
        int64_t totalLength = 0;
        for (size_t i = 0; i < nRecords; i++)
            totalLength += records[i].data.size();
    
        // caller will retry one element at a time
        // 如果store engine 是内存型的并且插入数据总的大小大于内存大小,之间退出
        /*
         // The capped settings should not be updated once operations have started
        const bool _isCapped;
        // True if the storage engine is an in-memory storage engine
        */
        if (_isCapped && totalLength > _cappedMaxSize)
            return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");
    
        WiredTigerCursor curwrap(_uri, _tableId, true, opCtx); // 通过表id以及uri等参数获取wtCursor对象
        curwrap.assertInActiveTxn();
        WT_CURSOR* c = curwrap.get(); 
        invariant(c);
    
        RecordId highestId = RecordId(); // 默认recordid
        dassert(nRecords != 0);
        for (size_t i = 0; i < nRecords; i++) {
            auto& record = records[i];
            if (_isOplog) {// 是读oplog进行表数据更新
                StatusWith<RecordId> status =
                    oploghack::extractKey(record.data.data(), record.data.size());
                if (!status.isOK())
                    return status.getStatus();
                record.id = status.getValue();
            } else {// 直接插入新的记录
                record.id = _nextId(opCtx);
                //获取opCtx下的有效的recordid
                // 会先找到最近一个在使用的id,并在该基础上+1
    
            }
            dassert(record.id > highestId);
            highestId = record.id;
        }
    
        for (size_t i = 0; i < nRecords; i++) {
            auto& record = records[i];
            Timestamp ts;
            if (timestamps[i].isNull() && _isOplog) {
                // If the timestamp is 0, that probably means someone inserted a document directly
                // into the oplog.  In this case, use the RecordId as the timestamp, since they are
                // one and the same. Setting this transaction to be unordered will trigger a journal
                // flush. Because these are direct writes into the oplog, the machinery to trigger a
                // journal flush is bypassed. A followup oplog read will require a fresh visibility
                // value to make progress.
                ts = Timestamp(record.id.repr());
                opCtx->recoveryUnit()->setOrderedCommit(false);
            } else {
                ts = timestamps[i];
            }
            if (!ts.isNull()) {
                LOG(4) << "inserting record with timestamp " << ts;
                fassert(39001, opCtx->recoveryUnit()->setTimestamp(ts));
            }
            setKey(c, record.id);
            WiredTigerItem value(record.data.data(), record.data.size());
            c->set_value(c, value.Get());
            int ret = WT_OP_CHECK(c->insert(c));
            if (ret)
                return wtRCToStatus(ret, "WiredTigerRecordStore::insertRecord");
        }
    
        _changeNumRecords(opCtx, nRecords);
        _increaseDataSize(opCtx, totalLength);
    
        if (_oplogStones) {
            _oplogStones->updateCurrentStoneAfterInsertOnCommit(
                opCtx, totalLength, highestId, nRecords);
        } else {
            _cappedDeleteAsNeeded(opCtx, highestId);
        }
    
        return Status::OK();
    }
    
    
    
    

    相关文章

      网友评论

          本文标题:mongo recordid src

          本文链接:https://www.haomeiwen.com/subject/lofjdhtx.html