《C++文章汇总》
上一篇介绍了《019-智能指针》,本文介绍多线程通信。
多线程并发:在同一时间段内交替处理多个操作,线程切换时间片是很短的(一般为毫秒级),一个时间片多数时候来不及处理完对某一资源的访问;
线程间通信:一个任务被分割为多个线程并发处理,多个线程可能都要处理某一共享内存的数据,多个线程对同一共享内存数据的访问需要准确有序。
同步:是指在不同进程之间的若干程序片断,它们的运行必须严格按照规定的某种先后次序来运行,这种先后次序依赖于要完成的特定的任务。如果用对资源的访问来定义的话,同步是指在互斥的基础上(大多数情况),通过其它机制实现访问者对资源的有序访问。在大多数情况下,同步已经实现了互斥,特别是所有写入资源的情况必定是互斥的。少数情况是指可以允许多个访问者同时访问资源。
互斥:是指散布在不同进程之间的若干程序片断,当某个进程运行其中一个程序片段时,其它进程就不能运行它们之中的任一程序片段,只能等到该进程运行完这个程序片段后才可以运行。如果用对资源的访问来定义的话,互斥某一资源同时只允许一个访问者对其进行访问,具有唯一性和排它性。但互斥无法限制访问者对资源的访问顺序,即访问是无序的。
// mutex1.cpp 通过互斥体lock与unlock保护共享全局变量
#include <chrono>
#include <mutex>
#include <thread>
#include <iostream>
std::chrono::milliseconds interval(100);
std::mutex mutex;
int job_shared = 0; //两个线程都能修改'job_shared',mutex将保护此变量
int job_exclusive = 0; //只有一个线程能修改'job_exclusive',不需要保护
//此线程只能修改 'job_shared'
void job_1()
{
mutex.lock();
std::this_thread::sleep_for(5 * interval); //令‘job_1’持锁等待
++job_shared;
std::cout << "job_1 shared (" << job_shared << ")\n";
mutex.unlock();
}
// 此线程能修改'job_shared'和'job_exclusive'
void job_2()
{
while (true) { //无限循环,直到获得锁并修改'job_shared'
if (mutex.try_lock()) { //尝试获得锁成功则修改'job_shared'
++job_shared;
std::cout << "job_2 shared (" << job_shared << ")\n";
mutex.unlock();
return;
} else { //尝试获得锁失败,接着修改'job_exclusive'
++job_exclusive;
std::cout << "job_2 exclusive (" << job_exclusive << ")\n";
std::this_thread::sleep_for(interval);
}
}
}
int main()
{
std::thread thread_1(job_1);
std::thread thread_2(job_2);
thread_1.join();
thread_2.join();
getchar();
return 0;
}
job_2 shared (1)
job_1 shared (2)
从上面的代码看,创建了两个线程和两个全局变量,其中一个全局变量job_exclusive是排他的,两线程并不共享,不会产生数据竞争,所以不需要锁保护。另一个全局变量job_shared是两线程共享的,会引起数据竞争,因此需要锁保护。线程thread_1持有互斥锁lock的时间较长,线程thread_2为免于空闲等待,使用了尝试锁try_lock,如果获得互斥锁则操作共享变量job_shared,未获得互斥锁则操作排他变量job_exclusive,提高多线程效率。
lock_guard与unique_lock保护共享资源
但lock与unlock必须成对合理配合使用,使用不当可能会造成资源被永远锁住,甚至出现死锁(两个线程在释放它们自己的lock之前彼此等待对方的lock)。是不是想起了C++另一对儿需要配合使用的对象new与delete,若使用不当可能会造成内存泄漏等严重问题,为此C++引入了智能指针shared_ptr与unique_ptr。智能指针借用了RAII技术(Resource Acquisition Is Initialization—使用类来封装资源的分配和初始化,在构造函数中完成资源的分配和初始化,在析构函数中完成资源的清理,可以保证正确的初始化和资源释放)对普通指针进行封装,达到智能管理动态内存释放的效果。同样的,C++也针对lock与unlock引入了智能锁lock_guard与unique_lock,同样使用了RAII技术对普通锁进行封装,达到智能管理互斥锁资源释放的效果。lock_guard与unique_lock的区别如下:
图片.png
从上面两个支持的操作函数表对比来看,unique_lock功能丰富灵活得多。如果需要实现更复杂的锁策略可以用unique_lock,如果只需要基本的锁功能,优先使用更严格高效的lock_guard。两种锁的简单概述与策略对比见下表:
图片.png
如果将上面的普通锁lock/unlock替换为智能锁lock_guard,其中job_1函数代码修改如下:
void job_1()
{
std::lock_guard<std::mutex> lockg(mutex); //获取RAII智能锁,离开作用域会自动析构解锁
std::this_thread::sleep_for(5 * interval); //令‘job_1’持锁等待
++job_shared;
std::cout << "job_1 shared (" << job_shared << ")\n";
}
如果也想将job_2的尝试锁try_lock也使用智能锁替代,由于lock_guard锁策略不支持尝试锁,只好使用unique_lock来替代,代码修改如下(其余代码和程序执行结果与上面相同):
void job_2()
{
while (true) { //无限循环,直到获得锁并修改'job_shared'
std::unique_lock<std::mutex> ulock(mutex, std::try_to_lock); //以尝试锁策略创建智能锁
//尝试获得锁成功则修改'job_shared'
if (ulock) {
++job_shared;
std::cout << "job_2 shared (" << job_shared << ")\n";
return;
} else { //尝试获得锁失败,接着修改'job_exclusive'
++job_exclusive;
std::cout << "job_2 exclusive (" << job_exclusive << ")\n";
std::this_thread::sleep_for(interval);
}
}
}
timed_mutex与recursive_mutex提供更强大的锁
互斥量mutex提供了普通锁lock/unlock和智能锁lock_guard/unique_lock,基本能满足我们大多数对共享数据资源的保护需求。但在某些特殊情况下,我们需要更复杂的功能,比如某个线程中函数的嵌套调用可能带来对某共享资源的嵌套锁定需求,mutex在一个线程中却只能锁定一次;再比如我们想获得一个锁,但不想一直阻塞,只想等待特定长度的时间,mutex也没提供可设定时间的锁。针对这些特殊需求,< mutex >库也提供了下面几种功能更丰富的互斥类,它们间的区别见下表:
image
不同互斥类所支持的互斥锁类型总结如下表:
image
继续用前面的例子,将mutex替换为timed_mutex,将job_2的尝试锁tyr_lock()替换为带时间的尝试锁try_lock_for(duration)。由于改变了尝试锁的时间,所以在真正获得锁之前的尝试次数也有变化,该变化体现在尝试锁失败后对排他变量job_exclusive的最终修改结果或修改次数上。更新后的代码如下所示
#include <stdio.h>
#include <chrono>
#include <mutex>
#include <iostream>
#include <thread>
using namespace std;
std::chrono::milliseconds inteval(100);
std::timed_mutex tmutex;
int job_shared = 0;//两个线程都能修改'job_shared',mutex将保护此变量
int job_exclusive = 0; //只有一个线程能修改'job_exclusive',不需要保护
//此线程只能修改 'job_shared'
void job_1(){
std::lock_guard<std::timed_mutex> lockg(tmutex);//获取RAII锁,离开作用域会自动析构解锁
std::this_thread::sleep_for(5*inteval);//令‘job_1’封锁等待
++job_shared;
std::cout << "job_1 shared (" << job_shared << ")\n";
}
//此线程能修改‘job_shared’ 和 ‘job_exclusive’
void job_2(){
while (true) {//无线循环只要能获得锁并修改“job_shared”
std::unique_lock<std::timed_mutex> ulock(tmutex,std::defer_lock);//创建一个智能锁但先不锁定
//尝试获得锁成功则修改“job_shared”
if (ulock.try_lock_for(3 * inteval)) {
++job_shared;
std::cout << "job_2 shared (" << job_shared << ")\n";
return;
}else{//尝试获得锁失败,接着修改“job_exclusive”
++job_exclusive;
std::cout << "job_2 exclusive (" << job_shared << ")\n";
std::this_thread::sleep_for(inteval);
}
}
}
int main(){
std::thread t1(job_1);
std::thread t2(job_2);
t1.join();
t2.join();
getchar();
return 0;
}
job_2 exclusive (0)
job_1 shared (1)
job_2 shared (2)
C++线程间通信有三种方式
(1)通过条件变量进行线程间的通信
(2)通过标志位来通知线程间的通信
(3)通过std::future来进行线程间的通信
1.通过条件变量进行线程间通信
条件变量使用“通知—唤醒”模型,生产者生产出一个数据后通知消费者使用,消费者在未接到通知前处于休眠状态节约CPU资源;当消费者收到通知后,赶紧从休眠状态被唤醒来处理数据,使用了事件驱动模型,在保证不误事儿的情况下尽可能减少无用功降低对资源的消耗。
I.如何使用条件变量
C++标准库在< condition_variable >中提供了条件变量,借由它,一个线程可以唤醒一个或多个其他等待中的线程。原则上,条件变量的运作如下:
你必须同时包含< mutex >和< condition_variable >,并声明一个mutex和一个condition_variable变量;
那个通知“条件已满足”的线程(或多个线程之一)必须调用notify_one()或notify_all(),以便条件满足时唤醒处于等待中的一个条件变量;
那个等待"条件被满足"的线程必须调用wait(),可以让线程在条件未被满足时陷入休眠状态,当接收到通知时被唤醒去处理相应的任务;
A.cond.notify_all()
#include <iostream>
#include <stdio.h>
#include <thread>
#include <deque>
#include <mutex>
#include <vector>
#include <condition_variable>
using namespace std;
std::mutex mtx;
std::condition_variable cv;
std::vector<int> vec;
int productNum = 5;
void Producer(){
for (int i = 1; i <= productNum; ++i) {
std::unique_lock<std::mutex> lock(mtx);
while (!vec.empty()) {
cv.wait(lock);//vec 不为空时阻塞当前线程
}
vec.push_back(i);
std::cout << "Producer生产产品: " << i << std::endl;
cv.notify_all();//释放线程锁
}
}
void Consumer(){
while (true) {
std::unique_lock<std::mutex> lock(mtx);//vec 为空时等待线程锁。其他线程锁释放时,当前线程继续执行
while (vec.empty()) {
cv.wait(lock);
}
int data = vec.back();
vec.pop_back();
std::cout << "Consumer消费产品: " << data << std::endl;
cv.notify_all();
}
}
int main(int argc, const char * argv[]) {
std::thread t1(Producer);
std::thread t2(Consumer);
t2.join();
t1.join();
std::cin.get();
return 0;
}
Producer生产产品: 1
Consumer消费产品: 1
Producer生产产品: 2
Consumer消费产品: 2
Producer生产产品: 3
Consumer消费产品: 3
Producer生产产品: 4
Consumer消费产品: 4
Producer生产产品: 5
Consumer消费产品: 5
B.cond.notify_one()
//cond_var2.cpp用条件变量解决轮询间隔难题
#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>
std::deque<int> q; //双端队列标准容器全局变量
std::mutex mu; //互斥锁全局变量
std::condition_variable cond; //全局条件变量
//生产者,往队列放入数据
void function_1() {
int count = 10;
while (count > 0) {
std::unique_lock<std::mutex> locker(mu);
q.push_front(count); //数据入队锁保护
locker.unlock();
cond.notify_one(); // 向一个等待线程发出“条件已满足”的通知
std::this_thread::sleep_for(std::chrono::seconds(1)); //延时1秒
count--;
}
}
//消费者,从队列提取数据
void function_2() {
int data = 0;
while ( data != 1) {
std::unique_lock<std::mutex> locker(mu);
while(q.empty()) //判断队列是否为空
cond.wait(locker); // 解锁互斥量并陷入休眠以等待通知被唤醒,被唤醒后加锁以保护共享数据
data = q.back();
q.pop_back(); //数据出队锁保护
locker.unlock();
std::cout << "t2 got a value from t1: " << data << std::endl;
}
}
int main() {
std::thread t1(function_1);
std::thread t2(function_2);
t1.join();
t2.join();
getchar();
return 0;
}
- 在function_2中,在判断队列是否为空的时候,使用的是while(q.empty()),而不是if(q.empty()),这是因为wait()从阻塞到返回,不一定就是由于notify_one()函数造成的,还有可能由于系统的不确定原因唤醒(可能和条件变量的实现机制有关),这个的时机和频率都是不确定的,被称作伪唤醒。如果在错误的时候被唤醒了,执行后面的语句就会错误,所以需要再次判断队列是否为空,如果还是为空,就继续wait()阻塞;
- 在管理互斥锁的时候,使用的是std::unique_lock而不是std::lock_guard,而且事实上也不能使用std::lock_guard。这需要先解释下wait()函数所做的事情,可以看到,在wait()函数之前,使用互斥锁保护了,如果wait的时候什么都没做,岂不是一直持有互斥锁?那生产者也会一直卡住,不能够将数据放入队列中了。所以,wait()函数会先调用互斥锁的unlock()函数,然后再将自己睡眠,在被唤醒后,又会继续持有锁,保护后面的队列操作。lock_guard没有lock和unlock接口,而unique_lock提供了,这就是必须使用unique_lock的原因;
- 使用细粒度锁,尽量减小锁的范围,在notify_one()的时候,不需要处于互斥锁的保护范围内,所以在唤醒条件变量之前可以将锁unlock()。
还可以将cond.wait(locker)换一种写法,wait()的第二个参数可以传入一个函数表示检查条件,这里使用lambda函数最为简单,如果这个函数返回的是true,wait()函数不会阻塞会直接返回,如果这个函数返回的是false,wait()函数就会阻塞着等待唤醒,如果被伪唤醒,会继续判断函数返回值。代码示例如下:
//消费者,从队列提取数据
void function_2() {
int data = 0;
while ( data != 1) {
std::unique_lock<std::mutex> locker(mu);
cond.wait(locker, [](){ return !q.empty();}); //如果条件变量被唤醒,检查队列非空条件是否为真,为真则直接返回,为假则继续等待
data = q.back();
q.pop_back(); //数据出队锁保护
locker.unlock();
std::cout << "t2 got a value from t1: " << data << std::endl;
}
}
下面给出条件变量支持的操作函数表:
image
值得注意的是:
所有通知(notification)都会被自动同步化,所以并发调用notify_one()和notify_all()不会带来麻烦;
所有等待某个条件变量(condition variable)的线程都必须使用相同的mutex,当wait()家族的某个成员被调用时该mutex必须被unique_lock锁定,否则会发生不明确的行为;
wait()函数会执行“解锁互斥量–>陷入休眠等待–>被通知唤醒–>再次锁定互斥量–>检查条件判断式是否为真”几个步骤,这意味着传给wait函数的判断式总是在锁定情况下被调用的,可以安全的处理受互斥量保护的对象;但在"解锁互斥量–>陷入休眠等待"过程之间产生的通知(notification)会被遗失。
线程同步保证了多个线程对共享数据的有序访问,目前我们了解到的多线程间传递数据主要是通过共享数据(全局变量)实现的,全局共享变量的使用容易增加不同任务或线程间的耦合度,也增加了引入bug的风险,所以全局共享变量应尽可能少用。很多时候我们只需要传递某个线程或任务的执行结果,以便参与后续的运算,但我们又不想阻塞等待该线程或任务执行完毕,而是继续执行暂时不需要该线程或任务执行结果参与的运算,当需要该线程执行结果时直接获得,才能更充分发挥多线程并发的效率优势。
2.使用全局变量与条件变量传递结果进行线程间通信
同步:就是在发出一个调用时,在没有得到结果之前,该调用就不返回。但是一旦调用返回,就得到返回值了。换句话说,就是由调用者主动等待这个调用的结果。
异步:调用在发出之后,这个调用就直接返回了,所以没有返回结果。换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果。而是在调用发出后,被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用。
image
如何使用异步编程
在线程库< thread >中并没有获得线程执行结果的方法,通常情况下,线程调用者需要获得线程的执行结果或执行状态,以便后续任务的执行。那么,通过什么方式获得被调用者的执行结果或状态呢?
#include <stdio.h>
#include <vector>
#include <numeric>
#include <iostream>
#include <chrono>
#include <thread>
#include <mutex>
#include <condition_variable>
int res = 0; //保存结果的全局变量
std::mutex mu; //互斥锁全局变量
std::condition_variable cond;//全局条件变量
void accumulate(std::vector<int>::iterator first,std::vector<int>::iterator last){
int sum = std::accumulate(first, last, 0);//标准库求和函数
std::cout << sum << std::endl;
std::unique_lock<std::mutex> locker(mu);
std::cout << "执行了" << res << "次" << std::endl;
res = sum;
locker.unlock();
cond.notify_one();//向一个等待线程发出条件已满足通知
}
int main(){
std::vector<int> numbers = {1,2,3,4,5,6};
std::thread work_thread(accumulate,numbers.begin(),numbers.end());
std::unique_lock<std::mutex> locker(mu);
cond.wait(locker,[](){return res;});//如果条件变量被唤醒,检查结果是否被改变,为真则直接返回,为假则继续等待
std::cout << "wait被卡住" << std::endl;
std::cout << "result=" << res << "\n";
locker.unlock();
work_thread.join();
getchar();
return 0;
}
21
执行了0次
wait被卡住
result=21
从上面的代码可以看出,虽然也实现了获取异步任务执行结果的功能,但需要的全局变量较多,多线程间的耦合度也较高,编写复杂程序时容易引入bug。有没有更好的方式实现异步编程呢?C++ 11新增了一个< future >库函数为异步编程提供了很大的便利。
3.通过std::furture来进行线程间的通信
< future >头文件功能允许对特定提供者设置的值进行异步访问,可能在不同的线程中。
这些提供程序(要么是promise 对象,要么是packaged_task对象,或者是对异步的调用async)与future对象共享共享状态:提供者使共享状态就绪的点与future对象访问共享状态的点同步。< future >头文件的结构如下:
图片.png
注意前面提到的共享状态,多线程间传递的返回值或抛出的异常都是在共享状态中交流的。我们知道多线程间并发访问共享数据是需要保持同步的,这里的共享状态是保证返回值或异常在线程间正确传递的关键,被调用线程可以通过改变共享状态通知调用线程返回值或异常已写入完毕,可以访问或操作了。future的状态(future_status)有以下三种:
- deferred:异步操作还没开始;
- ready:异步操作已经完成;
timeout:异步操作超时。
既然线程间传递返回值或异常是通过共享状态进行的,就涉及到共享状态的提供方与获取方,只有该任务或线程拥有包含共享状态的对象,其他任务或线程才能够通过共享状态的通知机制同步获取到该人物或线程的返回值或异常。我们通常使用的< thread >创建线程并不拥有共享状态,我们需要为该线程提供一个共享状态,以便后续对其返回值或异常的访问。那么,怎么为一个线程提供一个包含共享状态的对象呢?这就需要借助std::promise< T >类模板实现了,其具体用法如下:
image
std::promise< T >构造时,产生一个未就绪的共享状态(包含存储的T值和是否就绪的状态)。可设置T值,并让状态变为ready。也可以通过产生一个future对象获取到已就绪的共享状态中的T值。继续使用上面的程序示例,改为使用promise传递结果,修改后的代码如下:
#include <stdio.h>
#include <vector>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
void accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last,
std::promise<int> accumulate_promise){
int sum = std::accumulate(first, last, 0);
accumulate_promise.set_value(sum);//将结果存入,并让共享状态变为就绪以提醒future
}
int main(){
//演示用promise<int>在线程间传递结果
std::vector<int> numbers = {1,2,3,4,5,6};
std::promise<int> accumulate_promise;
std::future<int> accumulate_future = accumulate_promise.get_future();
std::thread work_thread(accumulate,numbers.begin(),numbers.end(),std::move(accumulate_promise));
accumulate_future.wait();//等待结果
std::cout << "result=" << accumulate_future.get() << std::endl;
work_thread.join();
getchar();
return 0;
}
result=21
std::promise< T >对象的成员函数get_future()产生一个std::future< T >对象,代码示例中已经展示了future对象的两个方法:wait()与get(),下面给出更多操作函数供参考:
图片.png
值得注意的是,std::future< T >在多个线程等待时,只有一个线程能获取等待结果。当需要多个线程等待相同的事件的结果(即多处访问同一个共享状态),需要用std::shared_future< T >来替代std::future < T >,std::future< T >也提供了一个将future转换为shared_future的方法f.share(),但转换后原future状态失效。这有点类似于智能指针std::unique_ptr< T >与std::shared_ptr< T >的关系,使用时需要留心。
3.1使用packaged_task与future传递结果
除了为一个任务或线程提供一个包含共享状态的变量,还可以直接把共享状态包装进一个任务或线程中。这就需要借助std::packaged_task< Func >来实现了,其具体用法如下:
image
std::packaged_task< Func >构造时绑定一个函数对象,也产生一个未就绪的共享状态。通过thread启动或者仿函数形式启动该函数对象。但是相比promise,没有提供set_value()公用接口,而是当执行完绑定的函数对象,其执行结果返回值或所抛异常被存储于能通过 std::future 对象访问的共享状态中。继续使用上面的程序示例,改为使用packaged_task传递结果,修改后的代码如下:
#include <stdio.h>
#include <vector>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
int accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last){
int sum = std::accumulate(first, last, 0);
return sum;
}
int main(){
std::vector<int> numbers = {1,2,3,4,5,6};
std::packaged_task<int(std::vector<int>::iterator,std::vector<int>::iterator)> accumulate_task(accumulate);
std::future<int> accumulate_future = accumulate_task.get_future();
std::thread work_thread(std::move(accumulate_task),numbers.begin(),numbers.end());
accumulate_future.wait();//等待结果
std::cout << "result=" << accumulate_future.get() << '\n';
work_thread.join();//阻塞等待线程执行完成
getchar();
return 0;
}
result=21
一般不同函数间传递数据时,主要是借助全局变量、返回值、函数参数等来实现的。上面第一种方法使用全局变量传递数据,会使得不同函数间的耦合度较高,不利于模块化编程。后面两种方法分别通过函数参数与返回值来传递数据,可以降低函数间的耦合度,使编程和维护更简单快捷。
3.2使用async传递结果
前面介绍的std::promise< T >与std::packaged_task< Func >已经提供了较丰富的异步编程工具,但在使用时既需要创建提供共享状态的对象(promise与packaged_task),又需要创建访问共享状态的对象(future与shared_future),还是觉得使用起来不够方便。有没有更简单的异步编程工具呢?future头文件也确实封装了更高级别的函数std::async,其具体用法如下:
std::future std::async(std::launch policy, Func, Args…)
std::async是一个函数而非类模板,其函数执行完后的返回值绑定给使用std::async的std::futrue对象(std::async其实是封装了thread,packged_task的功能,使异步执行一个任务更为方便)。Func是要调用的可调用对象(function, member function, function object, lambda),Args是传递给Func的参数,std::launch policy是启动策略,它控制std::async的异步行为,我们可以用三种不同的启动策略来创建std::async:
std::launch::async参数 保证异步行为,即传递函数将在单独的线程中执行;
std::launch::deferred参数 当其他线程调用get()/wait()来访问共享状态时,将调用非异步行为;
std::launch::async | std::launch::deferred参数 是默认行为(可省略)。有了这个启动策略,它可以异步运行或不运行,这取决于系统的负载。
继续使用上面的程序示例,改为使用std::async传递结果,修改后的代码如下:
#include <stdio.h>
#include <vector>
#include <future>
#include <numeric>
#include <iostream>
#include <chrono>
int accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last){
int sum = std::accumulate(first,last,0);
return sum;
}
int main(){
std::vector<int> numbers = {1,2,3,4,5,6};
auto accumulate_future = std::async(std::launch::async, accumulate, numbers.begin(),numbers.end());
std::cout << "result=" << accumulate_future.get() << "\n";
getchar();
return 0;
}
result=21
从上面的代码可以看出使用std::async能在很大程度上简少编程工作量,使我们不用关注线程创建内部细节,就能方便的获取异步执行状态和结果,还可以指定线程创建策略。所以,我们可以使用std::async替代线程的创建,让它成为我们做异步操作的首选。
网友评论