MemTable
前面进行操作流程介绍,API 进行读取时首先读取 MemTable,然后读取 Immutable MemTable,接着读取 SSTable;
而进行写入时会先写入 WALLog,然后写入 MemTable。
读写都会涉及 MemTable 模块,WALLog 文件和 MemTable 文件是一一对应的关系,只有当 MemTable 大小超过设定的阈值,成功生成 SSTable 且刷新到磁盘之后才会将对应的 WALLog 文件删除,此时会开始使用一个新的 MemTable。如果涉及崩溃恢复,则需要从 WALLog文件恢复一个 MemTable 文件。
每个LevelDB实例最多会维护 2个MemTable:
mem_
- Immutable MemTable:
imm_
mem_
可以读写,imm_
只读,最新写入的数据都会保存到 mem_
中。当 mem_
的大小超过 write_buffer_size
时,LevelDB 就会将其切换成 imm_
,并生成新的 mem_
。 LevelDB 的后台线程会将 imm_
compact 成 SSTable 保存在磁盘上。 如果前台的写入速度很快,有可能出现 mem_
的大小已经超过 write_buffer_size
,但是前一个 imm_
还没有被 compact 到磁盘上,无法切换 MemTable,此时就会阻塞写请求。
MemTable 主要支持的操作有:
- 插入单条记录(Add)
- 查询单条记录(Get)
- 遍历查询,生成一个迭代器,该迭代器可以遍历 MemTable
LevelDB 的 MemTable 的主要功能是将:内部编码、内存分配(Arena)和 SkipList
封装在一起。具体代码详细见:db/memtable.h 和 db/memtable.cc
成员,构造函数,析构函数
class MemTable
{
public:
MemTable::MemTable(const InternalKeyComparator &comparator)
: comparator_(comparator), refs_(0), table_(comparator_, &arena_)
{
}
private:
MemTable::~MemTable() { assert(refs_ == 0); }
private:
friend class MemTableIterator;
struct KeyComparator
{
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator &c) : comparator(c) {}
int operator()(const char *a, const char *b) const;
};
typedef SkipList<const char *, KeyComparator> Table;
KeyComparator comparator_;
int refs_;
Arena arena_;
Table table_; // 对skiplist的封装
}
MemTable 插入
插入通过调用Add
方法实现。主要计算该次插入需要的内存大小,分配内存并将数据按照如下面图示的顺序写入,然后调用SkipList的插入方法将数据插入。
typedef uint64_t SequenceNumber;
enum ValueType
{
kTypeDeletion = 0x0,
kTypeValue = 0x1
};
void MemTable::Add(SequenceNumber s, ValueType type, const Slice &key, const Slice &value)
{
size_t key_size = key.size();
size_t val_size = value.size();
size_t internal_key_size = key_size + 8;
const size_t encoded_len = VarintLength(internal_key_size) +
internal_key_size + VarintLength(val_size) +
val_size;
char *buf = arena_.Allocate(encoded_len);
char *p = EncodeVarint32(buf, internal_key_size);
std::memcpy(p, key.data(), key_size);
p += key_size; EncodeFixed64(p, (s << 8) | type);
p += 8; p = EncodeVarint32(p, val_size);
std::memcpy(p, value.data(), val_size);
assert(p + val_size == buf + encoded_len);
table_.Insert(buf);
}
LevelDB 对于元素的删除是采用惰性删除法,标记ValueType == kTypeDeletion
MemTbale 存储 key-value 格式如下:
- kLength 变长的 32 位整数 varint 的编码,表示 internal_key 的长度。
- internal_key
由user_key + seq(7字节) + type(1字节)组成,user_key 表示上层写入的 key,比如"hello" "world" 键值对,那么user_key 表示"hello"- seq
- type
- vLength
变长的 32 位整数 varint 的编码,表示user_value的长度。 - user_value
Get 接口
bool MemTable::Get(const LookupKey &key, std::string *value, Status *s)
{
Slice memkey = key.memtable_key();
// 生成一个 SkipList 迭代器
Table::Iterator iter(&table_);
// 在 SkipList 中查找
iter.Seek(memkey.data());
if (iter.Valid())
{
const char *entry = iter.key();
uint32_t key_length;
// 获取key的值
const char *key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
if (comparator_.comparator.user_comparator()->Compare(Slice(key_ptr, key_length - 8), key.user_key()) == 0)
{
// 如果判断key的值完全相同,则继续取出 ValueType,判断是否已经删除
const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
switch (static_cast<ValueType>(tag & 0xff))
{
case kTypeValue:
{
// 如果判断 ValueType 为增加一个键值对,则取出值并返回为 true
Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
value->assign(v.data(), v.size());
return true;
}
case kTypeDeletion:
// 如果判断 ValueType 为删除一个键值对,则将状态值赋值为 NotFound,并且返回为 true
*s = Status::NotFound(Slice());
return true;
}
}
}
return false;
}
MemTable 迭代器
迭代器通过 NewIterator
方法生成,生成迭代器之后便可以遍历该 MemTable。
Iterator *MemTable::NewIterator() { return new MemTableIterator(&table_); }
使用 NewIterator
方法生成一个 MemTable 迭代器,实际返回的是一个 MemTableIterator
类,生成该类时会将 的table_
成员变量传入MemTableIterator
构造函数
class MemTableIterator : public Iterator
{
public:
explicit MemTableIterator(MemTable::Table *table) : iter_(table) {}
MemTableIterator(const MemTableIterator &) = delete;
MemTableIterator &operator=(const MemTableIterator &) = delete;
~MemTableIterator() override = default;
// 判断迭代器是否值相一个数据,或非法
bool Valid() const override { return iter_.Valid(); }
// 将迭代器指向集合中 key 为 k 的元素
void Seek(const Slice &k) override { iter_.Seek(EncodeKey(&tmp_, k)); }
// 将迭代器指向集合中的第一个元素
void SeekToFirst() override { iter_.SeekToFirst(); }
// 将迭代器指向集合中的最后一个元素
void SeekToLast() override { iter_.SeekToLast(); }
// 将迭代器指向下一个元素
void Next() override { iter_.Next(); }
// 将迭代器指向前一个元素
void Prev() override { iter_.Prev(); }
// 返回迭代器目前指向的元素的 key
Slice key() const override { return GetLengthPrefixedSlice(iter_.key()); }
// 返回迭代器目前指向元素的 data
Slice value() const override
{
Slice key_slice = GetLengthPrefixedSlice(iter_.key());
return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
}
Status status() const override { return Status::OK(); }
private:
MemTable::Table::Iterator iter_;
std::string tmp_;
};
可以看到,MemTable 迭代器的方法实际都是调用自 SkipList 中的迭代器。
KeyComparator
struct KeyComparator
{
const InternalKeyComparator comparator;
explicit KeyComparator(const InternalKeyComparator &c) : comparator(c) {}
int operator()(const char *a, const char *b) const;
};
class InternalKeyComparator : public Comparator
{
private:
const Comparator *user_comparator_;
public:
explicit InternalKeyComparator(const Comparator *c) : user_comparator_(c) {}
const char *Name() const override;
int Compare(const Slice &a, const Slice &b) const override;
void FindShortestSeparator(std::string *start, const Slice &limit) const override;
void FindShortSuccessor(std::string *key) const override;
const Comparator *user_comparator() const { return user_comparator_; }
int Compare(const InternalKey &a, const InternalKey &b) const;
}
int InternalKeyComparator::Compare(const InternalKey &a, const InternalKey &b) const
{
return Compare(a.Encode(), b.Encode());
}
int InternalKeyComparator::Compare(const Slice &akey, const Slice &bkey) const
{
int r = user_comparator_->Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0)
{
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
if (anum > bnum)
{
r = -1;
}
else if (anum < bnum)
{
r = +1;
}
}
return r;
}
KeyComparator 是对InternalKeyComparator
的包装, 负责从 MemTable 中保存的 key 中提取出 internal_key(user_key + seq(7字节) + type(1字节)),最终comparator_.comparator.user_comparator()->Compare(Slice(key_ptr, key_length - 8), key.user_key()
调用的方法是InternalKeyComparator::Compare
整体的排序规则如下:
- 优先按照 user key 进行排序。
- user key 相同的按照 seq 降序排序。
- user key 和 seq 相同的按照 type 降序排序(逻辑上不会达到这一步,因为一个 LevelDB 的 sequence 是单调递增的)。
所以,在一个 MemTable 中,相同的 user key 的多个版本,新的排在前面,旧的排在后面。
网友评论