leveldb提供的接口大部分都是线程安全的,包括write。当多个线程并发的调用write时,leveldb会保证这些写操作是串行的写入到log和memtable中。下面来看下leveldb是怎么做的这个串行化。
其实这个问题本质上是在做线程同步,这种情况一般都需要一个锁。最简单的实现:用一个全局mutex,每个线程获取到这个mutex之后就开始进行写入操作,完成写入后释放锁。不过这种实现性能可能不太好,主要是因为每次拿住锁之后,只能执行一个写操作。一个优化的思路是:把每个写操作做成一个任务,然后将这个任务塞入到一个队列里面,同时启动一个消费线程来从队列中读取任务,读取的时候可以连续读取多个写任务来执行。效果就像是合并了多个写操作一样,能够减少实际IO的次数。
使用这种合并的机制需要考虑几个问题:
- 是否需要额外启动消费线程,如果是,启动多少个线程
- 单个写操作完成后,如何让调用线程知道此次写已经完成
这两个问题都不难解决,不过leveldb的实现比较巧妙,它没有启动额外的线程,而是使用调用线程来执行写入任务。另外,使用了条件变量来通知调用线程写入已经完成。
Write函数的入口代码:
Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.done = false;
MutexLock l(&mutex_);
writers_.push_back(&w);
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
if (w.done) {
return w.status;
}
这里的Writer结构代表一个写入任务,writers_是一个队列。首先获取互斥锁mutex,将写入任务添加到队列尾部,然后等待以下两个事件:
- w.done为true,也就是写入任务已经完成
- 写入任务到了队列头部
如果任务已经完成,则直接返回。如果任务到了队列头部,那么当前线程就开始从队列中读取任务来执行。写入完成后,将此次操作的任务依次出队,并通知对应的线程,代码如下:
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
最后,如果队列不为空,则唤醒队列头部任务对应的线程。这个唤醒操作很重要,如果没有这一步,会出现没有线程执行来执行队列里面的任务。
在从队列中取出足够多的任务后,线程会释放队列上的互斥锁,然后再开始进行写操作。执行写入操作时是不需要拿住这个锁的,因为此时不会有其他线程也开始执行写任务。释放锁后,其他线程就可以继续向队列尾部添加任务了。
总体来看,leveldb的这段实现并不复杂,但是比较巧妙的避免了另起线程。我自己在工作项目中也用go实现过类似的io操作合并,主要使用chan,WaitGroup来实现,不过都使用了额外的goroutinue来做消费者。
网友评论