美文网首页
Nemo-NemoRocksDb-Rocksdb调用

Nemo-NemoRocksDb-Rocksdb调用

作者: 白馨_1114 | 来源:发表于2020-05-09 17:52 被阅读0次

    以Hset为例:
    vim .third/nemo/src/nemo_hash.cc

    //hset ht_key ht_field ht_value

    Status Nemo::HSet(const std::string &key, const std::string &field, const std::string &val) {
        if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) { //ht_key超过255字节,则报错
           return Status::InvalidArgument("Invalid key length");
        }
    
        Status s;
    
        RecordLock l(&mutex_hash_record_, key);  //避免多线程对同一个ht_key操作
        //MutexLock l(&mutex_hash_);
        rocksdb::WriteBatch writebatch;
    
        int ret = DoHSet(key, field, val, writebatch);  //(详细见下)组成writebatch:db_key:"h6ht_key=ht_field”  db_val:ht_value
        if (ret > 0) {
            if (IncrHLen(key, ret, writebatch) == -1) { // (详细见下)此时writebatch为:writebatch[0]:db_key:"h6ht_key=ht_field”  db_val:ht_value
                                                                            //writebatch[1]:db_key:”Hht_key” db_val:1
                //hash_record_.Unlock(key);
                return Status::Corruption("incrhlen error");
            }
        }
        s = hash_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &(writebatch));  //详细见下
    
        //hash_record_.Unlock(key);
        return s;
    }
    

    //传入参数:ht_key,ht_field,ht_value
    //&writebatch最好使用指针,构建writebatch,返回对否新增field

    int Nemo::DoHSet(const std::string &key, const std::string &field, const std::string &val, rocksdb::WriteBatch &writebatch) {
        int ret = 0;
        std::string dbval;
        Status s = HGet(key, field, &dbval); //(详细见下) 
        if (s.IsNotFound()) { // not found  //dbvalue为空
            std::string hkey = EncodeHashKey(key, field);
            writebatch.Put(hkey, val);  //(详细见下)Put会加上kTypeValue 标记,在后面的
            ret = 1;  //新增field
        } else {
            if(dbval != val){  //db_val不为空,但是不为ht_value
                std::string hkey = EncodeHashKey(key, field);
                writebatch.Put(hkey, val);  //hkey:"h6ht_key=ht_field"  val:ht_value
            }
            ret = 0;  //原本就存在该field,本次只是修改值的操作
        }
        std::cout << "bx test already excute DoHset" << std::endl;
        return ret;
    }
    

    //传入参数:ht_key,ht_field,ht_value
    //获取ht_key+ht_field组成的db_key对应db_value,返回获取状态

    Status Nemo::HGet(const std::string &key, const std::string &field, std::string *val) {
        if (key.size() >= KEY_MAX_LENGTH || key.size() <= 0) {
           return Status::InvalidArgument("Invalid key length");
        }
    
        std::string dbkey = EncodeHashKey(key, field); // (详细见下) dbkey="h6ht_key=ht_field"
        Status s = hash_db_->Get(rocksdb::ReadOptions(), dbkey, val);    //如果存在,则dbkey对应的value修改为:ht_value。如果不存在,则返回为nullptr。
        return s;
    }
    

    //传入参数:ht_key,ht_field 返回:h6ht_key=ht_field

    inline std::string EncodeHashKey(const rocksdb::Slice &name, const rocksdb::Slice &key) {
        std::string buf;
        buf.append(1, DataType::kHash); 
        buf.append(1, (uint8_t)name.size());
        buf.append(name.data(), name.size());
        buf.append(1, '=');
        buf.append(key.data(), key.size());
        return buf;
    }
    

    // 传入参数:ht_key 返回ht_key对应field的个数

    int Nemo::IncrHLen(const std::string &key, int64_t incr, rocksdb::WriteBatch &writebatch) {
        int64_t len = HLen(key);  //详细见下,获取ht_key对应的field数量,此时为0
        if (len == -1) {
            return -1;
        }
        len += incr; //加上incr , 此时为1
        std::string size_key = EncodeHsizeKey(key);  
        writebatch.Put(size_key, rocksdb::Slice((char *)&len, sizeof(int64_t)));  
    
       // if (len == 0) {
       //     //writebatch.Delete(size_key);
       //     writebatch.Merge(size_key, rocksdb::Slice((char *)&len, sizeof(int64_t)));
       // } else {
       //     writebatch.Put(size_key, rocksdb::Slice((char *)&len, sizeof(int64_t)));
       // }
        return 0;
    }
    

    //获取ht_key对应的中field个数

    int64_t Nemo::HLen(const std::string &key) {
        std::string size_key = EncodeHsizeKey(key);  //构建 size_key:Hht_key
        std::string val;
        Status s;
    
        s = hash_db_->Get(rocksdb::ReadOptions(), size_key, &val);  //获取value中包含filed的数量
        if (s.IsNotFound()) {
            return 0;
        } else if(!s.ok()) {
            return -1;
        } else {
            if (val.size() != sizeof(uint64_t)) {
                return 0;
            }
            int64_t ret = *(int64_t *)val.data();  //   取里面的数据硬转,因为它存储的是字符类。而不是按数字存储的。
            return ret < 0? 0 : ret;
        }
    }
    

    const char kMetaPrefixKv = '\0';
    const char kMetaPrefixHash = 'H';
    const char kMetaPrefixZset = 'Z';
    const char kMetaPrefixSet = 'S';
    const char kMetaPrefixList = 'L’;

    //WriteBatch::Put

    void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& value) {
      WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
    }
    
    void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                                 const SliceParts& key, const SliceParts& value) {
      WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
      if (column_family_id == 0) {
        b->rep_.push_back(static_cast<char>(kTypeValue));  //默认tag
      } else {
        b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
        PutVarint32(&b->rep_, column_family_id);
      }
      PutLengthPrefixedSliceParts(&b->rep_, key);
      PutLengthPrefixedSliceParts(&b->rep_, value);
      b->content_flags_.store(
          b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
          std::memory_order_relaxed);
    }
    

    // pika Iterate:在iterate的过程中,从WriteBatch中依次读取tag,db_key,db_value。
    //按照tag,执行相应的动作。比如,kTypeValue,则 执行handler->PutCF(column_family, key, value);
    //其中PutCF在对应的handler中执行。

    Status WriteBatch::Iterate(Handler* handler) const {
      Slice input(rep_);
      if (input.size() < WriteBatchInternal::kHeader) {
        return Status::Corruption("malformed WriteBatch (too small)");
      }
    
      input.remove_prefix(WriteBatchInternal::kHeader);
      Slice key, value, blob, xid;
      int found = 0;
      Status s;
      while (s.ok() && !input.empty() && handler->Continue()) {
        char tag = 0;
        uint32_t column_family = 0;  // default
    
        s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                     &blob, &xid);  //从用户提供的batch中获取tag,key,value:
        if (!s.ok()) {
          return s;
        }
    
        switch (tag) {
          case kTypeColumnFamilyValue:
          case kTypeValue:
            assert(content_flags_.load(std::memory_order_relaxed) &
                   (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
            s = handler->PutCF(column_family, key, value);  //调用重写的DBNemoImpl::WriteWithOldKeyTTL中PutCF
            found++;
            break;
          case kTypeColumnFamilyDeletion:
          case kTypeDeletion:
            assert(content_flags_.load(std::memory_order_relaxed) &
                   (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
            s = handler->DeleteCF(column_family, key);
            found++;
            break;
    …………
          case kTypeNoop:
            break;
          default:
            return Status::Corruption("unknown WriteBatch tag");
        }
      }
      if (!s.ok()) {
        return s;
      }
      if (found != WriteBatchInternal::Count(this)) {
        return Status::Corruption("WriteBatch has wrong count");
      } else {
        return Status::OK();
      }
    }
    

    //iterator中方法具体实现

    Status DBNemoImpl::WriteWithOldKeyTTL(const WriteOptions& opts, WriteBatch* updates) {
    class Handler : public WriteBatch::Handler {
       public:
        WriteBatch updates_ttl;
        Status batch_rewrite_status;
    
        explicit Handler(Env* env, DB* db, char meta_prefix)
            : env_(env), db_(db),
              meta_prefix_(meta_prefix), version_(0),
              timestamp_(0), is_first_(true) {
                env_->GetCurrentTime(&now_);
              }
    
        virtual Status PutCF(uint32_t column_family_id, const Slice& key,
                             const Slice& value) override {
          std::string value_with_ver_ts;
    
          std::cout << " WriteWithOldKeyTTL PutCF" <<std::endl;
          if (is_first_) { //默认true
            bool find_meta = GetVersionAndTS(db_, meta_prefix_, key, &version_, &timestamp_); //(详细见下)key为db_key:h6ht_key=ht_field,第一次返回为false
            if (!find_meta) {
              std::cout <<  "Update version " << key.ToString() << ", meta not found, use now: " << now_ << std::endl;
              version_ = now_; //当db_key:h6ht_key=ht_field 第一次设置的版本号为当前时间unix时间戳:1586750336
            }
    //4Bytes的Version(用于秒删功能) + 4Bytes的Timestamp(用于记录我们给这个Hash表设置的超时时间的时间戳, 默认为0)
            int64_t curtime;
            if (env_->GetCurrentTime(&curtime).ok()) {
                if (timestamp_ != 0 && timestamp_ < curtime) {
                    std::cout << "Meta expired, version++: ";
                    version_++;  //过期则版本号加1
                    std::cout << version_ << std::endl;
                    std::cout << timestamp_ << std::endl;
                    timestamp_ = 0;
                }
            } else {
                timestamp_ = 0;
            }
    
            is_first_ = false; //设置为false,则第二次db_key:”Hht_key”将不会走true的逻辑
          }
    
          Status st = AppendVersionAndExpiredTime(value, &value_with_ver_ts,
                          env_, version_, timestamp_);    //拼接:value_with_ver_ts:ht_value+version+ttl(ttl为expired time,version为当前时间戳,只有过期才会+1)
          if (!st.ok()) {
            batch_rewrite_status = st;
          } else {
            WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
                                    value_with_ver_ts);  // 写入WriteBatchInternal:中 db_key:h6ht_key=ht_field,value:value_with_ver_ts
    // updates_ttl作为传入write db的参数,
      // WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
    //  static void Put(WriteBatch* batch, uint32_t column_family_id,
      //                const Slice& key, const Slice& value);
    
          }
          return Status::OK();
        }
        virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
                               const Slice& value) override {
          std::string value_with_ver_ts;
          Status st = AppendVersionAndTS(value, &value_with_ver_ts,
                          env_, 0, 0);
          if (!st.ok()) {
            batch_rewrite_status = st;
          } else {
            WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
                                      value_with_ver_ts);
          }
          return Status::OK();
        }
        virtual Status DeleteCF(uint32_t column_family_id,
                                const Slice& key) override {
          WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
          return Status::OK();
        }
        virtual void LogData(const Slice& blob) override {
          updates_ttl.PutLogData(blob);
        }
    
       private:
        Env* env_;
        DB* db_;
        char meta_prefix_;
        int64_t now_;
        uint32_t version_;
        int32_t timestamp_;
        bool is_first_;
      };
      //@ADD assign the db pointer
      Handler handler(GetEnv(), db_, meta_prefix_); //初始化handler  例子中meta_prefix_为const char kMetaPrefixHash = 'H';
    //
    //            std::cout << "bx bx bx -------------------------------------WriteWithOldKeyTTL------------------------" << std::endl;
      updates->Iterate(&handler);  //HSet命令将按照tag,调用定义的PutCF
      if (!handler.batch_rewrite_status.ok()) {
        return handler.batch_rewrite_status;
      } else {
        return db_->Write(opts, &(handler.updates_ttl));  //返回的updates_ttl写入rocksdb中
      }
    }
    

    //依次传入:
    //writebatch[0]:db_key:"h6ht_key=ht_field” 、db_val:ht_value 、is_first:true
    //writebatch[1]:db_key:”Hht_key”、db_val:1、is_first:false
    //其他参数:hash_db,h

    bool DBNemoImpl::GetVersionAndTS(DB* db, char meta_prefix,
          const Slice& key, uint32_t* version, int32_t* timestamp) {
      *version = *timestamp = 0;
    
      if (meta_prefix == kMetaPrefixKv) {
        return true;
      }
    
      std::string value;
      Status s;
    
        std::cout << "GetVersionAndTS, meta, " << s.ToString() << " key[0]: " << key.ToString() << " value: " << meta_prefix << std::endl;
      if (meta_prefix == key[0]) {
        s = db->Get(ReadOptions(), key, &value);//获取value为0
        std::cout << "GetVersionAndTS, meta, " << s.ToString() << " key: " << key.ToString() << " value: " << *((int64_t*)value.data()) << std::endl;
      } else {
        if (key.size() == 1) {
          // this is Seperator between meta and data, just ignore
          *version = *timestamp = 0;
          std::cout << "key size is 1: "<< key.ToString() << std::endl;
          return true;
        }
    
        std::string meta_key(1, meta_prefix);
        int32_t len = *((uint8_t*)(key.data()+1));
        meta_key.append(key.data()+2, len);
        s = db->Get(ReadOptions(), meta_key, &value);
        std::cout << "GetVersionAndTS, data, " << s.ToString() << " key: " << meta_key << " value: " << (*(int64_t*)value.data()) << std::endl;
      }
    
      if (s.ok()) { //not found s.ok为false
        *version = DecodeFixed32(value.data() + value.size() - kVersionLength - kTSLength); //返回0
        *timestamp = DecodeFixed32(value.data() + value.size() - kTSLength); 
        return true;
      }
    
      return false;
    
    }
    

    相关文章

      网友评论

          本文标题:Nemo-NemoRocksDb-Rocksdb调用

          本文链接:https://www.haomeiwen.com/subject/vqclnhtx.html