以 HSet 命令为例:
vim .third/nemo/src/nemo_hash.cc
// HSET ht_key ht_field ht_value
//
// Sets `field` of hash `key` to `val`. If DoHSet reports that a new field was
// staged, the hash's field count (its size key, "Hht_key" in the walkthrough)
// is updated in the same batch, and the batch is written through
// hash_db_->WriteWithOldKeyTTL.
// Returns InvalidArgument for an empty key or one of KEY_MAX_LENGTH bytes or
// more (the original notes say 255), Corruption if the field count cannot be
// updated, otherwise the status of the write.
Status Nemo::HSet(const std::string &key, const std::string &field, const std::string &val) {
  const bool key_len_ok = !key.empty() && key.size() < KEY_MAX_LENGTH;
  if (!key_len_ok) {
    return Status::InvalidArgument("Invalid key length");
  }

  // Held until return so that concurrent writers of the same ht_key cannot
  // interleave between the field lookup, the length update and the write.
  RecordLock l(&mutex_hash_record_, key);

  rocksdb::WriteBatch writebatch;
  // 1 => a brand-new field was staged, 0 => existing field (or no change).
  const int added = DoHSet(key, field, val, writebatch);
  if (added > 0 && IncrHLen(key, added, writebatch) == -1) {
    return Status::Corruption("incrhlen error");
  }
  // For a new field the batch now holds:
  //   [0] data key "h6ht_key=ht_field" -> ht_value
  //   [1] size key "Hht_key"           -> new field count
  return hash_db_->WriteWithOldKeyTTL(rocksdb::WriteOptions(), &writebatch);
}
// Stages a write of `val` to `field` of hash `key` (ht_key / ht_field /
// ht_value) into `writebatch`.
// Returns 1 when HGet reported NotFound, i.e. a new field was staged, and 0
// otherwise. In the 0 case nothing is staged if the current value already
// equals `val`.
// NOTE(review): any HGet status other than NotFound (including read errors)
// is treated as "field exists": dbval may then be empty, the field is
// overwritten and the field count is not incremented. Callers only test
// `ret > 0`, so surfacing the error would need an interface change.
// NOTE: taking `writebatch` by pointer would make the mutation visible at the
// call site (suggestion from the original notes).
int Nemo::DoHSet(const std::string &key, const std::string &field, const std::string &val, rocksdb::WriteBatch &writebatch) {
  std::string dbval;
  Status s = HGet(key, field, &dbval);
  const bool is_new_field = s.IsNotFound();
  // Skip the write when an existing field already holds exactly this value.
  if (is_new_field || dbval != val) {
    // Data key "h6ht_key=ht_field" -> ht_value; WriteBatch::Put tags the
    // record as a kTypeValue entry.
    writebatch.Put(EncodeHashKey(key, field), val);
  }
  return is_new_field ? 1 : 0;
}
// Reads the value stored for `field` of hash `key` into *val, looking up the
// data key built by EncodeHashKey ("h6ht_key=ht_field").
// Returns InvalidArgument for an empty key or one of KEY_MAX_LENGTH bytes or
// more; otherwise the status returned by hash_db_->Get (callers rely on
// IsNotFound() to detect a missing field).
Status Nemo::HGet(const std::string &key, const std::string &field, std::string *val) {
  const bool bad_length = key.empty() || key.size() >= KEY_MAX_LENGTH;
  if (bad_length) {
    return Status::InvalidArgument("Invalid key length");
  }
  return hash_db_->Get(rocksdb::ReadOptions(), EncodeHashKey(key, field), val);
}
// Builds the data key for one hash field:
//   [DataType::kHash][1 byte: name.size()][name]['='][key]
// The walkthrough shows this as "h6ht_key=ht_field". The "6" there is the
// raw length byte 0x06, not the ASCII digit '6'.
// The length is cast to uint8_t, so it wraps (mod 256) for names of 256+
// bytes. HSet/HGet reject keys of KEY_MAX_LENGTH bytes or more beforehand.
inline std::string EncodeHashKey(const rocksdb::Slice &name, const rocksdb::Slice &key) {
  std::string encoded;
  encoded.reserve(3 + name.size() + key.size());
  encoded.push_back(static_cast<char>(DataType::kHash));
  encoded.push_back(static_cast<char>(static_cast<uint8_t>(name.size())));
  encoded.append(name.data(), name.size());
  encoded.push_back('=');
  encoded.append(key.data(), key.size());
  return encoded;
}
// 传入参数:ht_key 返回ht_key对应field的个数
int Nemo::IncrHLen(const std::string &key, int64_t incr, rocksdb::WriteBatch &writebatch) {
int64_t len = HLen(key); //详细见下,获取ht_key对应的field数量,此时为0
if (len == -1) {
return -1;
}
len += incr; //加上incr , 此时为1
std::string size_key = EncodeHsizeKey(key);
writebatch.Put(size_key, rocksdb::Slice((char *)&len, sizeof(int64_t)));
// if (len == 0) {
// //writebatch.Delete(size_key);
// writebatch.Merge(size_key, rocksdb::Slice((char *)&len, sizeof(int64_t)));
// } else {
// writebatch.Put(size_key, rocksdb::Slice((char *)&len, sizeof(int64_t)));
// }
return 0;
}
// Returns the number of fields in hash `key`, read from its size key (built
// by EncodeHsizeKey; "Hht_key" in the walkthrough).
// Returns 0 when the size key is missing, its value is not exactly 8 bytes,
// or the stored count is negative; returns -1 on any other read error.
// The stored value is the raw 8 bytes of an int64_t in host byte order (as
// written by IncrHLen), not a decimal string.
int64_t Nemo::HLen(const std::string &key) {
  std::string size_key = EncodeHsizeKey(key);
  std::string val;
  Status s = hash_db_->Get(rocksdb::ReadOptions(), size_key, &val);
  if (s.IsNotFound()) {
    return 0;
  }
  if (!s.ok()) {
    return -1;
  }
  int64_t len = 0;
  if (val.size() != sizeof(len)) {
    return 0;
  }
  // Copy the bytes out rather than dereferencing a cast pointer: val.data()
  // has no int64_t alignment guarantee and the cast breaks strict aliasing.
  val.copy(reinterpret_cast<char *>(&len), sizeof(len));
  return len < 0 ? 0 : len;
}
// Single-byte prefixes of the per-type meta keys (e.g. the hash size key
// "Hht_key"). Kv has no separate meta entry: GetVersionAndTS returns
// immediately for kMetaPrefixKv.
const char kMetaPrefixKv = '\0';
const char kMetaPrefixHash = 'H';
const char kMetaPrefixZset = 'Z';
const char kMetaPrefixSet = 'S';
// Was written with a typographic closing quote ('L’), which does not compile.
const char kMetaPrefixList = 'L';
// WriteBatch::Put
// Appends a Put(key -> value) record for `column_family` to this batch by
// resolving the family's numeric ID and delegating to WriteBatchInternal::Put.
void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
                     const Slice& value) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  WriteBatchInternal::Put(this, cf_id, key, value);
}
// Serializes one Put record into b->rep_ and increments the batch's count.
// Record layout:
//   tag: kTypeValue for column family 0 (the default tag), otherwise
//        kTypeColumnFamilyValue followed by the varint32 family id;
//   then the length-prefixed key and the length-prefixed value.
// Also sets ContentFlags::HAS_PUT on the batch. WriteBatch::Iterate reads
// records back in this format.
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
                             const SliceParts& key, const SliceParts& value) {
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);

  auto& rep = b->rep_;
  const bool default_cf = (column_family_id == 0);
  rep.push_back(static_cast<char>(default_cf ? kTypeValue : kTypeColumnFamilyValue));
  if (!default_cf) {
    PutVarint32(&rep, column_family_id);
  }
  PutLengthPrefixedSliceParts(&rep, key);
  PutLengthPrefixedSliceParts(&rep, value);

  auto& flags = b->content_flags_;
  flags.store(flags.load(std::memory_order_relaxed) | ContentFlags::HAS_PUT,
              std::memory_order_relaxed);
}
// WriteBatch::Iterate (as used by pika/nemo): replays the batch by reading
// each record's tag, key and value from rep_ in order and dispatching on the
// tag. For example, kTypeValue / kTypeColumnFamilyValue ->
// handler->PutCF(column_family, key, value). What each callback actually does
// is up to the concrete Handler passed in.
// Returns Corruption if the batch is smaller than the header, contains an
// unknown tag, or the number of counted records (`found`) differs from
// WriteBatchInternal::Count(this). This includes the case where
// handler->Continue() stopped the loop early. Otherwise returns the first
// non-OK status from reading a record or from a handler callback.
Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_);
if (input.size() < WriteBatchInternal::kHeader) {
return Status::Corruption("malformed WriteBatch (too small)");
}
input.remove_prefix(WriteBatchInternal::kHeader);
Slice key, value, blob, xid;
int found = 0;
Status s;
while (s.ok() && !input.empty() && handler->Continue()) {
char tag = 0;
uint32_t column_family = 0; // default
s = ReadRecordFromWriteBatch(&input, &tag, &column_family, &key, &value,
&blob, &xid); // read the next record's tag, key and value from the batch
if (!s.ok()) {
return s;
}
switch (tag) {
case kTypeColumnFamilyValue:
case kTypeValue:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_PUT));
s = handler->PutCF(column_family, key, value); // in the Nemo write path: the Handler::PutCF defined in DBNemoImpl::WriteWithOldKeyTTL
found++;
break;
case kTypeColumnFamilyDeletion:
case kTypeDeletion:
assert(content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_DELETE));
s = handler->DeleteCF(column_family, key);
found++;
break;
// NOTE: the remaining tag cases are elided in this excerpt.
…………
case kTypeNoop:
break;
default:
return Status::Corruption("unknown WriteBatch tag");
}
}
if (!s.ok()) {
return s;
}
if (found != WriteBatchInternal::Count(this)) {
return Status::Corruption("WriteBatch has wrong count");
} else {
return Status::OK();
}
}
// Rewrites `updates` so that every Put/Merge value carries a version and
// expire-time suffix, then writes the rewritten batch to db_.
//
// Put values get version_ + timestamp_ appended via
// AppendVersionAndExpiredTime. Per the original notes, these are a 4-byte
// version (used for fast deletion) and a 4-byte expire timestamp (default 0).
// The pair is resolved once, from the FIRST Put in the batch (via
// GetVersionAndTS on that record's key), and reused for every later Put:
//  - no meta entry found           -> version = current Unix time (now_);
//  - meta found but already expired -> version incremented, timestamp reset to 0.
// Merge values get version 0 / timestamp 0. Deletes and log data are copied
// unchanged.
//
// Returns the status of replaying `updates` if that fails, otherwise the
// first suffix-encoding error if any, otherwise the status of db_->Write.
Status DBNemoImpl::WriteWithOldKeyTTL(const WriteOptions& opts, WriteBatch* updates) {
  // Batch rewriter: each callback appends the (possibly rewritten) record to
  // updates_ttl and reports encoding failures through batch_rewrite_status.
  class Handler : public WriteBatch::Handler {
   public:
    WriteBatch updates_ttl;
    Status batch_rewrite_status;

    explicit Handler(Env* env, DB* db, char meta_prefix)
        : env_(env), db_(db),
          meta_prefix_(meta_prefix), now_(0), version_(0),
          timestamp_(0), is_first_(true) {
      // If this fails now_ stays 0 instead of being read uninitialized later.
      env_->GetCurrentTime(&now_);
    }

    virtual Status PutCF(uint32_t column_family_id, const Slice& key,
                         const Slice& value) override {
      if (is_first_) {
        // For HSET the first record is the data key "h6ht_key=ht_field";
        // GetVersionAndTS derives the meta key from it.
        bool find_meta = GetVersionAndTS(db_, meta_prefix_, key, &version_, &timestamp_);
        if (!find_meta) {
          // No meta yet: the version is the current Unix time.
          version_ = now_;
        }
        int64_t curtime;
        if (env_->GetCurrentTime(&curtime).ok()) {
          if (timestamp_ != 0 && timestamp_ < curtime) {
            // The key's expire time has passed: bump the version.
            version_++;
            timestamp_ = 0;
          }
        } else {
          timestamp_ = 0;
        }
        // Later records (e.g. the size key "Hht_key") reuse the same pair.
        is_first_ = false;
      }
      // value_with_ver_ts = value + version + expire time.
      std::string value_with_ver_ts;
      Status st = AppendVersionAndExpiredTime(value, &value_with_ver_ts,
                                              env_, version_, timestamp_);
      if (!st.ok()) {
        batch_rewrite_status = st;
      } else {
        WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
                                value_with_ver_ts);
      }
      return Status::OK();
    }

    virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
                           const Slice& value) override {
      std::string value_with_ver_ts;
      Status st = AppendVersionAndTS(value, &value_with_ver_ts,
                                     env_, 0, 0);
      if (!st.ok()) {
        batch_rewrite_status = st;
      } else {
        WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
                                  value_with_ver_ts);
      }
      return Status::OK();
    }

    virtual Status DeleteCF(uint32_t column_family_id,
                            const Slice& key) override {
      WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
      return Status::OK();
    }

    virtual void LogData(const Slice& blob) override {
      updates_ttl.PutLogData(blob);
    }

   private:
    Env* env_;
    DB* db_;
    char meta_prefix_;
    int64_t now_;
    uint32_t version_;
    int32_t timestamp_;
    bool is_first_;
  };

  // For hash writes meta_prefix_ is presumably kMetaPrefixHash ('H').
  Handler handler(GetEnv(), db_, meta_prefix_);
  Status s = updates->Iterate(&handler);
  if (!s.ok()) {
    // A malformed or unsupported batch must not be half-written.
    return s;
  }
  if (!handler.batch_rewrite_status.ok()) {
    return handler.batch_rewrite_status;
  }
  return db_->Write(opts, &(handler.updates_ttl));
}
// Looks up the version and expire timestamp stored in the meta entry that
// governs `key`. In WriteWithOldKeyTTL this is called only for the first Put
// of a batch (e.g. the data key "h6ht_key=ht_field" for HSET).
//  - meta_prefix == kMetaPrefixKv: nothing to look up; returns true with 0/0.
//  - key[0] == meta_prefix: `key` is itself the meta key and is read as is.
//  - otherwise `key` is a data key laid out as
//    [type byte][1-byte name length][name]..., and the meta key read is
//    meta_prefix + name. A 1-byte key is the separator between meta and data
//    and returns true with 0/0.
// The version and the timestamp are decoded as fixed32 values from the last
// kVersionLength + kTSLength bytes of the meta value (version first).
// Returns false with *version = *timestamp = 0 when the meta entry is missing,
// the read fails, or the key/value is too short to decode.
bool DBNemoImpl::GetVersionAndTS(DB* db, char meta_prefix,
                                 const Slice& key, uint32_t* version, int32_t* timestamp) {
  *version = 0;
  *timestamp = 0;
  if (meta_prefix == kMetaPrefixKv) {
    return true;
  }
  if (key.empty()) {
    // key[0] / key.data() + 1 below would read past the end.
    return false;
  }

  std::string value;
  Status s;
  if (key[0] == meta_prefix) {
    s = db->Get(ReadOptions(), key, &value);
  } else {
    if (key.size() == 1) {
      // This is the separator between meta and data, just ignore it.
      return true;
    }
    const size_t name_len = static_cast<uint8_t>(key[1]);
    if (key.size() < 2 + name_len) {
      // Truncated data key: the declared name runs past the end of `key`.
      return false;
    }
    std::string meta_key(1, meta_prefix);
    meta_key.append(key.data() + 2, name_len);
    s = db->Get(ReadOptions(), meta_key, &value);
  }

  // On NotFound/error `value` is empty; never decode from it, and never index
  // before the start of a value shorter than the version + TS suffix.
  if (!s.ok() ||
      value.size() < static_cast<size_t>(kVersionLength + kTSLength)) {
    return false;
  }
  *version = DecodeFixed32(value.data() + value.size() - kVersionLength - kTSLength);
  *timestamp = DecodeFixed32(value.data() + value.size() - kTSLength);
  return true;
}
网友评论