1. LAB 基本
LAB的做法网路上很多,有困难的小伙伴可以参考这篇:https://blog.csdn.net/LostUnravel/article/details/121430900
我重点会介绍如何实现optional challenge
2. Optional Challenge
maintain the LRU list so that you evict the least-recently used buffer instead of any buffer that is not in use.
make lookup in the buffer cache lock-free. Hint: use gcc's _sync* functions. How do you convince yourself that your implementation is correct?
2.1 从维护一个unused队列开始
首先在基本做法中,我们通过在buf
上多维护一个timestamp
, 然后在同一个hashmap bucket 下的链表里找到refcnt为0,且timestamp最小的,即实现了一种单bucket下的LRU.
如果想实现全局的LRU,我们还是离不开链表这个数据结构。在链表中,我们假设新的UNUSED的BLOCK块会一直从头部插入。所以尾部就是应该需要被淘汰的块。那么本质上我们只要关注链表的2端。
因为2端进出,只会影响这个链表的局部node,所以我们可以做2个细粒度锁,分别去锁链表的头部和尾部。
这样enqueue 和 dequeue 可以并发.
这样我们的整体设计思路就转变为,我们在HASHMAP中维护的是正在使用的block, 而queue里存放的是没有使用的block。当bget 在hashmap里找不到,我们就可以去队头head
选一个unused的block 重新加回hashmap。 如果brelse 成功释放一个block,我们需要把它从hashmap里拿走,放进队尾tail
。
这里有一个小细节,我们是否可以延迟入队列呢?也就是说在brelse 时,先不从hashmap删除;
答案是不方便,
如果我们在brelse直接加入unused队列,当有人把他从unused取走时,再从hashmap删除;但是如果是hashmap遍历时命中,我们需要把他从unused queue中删除。这时他可能就在中间了,我们没法从队头拿走它。
那么基于这个方案,我们需要一个高效的2 lock 算法。
1703924113412.png
1703924134967.png
这边有一个非常好的问题是,为什么我们需要用原子读写。
因为其实当队列里为空或只有一个元素时,是存在race condition的,所以我们需要这个操作是原子;因为当这个操作是原子时,这个race condition 是不会引起问题的。
enqueue()
函数仅操作队尾,而 dequeue()
函数仅操作队首,因此它们不需要使用相同的锁。当队首和队尾指向同一个节点时,存在一个特殊情况,即在初始化时创建的“dummy”节点。你提到的 enqueue()
可能在写入该节点的 next
指针时,dequeue()
正在尝试读取它,这是正确的。
但这种并发读写并不会造成问题。请注意,enqueue()
创建一个新节点,并在使其通过写入 tail->next
对其他代码可见之前,完全初始化该节点。因此,不会有其他代码能看到这个新节点处于半初始化状态。此外,指针的读写是原子的,因此 dequeue()
不可能得到一个“半个”指针。
所以在你描述的场景中,enqueue()
和 dequeue()
同时被调用,存在两种可能:
-
enqueue()
在dequeue()
读取head->next
之前写入tail->next
。在这种情况下,dequeue()
将看到被入队的节点,并将其返回。 -
dequeue()
在enqueue()
写入tail->next
之前读取head->next
。在这种情况下,dequeue()
认为队列为空并返回 false。
所以我们就有了我们第一个优化版本的代码
这里我们减少了锁的竞争,同时我们把最坏的时间复杂度从原来的O(N)
优化到了 O(1)
(假设hashmap 的操作都为O (1))
那么对应几个方法的改动也变得简单
int bhash(uint dev, uint blockno) {
int hash = dev ^ blockno;
hash = hash * 31 + blockno;
if (hash < 0) hash = -hash;
return hash % NBUFBUC;
}
void addhashtable(int key, struct buf *b) {
if (!bcache.hashtable[key]) {
bcache.hashtable[key] = b;
return;
}
bcache.hashtable[key]->prev = b;
b->next = bcache.hashtable[key];
bcache.hashtable[key] = b;
}
void removehashtable(int key, struct buf *b)
{
if (bcache.hashtable[key] == b) {
bcache.hashtable[key] = b->next;
}
if(b->prev)
b->prev->next = b->next;
if(b->next)
b->next->prev = b->prev;
b->next = 0;
b->prev = 0;
}
// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;
int key = bhash(dev, blockno);
acquire(&bcache.locks[key]);
// Is the block already cached?
b = bcache.hashtable[key];
for(; b; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.locks[key]);
acquiresleep(&b->lock);
return b;
}
}
// Recycle the least recently used (LRU) unused buffer.
// todo: maintain LRU linkedlist for free buf, and use lock-free to get head
b = dequeue();
if (!b) panic("bget: no buffers");
b->valid = 0;
b->refcnt = 1;
b->dev = dev;
b->blockno = blockno;
b->next = b->prev = 0;
addhashtable(key, b);
release(&bcache.locks[key]);
acquiresleep(&b->lock);
return b;
}
// Release a locked buffer.
// Move to the head of the most-recently-used list.
void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");
releasesleep(&b->lock);
int key = bhash(b->dev, b->blockno);
acquire(&bcache.locks[key]);
b->refcnt--;
if (b->refcnt == 0) {
// no one is waiting for it.
removehashtable(key, b);
release(&bcache.locks[key]);
enqueue(b);
return;
}
release(&bcache.locks[key]);
}
void
bpin(struct buf *b) {
if (b->refcnt == 0) panic("bpin error");
int key = bhash(b->dev, b->blockno);
acquire(&bcache.locks[key]);
b->refcnt++;
release(&bcache.locks[key]);
}
void
bunpin(struct buf *b) {
if (b->refcnt <= 1) panic("bunpin error");
int key = bhash(b->dev, b->blockno);
acquire(&bcache.locks[key]);
b->refcnt--;
release(&bcache.locks[key]);
}
2.1.1 测试
1703925430412.png锁的竞争极大减轻,相比之前一个锁的方案。
但是有时运气不好,可能测试依然会失败,这取决于不同进程获取锁的时机。
2.2 把队列优化成无锁
为了让测试可以百分百通过,我们需要使用一个无锁的队列算法。算法的伪代码在这:
https://people.cs.pitt.edu/~jacklange/teaching/cs2510-f12/papers/implementing_lock_free.pdf
我这里简单讲一下算法的原理,
- 对
enqueue
来说,我的目标是当我在改tail 的next的指针是确保,中间没有其他线程同时在改。这个确保是通过CAS来实现的。如果发现有人改了,我就重新读取最新的状态,重新尝试。 - 对
dequeue
来说,我的目标是当我在改head指针时是确保,中间没有其他线程同时在改。这个确保是通过CAS来实现的。如果发现有人改了,我就重新读取最新的状态,重新尝试。
Non-Blocking Concurrent Queue Algorithm
structure pointer_t {ptr: pointer to node_t, count: unsigned integer}
structure node_t {value: data type, next: pointer_t}
structure queue_t {Head: pointer_t, Tail: pointer_t}
initialize(Q: pointer to queue_t)
node = new_node() // Allocate a free node
node->next.ptr = NULL // Make it the only node in the linked list
Q->Head.ptr = Q->Tail.ptr = node // Both Head and Tail point to it
enqueue(Q: pointer to queue_t, value: data type)
E1: node = new_node() // Allocate a new node from the free list
E2: node->value = value // Copy enqueued value into node
E3: node->next.ptr = NULL // Set next pointer of node to NULL
E4: loop // Keep trying until Enqueue is done
E5: tail = Q->Tail // Read Tail.ptr and Tail.count together
E6: next = tail.ptr->next // Read next ptr and count fields together
E7: if tail == Q->Tail // Are tail and next consistent?
// Was Tail pointing to the last node?
E8: if next.ptr == NULL
// Try to link node at the end of the linked list
E9: if CAS(&tail.ptr->next, next, <node, next.count+1>)
E10: break // Enqueue is done. Exit loop
E11: endif
E12: else // Tail was not pointing to the last node
// Try to swing Tail to the next node
E13: CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
E14: endif
E15: endif
E16: endloop
// Enqueue is done. Try to swing Tail to the inserted node
E17: CAS(&Q->Tail, tail, <node, tail.count+1>)
dequeue(Q: pointer to queue_t, pvalue: pointer to data type): boolean
D1: loop // Keep trying until Dequeue is done
D2: head = Q->Head // Read Head
D3: tail = Q->Tail // Read Tail
D4: next = head.ptr->next // Read Head.ptr->next
D5: if head == Q->Head // Are head, tail, and next consistent?
D6: if head.ptr == tail.ptr // Is queue empty or Tail falling behind?
D7: if next.ptr == NULL // Is queue empty?
D8: return FALSE // Queue is empty, couldn't dequeue
D9: endif
// Tail is falling behind. Try to advance it
D10: CAS(&Q->Tail, tail, <next.ptr, tail.count+1>)
D11: else // No need to deal with Tail
// Read value before CAS
// Otherwise, another dequeue might free the next node
D12: *pvalue = next.ptr->value
// Try to swing Head to the next node
D13: if CAS(&Q->Head, head, <next.ptr, head.count+1>)
D14: break // Dequeue is done. Exit loop
D15: endif
D16: endif
D17: endif
D18: endloop
D19: free(head.ptr) // It is safe now to free the old node
D20: return TRUE // Queue was not empty, dequeue succeeded</pre>
这个算法在使用上有个限制条件,它依赖于一种类型保留的分配器进行内存管理,这种分配器永远不会将队列节点重用为不同类型的对象,也不会将内存返回给操作系统。如果在特定的上下文中这是不可接受的,可以修改代码以纳入hazard pointers, epoch-based reclamation, or interval-based reclamation.
原因是因为上述伪代码D19和D4的race condition。如果2个线程,其中一个有指针,指着head.ptr
, 另一个 free掉之后给操作系统重用了,那么指着这个的线程就会读到错误的数据而做出错误的行为。具体描述
2.2.1 对上述伪码我们的实现版本:
inline int CAS64(uint64* ptr, uint64* expected, uint64 desired) {
return __atomic_compare_exchange_n(ptr, expected, desired, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}
inline int CAS32(uint32* ptr, uint32* expected, uint32 desired) {
return __atomic_compare_exchange_n(ptr, expected, desired, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}
void enqueue(struct buf *b) {
int idx = b - bcache.buf;
b->qnext = CNTPTR(0LL, NULL);
uint64 tail, next;
while (1) {
tail = bcache.tail;
next = bcache.buf[PTR(tail)].qnext;
if (tail == bcache.tail) {
if (PTR(next) == NULL) {
if (CAS64(&bcache.buf[PTR(tail)].qnext, &next, CNTPTR(CNT(next) + 1, idx)))
break;
} else {
CAS64(&bcache.tail, &tail, CNTPTR(CNT(tail) + 1, PTR(next)));
}
}
}
CAS64(&bcache.tail, &tail, CNTPTR(CNT(tail) + 1, idx));
}
// we return current dummy as the ret node, then dummy->next become the new dummy
struct buf *dequeue() {
uint64 head, tail, next;
struct buf *ret;
while(1) {
head = bcache.head;
tail = bcache.tail;
next = bcache.buf[PTR(head)].qnext;
if (head == bcache.head)
{
if (PTR(head) == PTR(tail)) {
if (PTR(next) == NULL) return 0;
CAS64(&bcache.tail, &tail, CNTPTR(CNT(tail) + 1, PTR(next)));
} else {
ret = &bcache.buf[PTR(head)];
if(CAS64(&bcache.head, &head, CNTPTR(CNT(head) + 1, PTR(next))))
break;
}
}
}
return ret;
}
我们的场景中不会有这个问题,因为我们的BUF 从DEQUE里出去后,并不会返回给操作系统,也不会被重用, 更不会去free。其实他就是作为BLOCK的CACHE 被上层使用了。而上层使用的时候是对qnext无感的。当他被FREE,重新加进来时,会重新设置它的qnext(包含指针和cnt), 所以这个算法是安全的。
同时在初始化时,我们多开了一个BUF作为初始的dummy node,随着队列的进出,初始的dummy node 可能会变成实际的cache node,某一个原来的cache node 会转变为dummy node的身份。
2.2.2 qnext解析
我们这里的qnext 就是论文伪码中的structure pointer_t {ptr: pointer to node_t, count: unsigned integer}
存了一个指针,和一个count
- 为什么需要count?
这是为了解决一个队列CAS实现中的ABA的问题;
在JAVA中的 ConcurrentLinkedQueue,他优化掉了这个数据结构,是因为它会被java的GC给handle住ABA的问题。
因为我们没有GC,所以我们来看看,我们如果不加COUNT,会遇到什么问题
- 队列初始状态为: A(head)->B(tail)->null
- thread 1 想要把head 从dummy A挪到B,并且return dummy A;
所以他会执行CAS64(&bcache.head, &head, CNTPTR(CNT(head) + 1, PTR(next)))
, 但是在执行前,它停住了。CAS的语义为, 比较head 还是不是A,如果是,就改成B - thread 2 先 成功 pop A,此时状态为:B(head, tail)->null
- thread 2 再 成功 push A, 此时状态为: B(head)->A(tail)->null
- thread 2 再 成功 pop B 此时状态为: A(head/tail)->null
- thread 1,苏醒, CAS 成功,状态变为: B(head)->null (出错)
到这里就出错了,因为thread1 错误的认为 中间没有发生过任何事,因为A的值没变化。
要打破这个ABA的问题,就要引入这个CNT,每次操作CNT+1, 这样thread 1苏醒了,虽然A还是A, 但是CNT已经不是原来的CNT了。
- 为什么我们是uint64
可以看到我们使用了一个uint64,前32位当作CNT,后32位当作IDX, 这个IDX本质是指向一个BUF的指针,实际因为我们的BUF数量在一开始就固定下来了。所以我们使用一个数组下标,就可以代替一个64位的指针。
这样做的好处是,我们可以在一个CAS操作下同时改掉CNT和IDX 2个值。这是算法要求的,并且我们RISCV64不支持128位的CAS,所以只有这样可以还原论文里的算法。
2.2.3 测试:
1703929156789.png至此,我们成功优化掉了一半的锁
2.3 把 HASHMAP 优化成无锁
其实另外一半的锁,我们可以通过一个lock free hashmap 去搞定。具体的思想可以参考这篇博客
要完整实现上面的代码,可以做到非常好的扩展性。但是这里我们可以针对xv6的bio做一个简单版本的无锁实现。
这里我们需要把hashmap优化成一个大的数组,因为我们知道xv6定义了最大的blockno,这样的好处是我们可以简化HASHMAP的REMOVE操作,因为如果要真的无锁的REMOVE,是需要实现RESIZE的REMAP这个操作。
那么有了一个大数组后,我们其实原先HASHMAP 的LOCK 要保护的2个变量(refcnt, 和hashmap linked list next/prev指针) 都可以融合进一个整数里,就和上面的方式一样,用CAS的操作去维护。
这里我们不用担心ABA的问题。 和之前队列 会根据当前的状态(这个状态可能过时了,但因为ABA,线程无法发现)去设置NEXT指针不同。我们的HASHMAP 的CAS只会把entry从在MAP里变成空值。或者是从空值变为在HASHMAP中,且REFCNT为1.
1703936408027.png 1703936876785.png
1703936912337.png
1703936931899.png
2.3.1 测试
我们使用8个CPU,并且把测试代码开出8个进程,去看会不会发生异常。
1703936315489.png
1703936735450.png
运行make grade
1703937025610.png2.4 如何验证无锁正确性
首先我通过跑100次测试,确保可以全部通过。
其次我是根据common-pitfalls-in-writing-lock-free-algorithms, 里提到的几个要点进行自查
1. Segfault
前面提到,我们的算法,不会有free 和 new的操作,所以是segfault 安全的。
2. Corruption
上面实现时分析过,我们在hashmap 和 queue中都保证了ABA问题不会引起多线程下的错误。
3. not lock-free
这个实现bcache的代码中没有引用外部加锁的函数。
4. Data races
这里我的理解,倒不在于原子性。因为我们涉及到的数据共享都是32位和64位的整数。所以riscv-64编译器在对这些基础变量产生的读写指令,默认都为原子。这里文中提到的要用atomic_store,我觉得更多的问题反而是因为每个CPU有自己的缓存,所以会有内存可见性的问题。读没有拿到最新写引起的。
首先我们确保我们的CAS是不会有这个问题的,我在成功和失败时,都使用了最严格级别的memory fence: __ATOMIC_SEQ_CST
我们可以看到编译器会加上这样一行fence iorw,ow
fence iorw,ow
确保了所有在该指令之前的读写操作都在对内存进行后续写操作之前完成。这是一种强有力的同步机制,用于防止内存操作的重排序,确保数据的一致性和可见性。
其他部分会在执行过CAS后,获得内存可见性,所以即使一开始数据延迟,也会在CAS后,拿到内存中最新的数据。
5. Memory reordering
非CAS部分,指令顺序重排序并不影响算法正确性。
2.5 性能优化
我们有了第一版无锁O(1)算法,但是我们发现原来跑bcachetest只要9秒就可以完成,现在我们要花37秒。无锁算法反而降低了性能,这是为什么呢?
首先是因为N不是很大,所以原来的算法里,最坏情况也就是31次遍历。
很多情况下,也就是2,3次遍历后就可以返回。
所以这部分性能提升不算大。
然后原来算法做的更好的一个地方是,即使某个block refcnt直接为0了,但是还没有其他blockno要替换走它时。它依然会在bget优先被选出,这样做的一个优势是,它不用去设置b->valid = 0
,而可以直接重用内存里的数据。
这个b->valid = 0
,一旦被设置, bread时就会不得不从磁盘中把数据读出来,放进这个block cache里。这部操作是非常慢的。
下面我们基于现有的无锁算法,来优化。
2.5.1 核心问题
核心问题在于 我们在brelse
时,当refcnt
为0时,我们就把该条目直接从hashmap移除,随后把这个buf加到queue里。这样所有refcnt为0的,都不会被相同的blockno回来重用,因为它们已经从hashmap里移走了。
要解决这个问题,我们只能在更新 hashmap时,只做-1 refcnt
操作,而不做REMOVE
操作(也就是把IDX
设置为NONE
)
但是为了O(1)
找到 一个可用的refcnt
为0的块,我们依然需要把这个block推进queue
里
这样会引入2个问题:
- 同一个block会多次进queue,造成queue的链表指针错乱:
比如:
a->b->c->null
如果block b,再bget
时 重用了,那么他的refcnt++
, 之后再brelse
时,又refcnt--
, 此时他的refcnt
又为0,会再次推进队列里。
那么队列的指针会变成如下的中间状态:
a(head)->b->null, c->b->null
c这个节点就消失了
-
bget
时,对队列里出栈的元素,可能已经被另一个线程refcnt++
, 不满足可以被替换的要求了
2.5.2 解决方案
一种直觉的思路是,当我们bget
时, 再REFCNT++,如果是从0 加上去的,应该把它从队列里删除。但是此时他可能不再队尾,我们需要一个更复杂的lock-free数据结构
所以我们得另寻出路。
一个可行的想法是,我们要找到一种机制,可以知道一个buf
是不是已经在queue
里。只要知道了这个状态,我们就可以避免第一个问题。因为入队时,如果它还在队列中,我们就忽略这个入队操作即可。
第二个问题,我们可以通过延迟删除来解决,如果一个bget
dequeue
出队一个buf
, 必须要把它从HASHMAP移除,通过CAS,且CAS的期望状态必须为refcnt为0,如果这个操作失败,则代表,此时有别的线程对它进行了refcnt++
的操作。就应该丢弃这个buf
2.5.3 并发问题
下面我就是要看要在什么时机去设置这个在queue的状态,来确保无锁的情况下,也是正确的。
首先它应该是在实际入栈前就发生,因为入栈后再设置,依然会发生同一个BUF重复入栈的问题。所以只有第一个CAS这个位成功的线程,可以获得这个BUF的入栈资格。其他的就放弃入栈。除非其他的执行CAS的时机,恰好是该元素入栈完,又出栈(bget拿去使用)了。(ABA问题)这时他可以入栈;之后别人取出来使用时,会判断它的refcnt是否为0,(通过上面提到的CAS操作),如果refcnt不为0,则会丢弃,所以是安全的。
下面就是考虑什么时候把这个状态置为0,答案也是必须成功在deque时,释放该元素后。
下面就是分析这种操作是否又潜在的并发问题:
情况1
- 1号线程 在
enqueue
如果CAS
该位从0到1,之后线程暂停。此时并未入队。 - 如果有一个线程打算操作
enque
,会CAS
失败,直接丢弃。这是安全的,因为1号线程最终会确保该BUF
入队成功。 - 如果有一个线程打算操作
deque
, 因为此时该buf
并未入队, 所以它pop
出来的buf
不会是同一个buf
. - 如果有一个线程在操作
hashmap
,hashmap
不依赖这个位,所以安全
综上入队时在2个操作之间 暂停任意时间 都安全。
情况2
-
1号线程 在
dequeue
如果移动HEAD
,之后线程暂停。此时该位并未设置回0。 -
如果有一个线程打算操作enque,会CAS失败,直接丢弃。这是安全的,情况就如同,
ret buf
还在队列里,该位没设置,这个buf
就不会被外部拿到尝试从hashmap移除后使用。 -
如果有一个线程打算操作deque, 因为此时该
buf
并未入队, 所以它pop
出来的buf
不会是同一个buf
. 也是安全的。 -
如果有一个线程在操作hashmap,hashmap不依赖这个位,所以安全
综上所有情况都是安全,那么我只要保证内存可见性使用一个memory fence ,确保该位被设置后。别的线程一定可见即可。
2.5.4 最终代码
// Buffer cache.
//
// The buffer cache is a linked list of buf structures holding
// cached copies of disk block contents. Caching disk blocks
// in memory reduces the number of disk reads and also provides
// a synchronization point for disk blocks used by multiple processes.
//
// Interface:
// * To get a buffer for a particular disk block, call bread.
// * After changing buffer data, call bwrite to write it to disk.
// * When done with the buffer, call brelse.
// * Do not use the buffer after calling brelse.
// * Only one process at a time can use a buffer,
// so do not keep them longer than necessary.
#include "types.h"
#include "param.h"
#include "spinlock.h"
#include "sleeplock.h"
#include "riscv.h"
#include "defs.h"
#include "fs.h"
#include "buf.h"
struct {
struct buf buf[NBUF + 1];
// Linked list of all buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
uint32 hashtable[FSSIZE];
uint64 head;
uint64 tail;
} bcache;
inline int CAS64(uint64* ptr, uint64* expected, uint64 desired) {
return __atomic_compare_exchange_n(ptr, expected, desired, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}
inline int CAS32(uint32* ptr, uint32* expected, uint32 desired) {
return __atomic_compare_exchange_n(ptr, expected, desired, 0, __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST);
}
void enqueue(struct buf *b) {
uint32 expect = 0;
if (!CAS32(&b->in_q, &expect, 1)) return;
int idx = b - bcache.buf;
b->qnext = CNTPTR(0LL, NULL);
uint64 tail, next;
while (1) {
tail = bcache.tail;
next = bcache.buf[PTR(tail)].qnext;
if (tail == bcache.tail) {
if (PTR(next) == NULL) {
if (CAS64(&bcache.buf[PTR(tail)].qnext, &next, CNTPTR(CNT(next) + 1, idx)))
break;
} else {
CAS64(&bcache.tail, &tail, CNTPTR(CNT(tail) + 1, PTR(next)));
}
}
}
CAS64(&bcache.tail, &tail, CNTPTR(CNT(tail) + 1, idx));
}
struct buf *dequeue() {
uint64 head, tail, next;
struct buf *ret;
while(1) {
head = bcache.head;
tail = bcache.tail;
next = bcache.buf[PTR(head)].qnext;
if (head == bcache.head)
{
if (PTR(head) == PTR(tail)) {
if (PTR(next) == NULL) return 0;
CAS64(&bcache.tail, &tail, CNTPTR(CNT(tail) + 1, PTR(next)));
} else {
ret = &bcache.buf[PTR(head)];
if(CAS64(&bcache.head, &head, CNTPTR(CNT(head) + 1, PTR(next)))) {
__atomic_store_n(&ret->in_q, 0, __ATOMIC_RELEASE);
break;
}
}
}
}
return ret;
}
void
binit(void)
{
struct buf *b = bcache.buf+NBUF;
// init hashmap
for (int i = 0; i < FSSIZE; i++)
bcache.hashtable[i] = REFCNT_IDX(0, NONE);
// init dummy node for queue
bcache.head = bcache.tail = CNTPTR(0LL, NBUF);
initsleeplock(&b->lock, "buffer");
b->qnext = CNTPTR(0LL, NULL);
b->in_q = 0;
// init data node in queue
for(b = bcache.buf; b < bcache.buf+NBUF; b++){
b->in_q = 0;
enqueue(b);
initsleeplock(&b->lock, "buffer");
}
}
int bhash(uint dev, uint blockno) {
int hash = dev ^ blockno;
hash = hash * 31 + blockno;
if (hash < 0) hash = -hash;
return hash % NBUFBUC;
}
// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{
if (dev != 1) panic("bget");
struct buf *b;
b = 0;
while(1) {
uint32 refcnt_idx = bcache.hashtable[blockno];
// Is the block already cached?
if (IDX(refcnt_idx) != NONE) {
if (CAS32(&bcache.hashtable[blockno], &refcnt_idx, INC_REFCNT(refcnt_idx))){
if(b) enqueue(b);
acquiresleep(&bcache.buf[IDX(refcnt_idx)].lock);
return &bcache.buf[IDX(refcnt_idx)];
}
} else {
// Recycle the least recently used (LRU) unused buffer.
if (!b) {
while (1) {
b = dequeue();
if (!b) panic("bget: no buffers");
// b->blockno is threadsafe since until this func return,
// other thread cannot deque same buf, and reset its blockno
uint32 blockno = b->blockno, expect = REFCNT_IDX(0, IDX(bcache.hashtable[blockno]));
if (CAS32(&bcache.hashtable[blockno], &expect, REFCNT_IDX(0, NONE)))
break;
}
}
// assert(refcnt_idx = REFCNT_IDX(0, NONE));
int idx = b - bcache.buf;
if (CAS32(&bcache.hashtable[blockno], &refcnt_idx, REFCNT_IDX(1, idx))){
acquiresleep(&b->lock);
b->valid = 0;
b->dev = dev;
b->blockno = blockno;
return b;
}
}
}
}
// Return a locked buf with the contents of the indicated block.
struct buf*
bread(uint dev, uint blockno)
{
struct buf *b;
b = bget(dev, blockno);
if(!b->valid) {
virtio_disk_rw(b, 0);
b->valid = 1;
}
return b;
}
// Write b's contents to disk. Must be locked.
void
bwrite(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("bwrite");
virtio_disk_rw(b, 1);
}
// Release a locked buffer.
// Move to the head of the most-recently-used list.
void
brelse(struct buf *b)
{
if (b->dev != 1) panic("brelse");
if(!holdingsleep(&b->lock))
panic("brelse");
releasesleep(&b->lock);
int blockno = b->blockno;
while (1) {
uint32 refcnt_idx = bcache.hashtable[blockno];
// assert(blockno == b->blockno && IDX(refcnt_idx) != NONE && REFCNT(refcnt_idx) > 0);
int new_refcnt = REFCNT(refcnt_idx) - 1, idx = IDX(refcnt_idx);
if (CAS32(&bcache.hashtable[blockno], &refcnt_idx, REFCNT_IDX(new_refcnt, idx))) {
if (new_refcnt == 0) enqueue(b);
return;
}
}
}
void
bpin(struct buf *b) {
int blockno = b->blockno;
uint32 refcnt_idx;
do {
refcnt_idx = bcache.hashtable[blockno];
// assert(blockno == b->blockno && IDX(refcnt_idx) != NONE && REFCNT(refcnt_idx) > 0);
} while(!CAS32(&bcache.hashtable[blockno], &refcnt_idx, INC_REFCNT(refcnt_idx)));
}
void
bunpin(struct buf *b) {
int blockno = b->blockno;
uint32 refcnt_idx;
do {
refcnt_idx = bcache.hashtable[blockno];
// assert(blockno == b->blockno && IDX(refcnt_idx) != NONE && REFCNT(refcnt_idx) > 1);
} while(!CAS32(&bcache.hashtable[blockno], &refcnt_idx, DEC_REFCNT(refcnt_idx)));
}
2.5.5 测试
成功把时间优化回9秒左右:
1704009786968.png
测试100次:
1704025039128.png
网友评论