It has been a while since I last dug into brpc's source. This time the target is its MRU implementation, but before getting there it is worth walking through LRU first, using leveldb's LRU cache as the reference (no other leveldb internals are involved).
There is another motivation. While reviewing a project that has been in production for about a year and a half, I found some business code misusing LRU: the implementation was overly complex and did not manage resource lifetimes well, so a resource could be accessed again after it had already been freed, reset, or handed out to another module. The new project does not use that code yet, and it will eventually be replaced by something closer to leveldb's LRU, with proper resource reclamation.
I will skip the basics: with a fixed space budget, once the cache is full an existing entry has to be evicted to make room for new data. Replacement policies are covered in any operating systems course; the one most people remember is LRU (Least Recently Used), and there are plenty of explanations and examples online, so I will not repeat them. LRU shows up in both leveldb and redis, but writing a bug-free, industrial-grade implementation fit for production is not as easy as it sounds.
What does an LRU implementation have to consider? Time and space complexity; resource lifetime management, so that a caller can never touch a resource that has already been reclaimed or reassigned; and how to organize the entries: which fields each one needs and which data structure to use, given that insert/remove should be fast. A doubly linked list makes insert and remove constant time, but finding an element would require a traversal, so a hashtable is used alongside it. In leveldb, each entry is represented by the following structure:
// An entry is a variable length heap-allocated structure. Entries
// are kept in a circular doubly linked list ordered by access time.
struct LRUHandle {
  void* value;
  void (*deleter)(const Slice&, void* value);
  LRUHandle* next_hash;
  LRUHandle* next;
  LRUHandle* prev;
  size_t charge;  // TODO(opt): Only allow uint32_t?
  size_t key_length;
  bool in_cache;     // Whether entry is in the cache.
  uint32_t refs;     // References, including cache reference, if present.
  uint32_t hash;     // Hash of key(); used for fast sharding and comparisons
  char key_data[1];  // Beginning of key

  Slice key() const {
    // next_ is only equal to this if the LRU handle is the list head of an
    // empty list. List heads never have meaningful keys.
    assert(next != this);

    return Slice(key_data, key_length);
  }
};
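A side note on key_data[1]: entries are allocated with malloc so that the key bytes live inline at the end of the struct, the classic variable-length-struct trick (the Insert code later does exactly this). A minimal sketch of the idea, using a hypothetical Node type rather than leveldb's own:

#include <cassert>
#include <cstdlib>
#include <cstring>

// Hypothetical node: the last member is a 1-byte array that actually
// holds key_length bytes, because we over-allocate with malloc.
struct Node {
  size_t key_length;
  char key_data[1];  // beginning of key; real storage extends past the struct
};

Node* NewNode(const char* key, size_t len) {
  // -1 because key_data already contributes one byte to sizeof(Node).
  Node* n = static_cast<Node*>(malloc(sizeof(Node) - 1 + len));
  n->key_length = len;
  memcpy(n->key_data, key, len);
  return n;
}

int main() {
  Node* n = NewNode("hello", 5);
  assert(memcmp(n->key_data, "hello", 5) == 0);
  free(n);
  return 0;
}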
The members to pay attention to are next, prev, next_hash, and refs. leveldb implements its own hashtable, which is fairly simple:
class HandleTable {
 public:
  HandleTable() : length_(0), elems_(0), list_(nullptr) { Resize(); }
  ~HandleTable() { delete[] list_; }

  // ... Lookup/Insert/Remove/FindPointer, shown below ...

 private:
  // The table consists of an array of buckets where each bucket is
  // a linked list of cache entries that hash into the bucket.
  uint32_t length_;
  uint32_t elems_;
  LRUHandle** list_;
};
The constructor calls Resize(), which starts the table at 4 buckets; afterwards, whenever an insert pushes the element count above the bucket count, the table is resized:
  void Resize() {
    uint32_t new_length = 4;
    while (new_length < elems_) {
      new_length *= 2;
    }
    LRUHandle** new_list = new LRUHandle*[new_length];
    memset(new_list, 0, sizeof(new_list[0]) * new_length);
    uint32_t count = 0;
    for (uint32_t i = 0; i < length_; i++) {
      LRUHandle* h = list_[i];
      while (h != nullptr) {
        LRUHandle* next = h->next_hash;
        uint32_t hash = h->hash;
        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
        h->next_hash = *ptr;
        *ptr = h;
        h = next;
        count++;
      }
    }
    assert(elems_ == count);
    delete[] list_;
    list_ = new_list;
    length_ = new_length;
  }
Resize() walks every bucket and rehashes each node into the new table, head-inserting nodes that land in the same bucket. Note that the table always grows by doubling, so its size stays a power of two and the bucket index can be computed with a single bitwise AND, hash & (new_length - 1), instead of a modulo, which is cheaper; the small check below illustrates the equivalence.
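This is nothing leveldb-specific, just arithmetic; a self-contained check:

#include <cassert>
#include <cstdint>

int main() {
  const uint32_t kBuckets = 16;  // must be a power of two
  for (uint32_t hash = 0; hash < 1000; hash++) {
    // For power-of-two sizes, masking with size-1 equals taking the modulo.
    assert((hash & (kBuckets - 1)) == (hash % kBuckets));
  }
  return 0;
}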
The remaining hashtable operations, Lookup/Insert/Remove, all build on FindPointer:
  LRUHandle* Lookup(const Slice& key, uint32_t hash) {
    return *FindPointer(key, hash);
  }

  LRUHandle* Insert(LRUHandle* h) {
    LRUHandle** ptr = FindPointer(h->key(), h->hash);
    LRUHandle* old = *ptr;
    h->next_hash = (old == nullptr ? nullptr : old->next_hash);
    *ptr = h;
    if (old == nullptr) {
      ++elems_;
      if (elems_ > length_) {
        // Since each cache entry is fairly large, we aim for a small
        // average linked list length (<= 1).
        Resize();
      }
    }
    return old;
  }

  LRUHandle* Remove(const Slice& key, uint32_t hash) {
    LRUHandle** ptr = FindPointer(key, hash);
    LRUHandle* result = *ptr;
    if (result != nullptr) {
      *ptr = result->next_hash;
      --elems_;
    }
    return result;
  }

  // Return a pointer to slot that points to a cache entry that
  // matches key/hash. If there is no such cache entry, return a
  // pointer to the trailing slot in the corresponding linked list.
  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
    LRUHandle** ptr = &list_[hash & (length_ - 1)];
    while (*ptr != nullptr && ((*ptr)->hash != hash || key != (*ptr)->key())) {
      ptr = &(*ptr)->next_hash;
    }
    return ptr;
  }
All of them go through FindPointer, which returns a pointer to the slot (an LRUHandle**) that either points at the matching entry or at the trailing null slot of the bucket's chain. This lets Insert replace an existing node in place and return the old one (possibly triggering a Resize), and lets Remove unlink a node without tracking a separate previous pointer; the idiom is sketched below.
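The pointer-to-slot idiom is worth internalizing on its own; here is a minimal sketch on a plain singly linked list (a hypothetical Node type, not leveldb code):

#include <cassert>

struct Node {
  int value;
  Node* next;
};

// Remove the first node with the given value. Walking a Node** means the
// head and the middle of the list are handled by the same code path.
bool RemoveValue(Node** head, int value) {
  Node** slot = head;
  while (*slot != nullptr && (*slot)->value != value) {
    slot = &(*slot)->next;
  }
  if (*slot == nullptr) return false;
  *slot = (*slot)->next;  // unlink without a separate 'prev' pointer
  return true;
}

int main() {
  Node c{3, nullptr}, b{2, &c}, a{1, &b};
  Node* head = &a;
  assert(RemoveValue(&head, 1));  // removing the head works too
  assert(head == &b);
  return 0;
}

The HandleTable and LRUHandle above are the building blocks of the LRU cache; next comes the LRUCache itself: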
// A single shard of sharded cache.
class LRUCache {
 public:
  LRUCache();
  ~LRUCache();

  // ...

  // Dummy head of LRU list.
  // lru.prev is newest entry, lru.next is oldest entry.
  // Entries have refs==1 and in_cache==true.
  LRUHandle lru_ GUARDED_BY(mutex_);

  // Dummy head of in-use list.
  // Entries are in use by clients, and have refs >= 2 and in_cache==true.
  LRUHandle in_use_ GUARDED_BY(mutex_);

  HandleTable table_ GUARDED_BY(mutex_);
};
in_use_ and lru_ are dummy list heads. When a node is inserted, refs starts at 1 for the handle returned to the caller; if caching is enabled (capacity_ > 0), refs is incremented once more, in_cache is set, and the node is appended to the in_use_ list. Insert then checks whether usage has reached the capacity limit and, if so, evicts the least recently used nodes from the lru_ list:
Cache::Handle* LRUCache::Insert(const Slice& key, uint32_t hash, void* value,
                                size_t charge,
                                void (*deleter)(const Slice& key,
                                                void* value)) {
  MutexLock l(&mutex_);

  LRUHandle* e =
      reinterpret_cast<LRUHandle*>(malloc(sizeof(LRUHandle) - 1 + key.size()));
  e->value = value;
  e->deleter = deleter;
  e->charge = charge;
  e->key_length = key.size();
  e->hash = hash;
  e->in_cache = false;
  e->refs = 1;  // for the returned handle.
  memcpy(e->key_data, key.data(), key.size());

  if (capacity_ > 0) {
    e->refs++;  // for the cache's reference.
    e->in_cache = true;
    LRU_Append(&in_use_, e);
    usage_ += charge;
    FinishErase(table_.Insert(e));
  } else {  // don't cache. (capacity_==0 is supported and turns off caching.)
    // next is read by key() in an assert, so it must be initialized
    e->next = nullptr;
  }
  // ... eviction loop elided here; shown further below ...

  return reinterpret_cast<Cache::Handle*>(e);
}
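Insert relies on the list helpers LRU_Append and LRU_Remove, which are not quoted above. They are the usual circular doubly-linked-list operations; roughly as follows (reproduced from memory of leveldb's source, so treat it as a sketch):

void LRUCache::LRU_Remove(LRUHandle* e) {
  e->next->prev = e->prev;
  e->prev->next = e->next;
}

void LRUCache::LRU_Append(LRUHandle* list, LRUHandle* e) {
  // Make "e" the newest entry by inserting it just before *list.
  e->next = list;
  e->prev = list->prev;
  e->prev->next = e;
  e->next->prev = e;
}

Release simply drops one reference under the mutex: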
void LRUCache::Release(Cache::Handle* handle) {
  MutexLock l(&mutex_);
  Unref(reinterpret_cast<LRUHandle*>(handle));
}
For a node that is to be cached, refs is bumped a second time and the node is head-inserted into the in_use_ list. table_.Insert may return an old node carrying the same key; FinishErase then finishes evicting it:
// If e != nullptr, finish removing *e from the cache; it has already been
// removed from the hash table. Return whether e != nullptr.
bool LRUCache::FinishErase(LRUHandle* e) {
  if (e != nullptr) {
    assert(e->in_cache);
    LRU_Remove(e);
    e->in_cache = false;
    usage_ -= e->charge;
    Unref(e);
  }
  return e != nullptr;
}

void LRUCache::Unref(LRUHandle* e) {
  assert(e->refs > 0);
  e->refs--;
  if (e->refs == 0) {  // Deallocate.
    assert(!e->in_cache);
    (*e->deleter)(e->key(), e->value);
    free(e);
  } else if (e->in_cache && e->refs == 1) {
    // No longer in use; move to lru_ list.
    LRU_Remove(e);
    LRU_Append(&lru_, e);
  }
}

void LRUCache::Ref(LRUHandle* e) {
  if (e->refs == 1 && e->in_cache) {  // If on lru_ list, move to in_use_ list.
    LRU_Remove(e);
    LRU_Append(&in_use_, e);
  }
  e->refs++;
}
When refs drops to zero, nothing references the node anymore: the user-supplied deleter is invoked and the memory is freed. A node moves onto the lru_ list only while it is still in the cache and refs is exactly 1, and it is from lru_ that entries are evicted when the LRUCache exceeds its capacity (this is the part elided from Insert above):
  while (usage_ > capacity_ && lru_.next != &lru_) {
    LRUHandle* old = lru_.next;  // lru_.prev is newest entry, lru_.next is oldest.
    assert(old->refs == 1);
    bool erased = FinishErase(table_.Remove(old->key(), old->hash));
    if (!erased) {  // to avoid unused variable when compiled NDEBUG
      assert(erased);
    }
  }
To sum up: a node that is inserted and cached has refs == 2, and the caller must Release the handle when done (the same applies to handles returned by Lookup); every node on the lru_ list has refs == 1. When Lookup hits a node sitting on the lru_ list, Ref moves it to in_use_ and increments refs. A node is never on both in_use_ and lru_ at the same time. The sketch below puts the lifecycle together.
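A minimal usage sketch through leveldb's public Cache interface (include/leveldb/cache.h), assuming the standard NewLRUCache/Insert/Lookup/Release/Value API:

#include <cassert>
#include <string>

#include "leveldb/cache.h"
#include "leveldb/slice.h"

static void DeleteValue(const leveldb::Slice& /*key*/, void* value) {
  delete static_cast<std::string*>(value);  // called when refs hits 0
}

int main() {
  leveldb::Cache* cache = leveldb::NewLRUCache(1024 /* capacity, in charge units */);

  // After Insert the entry has refs == 2: one for the cache, one for us.
  leveldb::Cache::Handle* h =
      cache->Insert("k", new std::string("v"), 1 /* charge */, DeleteValue);
  cache->Release(h);  // drop our reference; the cache still holds its own

  // Lookup returns a handle (refs bumped again) or nullptr on a miss.
  if (leveldb::Cache::Handle* h2 = cache->Lookup("k")) {
    assert(*static_cast<std::string*>(cache->Value(h2)) == "v");
    cache->Release(h2);  // every successful Lookup must be Released
  }

  delete cache;  // remaining cached entries are dropped and their deleters run
  return 0;
}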
Back to MRU in brpc: as far as I can tell it is only exercised in the tests; no other current code uses it. The implementation (it appears to be vendored from Chromium's base library) simply composes existing STL containers:
class MRUCacheBase {
 public:
  // The payload of the list. This maintains a copy of the key so we can
  // efficiently delete things given an element of the list.
  typedef std::pair<KeyType, PayloadType> value_type;

 private:
  typedef std::list<value_type> PayloadList;
  typedef typename MapType<KeyType,
                           typename PayloadList::iterator>::Type KeyIndex;

  // ... (other members elided) ...

  iterator Put(const KeyType& key, const PayloadType& payload) {
    // Remove any existing payload with that key.
    typename KeyIndex::iterator index_iter = index_.find(key);
    if (index_iter != index_.end()) {
      // Erase the reference to it. This will call the deletor on the removed
      // element. The index reference will be replaced in the code below.
      Erase(index_iter->second);
    } else if (max_size_ != NO_AUTO_EVICT) {
      // New item is being inserted which might make it larger than the maximum
      // size: kick the oldest thing out if necessary.
      ShrinkToSize(max_size_ - 1);
    }

    ordering_.push_front(value_type(key, payload));
    index_.insert(std::make_pair(key, ordering_.begin()));
    return ordering_.begin();
  }
Put head-inserts via the list's push_front and then records the key-to-iterator mapping in the map, so that a later Get can find the element:
  iterator Get(const KeyType& key) {
    typename KeyIndex::iterator index_iter = index_.find(key);
    if (index_iter == index_.end())
      return end();
    typename PayloadList::iterator iter = index_iter->second;

    // Move the touched item to the front of the recency ordering.
    ordering_.splice(ordering_.begin(), ordering_, iter);
    return ordering_.begin();
  }
Get first finds the corresponding iterator through the map in O(log N), then reaches the list node through that iterator in O(1), and finally splices the node to the front of the list: whatever is accessed becomes the most recent entry. A usage sketch follows.
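Assuming brpc kept Chromium's wrapper types (I recall a butil/containers/mru_cache.h defining MRUCache on top of MRUCacheBase; the header path and namespace here are assumptions), usage would look roughly like:

#include <string>

#include "butil/containers/mru_cache.h"  // assumed path of brpc's vendored header

int main() {
  // Hold at most 2 entries; the oldest is evicted on overflow.
  butil::MRUCache<int, std::string> cache(2);
  cache.Put(1, "one");
  cache.Put(2, "two");

  cache.Get(1);           // touching key 1 moves it to the front
  cache.Put(3, "three");  // evicts key 2, the least recently used
  // cache.Get(2) == cache.end(): key 2 is gone; keys 1 and 3 remain
  return 0;
}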
Of course, map is not the only choice for the key-to-iterator index; a hash map (hash_map/unordered_map) brings the lookup down to amortized O(1), and if I remember correctly the same source file also provides a hashing variant of the cache.
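To make the structure concrete, here is a minimal standalone sketch of the same list-plus-index design using std::list and std::unordered_map (my own toy code, not brpc's):

#include <cassert>
#include <list>
#include <string>
#include <unordered_map>
#include <utility>

// Toy recency cache: most recently used at the front, evict from the back.
class TinyMRUCache {
 public:
  explicit TinyMRUCache(size_t max_size) : max_size_(max_size) {}

  void Put(const std::string& key, const std::string& value) {
    auto it = index_.find(key);
    if (it != index_.end()) {
      ordering_.erase(it->second);  // drop the old entry for this key
      index_.erase(it);
    } else if (ordering_.size() >= max_size_) {
      index_.erase(ordering_.back().first);  // evict the oldest entry
      ordering_.pop_back();
    }
    ordering_.emplace_front(key, value);
    index_[key] = ordering_.begin();
  }

  const std::string* Get(const std::string& key) {
    auto it = index_.find(key);
    if (it == index_.end()) return nullptr;
    // splice is O(1) and does not invalidate list iterators.
    ordering_.splice(ordering_.begin(), ordering_, it->second);
    return &ordering_.front().second;
  }

 private:
  size_t max_size_;
  std::list<std::pair<std::string, std::string>> ordering_;
  std::unordered_map<std::string,
                     std::list<std::pair<std::string, std::string>>::iterator>
      index_;
};

int main() {
  TinyMRUCache cache(2);
  cache.Put("a", "1");
  cache.Put("b", "2");
  cache.Get("a");       // "a" becomes most recent
  cache.Put("c", "3");  // evicts "b"
  assert(cache.Get("b") == nullptr);
  assert(*cache.Get("a") == "1");
  return 0;
}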
Compared with leveldb's LRU, which uses reference counting to keep resource lifetimes safe, this implementation has no such protection; the usage scenarios are presumably different, which is why it can stay so simple. MRU itself is rarely seen in practice, so this post was mostly an excuse to walk through an LRU implementation again and reinforce it. The sharding layer and other details are left out.