美文网首页
Nemo-NemoRocksDb-Rocksdb调用

Nemo-NemoRocksDb-Rocksdb调用

作者: 白馨_1114 | 来源:发表于2020-05-09 17:52 被阅读0次

以Hset为例:
vim ./third/nemo/src/nemo_hash.cc

//hset ht_key ht_field ht_value

// Implements HSET key field value.
//
// Collects into one WriteBatch the field's data record (see DoHSet) and, when
// the field is new, the updated field-count record of the hash (see
// IncrHLen), then commits the batch through hash_db_->WriteWithOldKeyTTL.
//
// Returns InvalidArgument for an empty key or one with
// key.size() >= KEY_MAX_LENGTH, Corruption("incrhlen error") when the current
// field count cannot be read, and otherwise the status of the final write.
Status Nemo::HSet(const std::string &key, const std::string &field, const std::string &val) {
    const bool bad_length = key.empty() || key.size() >= KEY_MAX_LENGTH;
    if (bad_length) {
        return Status::InvalidArgument("Invalid key length");
    }

    // Per-key record lock. The original note says it keeps several threads from
    // operating on the same ht_key at once.
    RecordLock record_guard(&mutex_hash_record_, key);

    rocksdb::WriteBatch batch;
    // DoHSet returns 1 when `field` was absent (a new field), 0 otherwise.
    const int added = DoHSet(key, field, val, batch);
    if (added > 0 && IncrHLen(key, added, batch) == -1) {
        return Status::Corruption("incrhlen error");
    }

    return hash_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &batch);
}

//传入参数:ht_key,ht_field,ht_value
//构建writebatch(作为输出参数,传指针会比非const引用更清晰),返回是否新增了field(新增返回1,否则返回0)

// Stages the data record for HSET into `writebatch`. Nothing is written to
// the database here.
//
// Looks up the current value of (key, field) with HGet:
//   - NotFound: stages Put(EncodeHashKey(key, field), val) and returns 1,
//     i.e. the hash gained a new field and its length must be incremented;
//   - any other status: stages the same Put only when the value read back
//     differs from `val`, and returns 0 (field count unchanged).
//
// NOTE(review): an HGet error other than NotFound (e.g. an I/O error) takes the
// "field exists" path. dbval is then most likely still empty, so the Put is
// staged and 0 is returned. Propagating such errors needs a coordinated change
// in HSet, which only inspects `ret > 0`.
int Nemo::DoHSet(const std::string &key, const std::string &field, const std::string &val, rocksdb::WriteBatch &writebatch) {
    std::string dbval;
    Status s = HGet(key, field, &dbval);
    if (s.IsNotFound()) {
        const std::string hkey = EncodeHashKey(key, field);
        writebatch.Put(hkey, val);
        return 1;  // new field
    }

    if (dbval != val) {
        // Existing field with a different value: overwrite it.
        const std::string hkey = EncodeHashKey(key, field);
        writebatch.Put(hkey, val);
    }
    return 0;  // field already existed; only its value may change
}

//传入参数:ht_key,ht_field,ht_value
//获取ht_key+ht_field组成的db_key对应db_value,返回获取状态

// Reads the value of `field` in hash `key` into *val.
//
// Returns InvalidArgument for an empty key or one with
// key.size() >= KEY_MAX_LENGTH. Otherwise returns the status of
// hash_db_->Get on the encoded data key EncodeHashKey(key, field): OK with the
// stored value in *val, or a NotFound status (s.IsNotFound()) when the field
// does not exist. No null pointer is ever returned.
Status Nemo::HGet(const std::string &key, const std::string &field, std::string *val) {
    if (key.empty() || key.size() >= KEY_MAX_LENGTH) {
        return Status::InvalidArgument("Invalid key length");
    }

    const std::string data_key = EncodeHashKey(key, field);
    return hash_db_->Get(rocksdb::ReadOptions(), data_key, val);
}

//传入参数:ht_key,ht_field 返回:h6ht_key=ht_field(其中“6”是ht_key长度的单字节原始值0x06,而不是字符'6')

// Encodes the RocksDB data key of one field of a hash:
//
//   [DataType::kHash][1-byte len(name)][name]['='][key]
//
// e.g. name "ht_key" (6 bytes) and field "f" give kHash, 0x06, "ht_key", '=',
// "f". The length is stored as a raw byte (0x06), not as the ASCII digit '6'.
// A name longer than 255 bytes would have its length silently truncated, so
// callers must bound it; HSet and HGet reject key.size() >= KEY_MAX_LENGTH.
inline std::string EncodeHashKey(const rocksdb::Slice &name, const rocksdb::Slice &key) {
    std::string buf;
    // The final size is known up front: type byte + length byte + '=' + data.
    buf.reserve(3 + name.size() + key.size());
    buf.append(1, DataType::kHash);
    buf.append(1, static_cast<char>(static_cast<uint8_t>(name.size())));
    buf.append(name.data(), name.size());
    buf.append(1, '=');
    buf.append(key.data(), key.size());
    return buf;
}

// 传入参数:ht_key、incr;把ht_key的field个数加上incr后的新值写入writebatch。成功返回0,读取HLen失败返回-1

// Stages an update of the field count of hash `key` into `writebatch`.
//
// Reads the current count with HLen, adds `incr`, and stages a Put of the new
// count under EncodeHsizeKey(key). The count is stored as the raw bytes of an
// int64_t in host byte order. Returns -1 if HLen reports a read error, and 0
// otherwise.
int Nemo::IncrHLen(const std::string &key, int64_t incr, rocksdb::WriteBatch &writebatch) {
    const int64_t current = HLen(key);
    if (current == -1) {
        return -1;
    }

    int64_t updated = current + incr;
    const std::string count_key = EncodeHsizeKey(key);
    writebatch.Put(count_key,
                   rocksdb::Slice(reinterpret_cast<char *>(&updated), sizeof(updated)));
    return 0;
}

//获取ht_key中field的个数

// Returns the number of fields stored in hash `key`.
//
// The count is read through hash_db_->Get from EncodeHsizeKey(key). It is
// expected to be exactly sizeof(int64_t) raw bytes, the format IncrHLen
// writes. Return values:
//   - 0 when the record is missing, has an unexpected size, or holds a
//     negative value;
//   - -1 on any other read error;
//   - the stored count otherwise.
int64_t Nemo::HLen(const std::string &key) {
    const std::string size_key = EncodeHsizeKey(key);
    std::string val;

    Status s = hash_db_->Get(rocksdb::ReadOptions(), size_key, &val);
    if (s.IsNotFound()) {
        return 0;
    }
    if (!s.ok()) {
        return -1;
    }

    int64_t ret = 0;
    if (val.size() != sizeof(ret)) {
        return 0;
    }
    // Copy the bytes out rather than dereferencing `(int64_t *)val.data()`:
    // that cast breaks strict aliasing and may be a misaligned load.
    val.copy(reinterpret_cast<char *>(&ret), sizeof(ret));
    return ret < 0 ? 0 : ret;
}

// One-byte prefixes of the meta record of each data type. For example, a
// hash's field-count record is keyed "H" + name. kMetaPrefixKv marks plain
// key/value data, for which GetVersionAndTS performs no meta lookup.
const char kMetaPrefixKv = '\0';
const char kMetaPrefixHash = 'H';
const char kMetaPrefixZset = 'Z';
const char kMetaPrefixSet = 'S';
// The closing quote here must be ASCII ' (the source had a typographic ’).
const char kMetaPrefixList = 'L';

//WriteBatch::Put

// Appends a Put(key, value) record for `column_family` to this batch.
// It resolves the family's numeric id and delegates to WriteBatchInternal::Put.
void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& value) {
  const auto cf_id = GetColumnFamilyID(column_family);
  WriteBatchInternal::Put(this, cf_id, key, value);
}

// Serializes one Put record into b->rep_ and bumps the batch's record count.
//
// Record layout:
//   column family 0: [kTypeValue][len-prefixed key][len-prefixed value]
//   other families:  [kTypeColumnFamilyValue][varint32 cf id]
//                    [len-prefixed key][len-prefixed value]
// Afterwards ContentFlags::HAS_PUT is OR-ed into content_flags_. Iterate
// asserts on this flag when it meets a Put record.
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                             const SliceParts& key, const SliceParts& value) {
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  const bool default_cf = (column_family_id == 0);
  const auto tag = default_cf ? kTypeValue : kTypeColumnFamilyValue;
  b->rep_.push_back(static_cast<char>(tag));
  if (!default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }

  PutLengthPrefixedSliceParts(&b->rep_, key);
  PutLengthPrefixedSliceParts(&b->rep_, value);

  const auto flags =
      b->content_flags_.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT;
  b->content_flags_.store(flags, std::memory_order_relaxed);
}

// RocksDB WriteBatch::Iterate:遍历WriteBatch,依次读取每条记录的tag、db_key、db_value,
// 并按照tag执行相应的动作。比如tag为kTypeValue时,执行handler->PutCF(column_family, key, value);
// 其中PutCF由传入的handler实现(本流程中是WriteWithOldKeyTTL里定义的Handler)。

// Replays every record of this batch into `handler`.
//
// Skips the WriteBatchInternal::kHeader bytes at the front of rep_. It then
// repeatedly decodes one record (tag, column family id, key, value, ...) and
// calls the handler callback matching the tag:
//   - PutCF for kTypeValue / kTypeColumnFamilyValue;
//   - DeleteCF for kTypeDeletion / kTypeColumnFamilyDeletion.
// Each dispatched Put/Delete increments `found`; kTypeNoop does not.
//
// Returns:
//   - Corruption("malformed WriteBatch (too small)") if rep_ is shorter than
//     the header;
//   - the error from ReadRecordFromWriteBatch or from a handler callback;
//   - Corruption("unknown WriteBatch tag") for an unrecognized tag;
//   - Corruption("WriteBatch has wrong count") if `found` differs from
//     WriteBatchInternal::Count(this);
//   - OK otherwise.
// NOTE(review): in the code as shown, a handler whose Continue() returns false
// ends the loop early and then fails the count check. Confirm this against the
// RocksDB version in use.
Status WriteBatch::Iterate(Handler* handler) const {
  Slice input(rep_);
  if (input.size() < WriteBatchInternal::kHeader) {
    return Status::Corruption("malformed WriteBatch (too small)");
  }

  input.remove_prefix(WriteBatchInternal::kHeader);
  Slice key, value, blob, xid;
  int found = 0;
  Status s;
  while (s.ok() && !input.empty() && handler->Continue()) {
    char tag = 0;
    uint32_t column_family = 0;  // default
    
    s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
                                 &blob, &xid);  // decode the next record's tag, column family, key and value
    if (!s.ok()) {
      return s;
    }

    switch (tag) {
      case kTypeColumnFamilyValue:
      case kTypeValue:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
        s = handler->PutCF(column_family, key, value);  // in this flow: the PutCF override of the Handler defined in DBNemoImpl::WriteWithOldKeyTTL
        found++;
        break;
      case kTypeColumnFamilyDeletion:
      case kTypeDeletion:
        assert(content_flags_.load(std::memory_order_relaxed) &
               (ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
        s = handler->DeleteCF(column_family, key);
        found++;
        break;
// NOTE: the next line is an elision in the source article and is not valid
// C++. The real RocksDB source presumably handles further record types here,
// such as the Merge and LogData records that WriteWithOldKeyTTL's Handler
// implements. Restore them from the RocksDB version in use.
…………
      case kTypeNoop:
        break;
      default:
        return Status::Corruption("unknown WriteBatch tag");
    }
  }
  if (!s.ok()) {
    return s;
  }
  if (found != WriteBatchInternal::Count(this)) {
    return Status::Corruption("WriteBatch has wrong count");
  } else {
    return Status::OK();
  }
}

//Iterate回调的Handler方法(PutCF/MergeCF/DeleteCF/LogData)的具体实现

// Rewrites `updates` so that every Put/Merge value carries version and
// expiration-timestamp information, then writes the rewritten batch to db_.
//
// The version/timestamp pair is resolved once, from the key of the FIRST Put
// record in the batch:
//   - if GetVersionAndTS finds the meta record, its version and timestamp are
//     reused. When that timestamp is non-zero and earlier than the current
//     time, the version is incremented and the timestamp reset to 0;
//   - otherwise the version is the current Unix time (now_).
// Every later Put in the same batch reuses that pair. The batch is therefore
// assumed to touch a single user key, as the batches built by HSet do.
//
// Returns, in order of precedence:
//   - the error from decoding `updates` (nothing is written in that case);
//   - otherwise the last error from re-encoding a value;
//   - otherwise the status of db_->Write.
Status DBNemoImpl::WriteWithOldKeyTTL(const WriteOptions& opts, WriteBatch* updates) {
  class Handler : public WriteBatch::Handler {
   public:
    WriteBatch updates_ttl;       // rewritten batch that is actually written
    Status batch_rewrite_status;  // last value re-encoding error, if any

    explicit Handler(Env* env, DB* db, char meta_prefix)
        : env_(env), db_(db),
          meta_prefix_(meta_prefix), now_(0), version_(0),
          timestamp_(0), is_first_(true) {
      // now_ is pre-initialized so it is never read uninitialized if
      // GetCurrentTime fails without writing it.
      env_->GetCurrentTime(&now_);
    }

    Status PutCF(uint32_t column_family_id, const Slice& key,
                 const Slice& value) override {
      if (is_first_) {
        // For HSet the first record is the data key, e.g. "h6ht_key=ht_field".
        bool find_meta = GetVersionAndTS(db_, meta_prefix_, key, &version_, &timestamp_);
        if (!find_meta) {
          version_ = now_;  // no meta yet: seed the version from the clock
        }
        // Per the original notes: a 4-byte version (used for fast deletion)
        // plus a 4-byte expiration timestamp (0 = no TTL).
        int64_t curtime;
        if (env_->GetCurrentTime(&curtime).ok()) {
          if (timestamp_ != 0 && timestamp_ < curtime) {
            version_++;  // expired: move to a new version and clear the TTL
            timestamp_ = 0;
          }
        } else {
          timestamp_ = 0;
        }
        // Later records (e.g. the "Hht_key" count record) reuse this pair.
        is_first_ = false;
      }

      std::string value_with_ver_ts;
      Status st = AppendVersionAndExpiredTime(value, &value_with_ver_ts,
                                              env_, version_, timestamp_);
      if (!st.ok()) {
        batch_rewrite_status = st;
      } else {
        WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
                                value_with_ver_ts);
      }
      return Status::OK();
    }

    // NOTE(review): merges are re-encoded with AppendVersionAndTS and version 0 /
    // timestamp 0, unlike PutCF. Confirm this is intended.
    Status MergeCF(uint32_t column_family_id, const Slice& key,
                   const Slice& value) override {
      std::string value_with_ver_ts;
      Status st = AppendVersionAndTS(value, &value_with_ver_ts,
                                     env_, 0, 0);
      if (!st.ok()) {
        batch_rewrite_status = st;
      } else {
        WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
                                  value_with_ver_ts);
      }
      return Status::OK();
    }

    Status DeleteCF(uint32_t column_family_id,
                    const Slice& key) override {
      WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
      return Status::OK();
    }

    void LogData(const Slice& blob) override {
      updates_ttl.PutLogData(blob);
    }

   private:
    Env* env_;
    DB* db_;
    char meta_prefix_;
    int64_t now_;
    uint32_t version_;
    int32_t timestamp_;
    bool is_first_;
  };

  // meta_prefix_ selects the meta-key prefix, e.g. kMetaPrefixHash ('H') for hashes.
  Handler handler(GetEnv(), db_, meta_prefix_);
  Status s = updates->Iterate(&handler);
  if (!s.ok()) {
    // Never write a partially rewritten batch.
    return s;
  }
  if (!handler.batch_rewrite_status.ok()) {
    return handler.batch_rewrite_status;
  }
  return db_->Write(opts, &handler.updates_ttl);
}

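//GetVersionAndTS只在Handler::PutCF处理batch中第一条Put记录(is_first_为true)时调用一次:
//writebatch[0]:db_key:"h6ht_key=ht_field"、db_val:ht_value → 调用GetVersionAndTS
//writebatch[1]:db_key:"Hht_key"、db_val:1 → is_first_已为false,不再调用,复用第一次得到的version/timestamp
//其他参数:db为Handler中保存的db_,meta_prefix为'H'(kMetaPrefixHash)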

// Reads the version and expiration timestamp recorded in the meta value that
// owns `key`.
//
// `key` is either:
//   - a meta key, whose first byte equals `meta_prefix` (e.g. "H" + name for a
//     hash). It is read directly.
//   - a data key laid out as [type][1-byte name length][name]... (as built by
//     EncodeHashKey). The meta key meta_prefix + name is derived from it and
//     read.
// Version and timestamp are the two trailing fixed32 fields of the stored
// meta value, i.e. its last kVersionLength + kTSLength bytes.
//
// Returns:
//   - true with *version = *timestamp = 0 for plain KV data
//     (meta_prefix == kMetaPrefixKv) and for the 1-byte separator key between
//     meta and data;
//   - true with the decoded values when the meta value exists;
//   - false, with both outputs left at 0, when the meta record is missing or
//     cannot be read, when `key` is empty or its embedded name length overruns
//     it, or when the stored value is too short to hold the suffix.
bool DBNemoImpl::GetVersionAndTS(DB* db, char meta_prefix,
      const Slice& key, uint32_t* version, int32_t* timestamp) {
  *version = 0;
  *timestamp = 0;

  if (meta_prefix == kMetaPrefixKv) {
    return true;
  }
  if (key.empty()) {
    return false;  // key[0] below would be out of range
  }

  std::string value;
  Status s;
  if (key[0] == meta_prefix) {
    // `key` is the meta key itself.
    s = db->Get(ReadOptions(), key, &value);
  } else {
    if (key.size() == 1) {
      // Separator between meta and data: nothing to look up.
      return true;
    }
    // Data key: byte 1 holds the length of the user key name that follows.
    const size_t name_len = static_cast<uint8_t>(key[1]);
    if (key.size() < 2 + name_len) {
      return false;  // truncated or malformed data key
    }
    std::string meta_key(1, meta_prefix);
    meta_key.append(key.data() + 2, name_len);
    s = db->Get(ReadOptions(), meta_key, &value);
  }

  if (!s.ok()) {
    return false;  // includes NotFound
  }
  // Without this check a short value would make the offsets below point
  // before value.data().
  if (value.size() < kVersionLength + kTSLength) {
    return false;
  }
  const char* suffix = value.data() + value.size() - kVersionLength - kTSLength;
  *version = DecodeFixed32(suffix);
  *timestamp = DecodeFixed32(suffix + kVersionLength);
  return true;
}

相关文章

网友评论

      本文标题:Nemo-NemoRocksDb-Rocksdb调用

      本文链接:https://www.haomeiwen.com/subject/vqclnhtx.html