1 c++中的锁和条件变量实现状态同步
#include <queue>
#include <string>
#include <mutex>
#include <thread>
#include <chrono>
#include <iostream>
using std::vector;
using std::string;
using std::queue;
using std::mutex;
using std::thread;
class Message {
public:
void Put(const string& msg) {
std::lock_guard<std::mutex> guard(msgs_lock_);
msgs_.push(msg);
}
bool Get(string& out_msg) {
std::lock_guard<std::mutex> guard(msgs_lock_);
if (!msgs_.empty()) {
out_msg = msgs_.front();
msgs_.pop();
return true;
}
return false;
}
private:
queue<string> msgs_;
mutex msgs_lock_;
};
Message g_message_mgr;
void produce() {
for(;;) {
string msg = "some rand msg";
g_message_mgr.Put(msg);
std::this_thread::sleep_for(std::chrono::seconds(2));
}
}
void cosumer() {
for(;;) {
string msg;
if (g_message_mgr.Get(msg)) {
// do sth with msg
std::cout << msg << std::endl;
}
}
}
int main() {
thread t1(produce);
thread t2(cosumer);
t1.join();
t2.join();
return 0;
}
- 以上程序是一个生产者和一个消费者典型例子, 运行之后发现程序cpu高达99%.
- 改善Message类
class Message {
public:
void Put(const string& msg) {
std::unique_lock<std::mutex> guard(msgs_lock_);
bool need_notify = false;
if (msgs_.empty()) {
need_notify = true;
}
msgs_.push(msg);
if (need_notify) {
cv_.notify_all();
}
}
bool Get(string& out_msg) {
std::unique_lock<std::mutex> guard(msgs_lock_);
if (!msgs_.empty()) {
out_msg = msgs_.front();
msgs_.pop();
return true;
} else {
cv_.wait(guard, [this]{return msgs_.empty() == false;});
}
return false;
}
private:
queue<string> msgs_;
mutex msgs_lock_;
condition_variable cv_;
};
-
改善之后cpu使用率降低为0%
-
由上可以看出:
- mutex只是起到了互斥作用, 并不知道资源实际使用情况, 造成了在没消息的时候去死循环检查, 浪费cpu
- condition_variable条件变量可以根据条件来选择挂起和触发, 没消息的时候选择休眠, 有消息的时候被唤醒或者不会被挂起, 避免消费者自己死循环的去查找而浪费cpu.
在实际编程中, 生产者可能想关闭队列并且希望消费者都能捕获这个消息. 那么就要加一个close方法来做处理. close之后需要注意几点:
- close之后生产者不能再放消息进去
- close之后消费者需要消费掉所有的消息再捕获到close的事件
- 另外, Get获取不到需要阻塞, 如果没消息了就返回false.
改善后的代码如下:
class Message {
public:
enum {
GET_NORMAL,
GET_NONE,
GET_OVER,
};
bool Put(const string& msg) {
std::unique_lock<std::mutex> guard(msgs_lock_);
if (is_over_) {
return false;
}
bool need_notify = false;
if (msgs_.empty()) {
need_notify = true;
}
msgs_.push(msg);
if (need_notify) {
cv_.notify_all();
}
return true;
}
bool Get(string& out_msg) {
for(;;) {
int ret = get(out_msg);
if (ret == GET_NORMAL) {
return true;
} else if (ret == GET_NONE) {
continue;
} else if (ret == GET_OVER) {
return false;
} else {
assert(false);
}
}
}
int get(string& out_msg) {
std::unique_lock<std::mutex> guard(msgs_lock_);
if (!msgs_.empty()) {
out_msg = msgs_.front();
msgs_.pop();
return GET_NORMAL;
} else if (is_over_) {
return GET_OVER;
} else {
cv_.wait(guard, [this]{return msgs_.empty() == false || is_over_;}); // 这里is_over 注意要捕获, 不然可能会阻塞在这里.
return GET_NONE;
}
}
void Close() {
std::unique_lock<std::mutex> guard(msgs_lock_);
is_over_ = true;
cv_.notify_all();
}
private:
queue<string> msgs_;
mutex msgs_lock_;
condition_variable cv_;
bool is_over_ = false;
};
2 信号量的同步
从第一节看出是由锁和条件变量来实现状态同步的, 而信号量也可以实现同样的功能, 参考伪代码:
produce:
P(emptyCount)
P(useQueue)
putItemIntoQueue(item)
V(useQueue)
V(fullCount)
consume:
P(fullCount)
P(useQueue)
item ← getItemFromQueue()
V(useQueue)
V(emptyCount)
-
emptyCount 初始资源大小为 N, fullCount 初始化大小是 0, 并且useQueue is 初始化大小是1.
-
P代表减1, V代表加1
-
通过P(emptyCount)来检查共享资源, 当资源不够时候线程被挂起, 直到V(emptyCount)释放了资源. fullCount同理.
以上P和V都是原子操作, 提供了检查资源, 挂起, 事件注册与通知等功能. 简而言之是一种共享锁(相对互斥锁来说). 当资源总共为1的时候, 同互斥锁.
信号量也可以实现类似wait功能, 但是参考golang里面的waitgroup, 还是有一些不足, 例如在通知的时候信号量如何做到广播通知所有wait的线程, 相对来说信号量更偏向对资源的同步控制.
3 锁与信号量
锁和信号量都能实现类似的功能, 在一些复杂的场景, 例如多种相互依赖的状态需要同步的时候通过锁和条件变量更方便实现, 但是要注意锁的串行特性, 不要锁住耗时多的代码.
网友评论