chapter5 C++11 多线程
C++11 中 C++ 对 并发编程 提供了 语言级支持
[1] 增加了
线程
线程相关的类
[2] 提高了
多线程程序 的 可移植性
5.1 线程
1 线程创建
std::thread t( threadFunc / threadCallableObj,
threadFunc / threadCallableObj 的 参数)
void func(int a, int b) {}
std::thread t1(std::bind(func, 1, 2) );
std::thread t1([] (int a, int b) { }, 1, 2 );
2 线程创建 的 问题
应该保证 线程对象 lifetime 长于 线程函数 lifetime
否则, std::thread 对象 出 scope 后 会析构, 可能引入 lifetime 问题
如何保证 ?
[1] join
[2] detach
[3] 将 线程对象 保存到 容器中, 保证其 lifetime
std::vector<std::thread> gVec;
std::vector<std::shared_ptr<std::thread> > gVec2;
std::thread t(func);
gVec.push_back(std::move(t) );
gVec2.push_back(std::make_shared<std::thread>(func) );
3 join
[1] target thread(函数) 将 运行于 线程对象 中
[2] block calling(current) thread,
直到 target(newly created) thread (func) 执行完
[3] 返回值 将被 忽略,
若 线程函数有返回值
4 detach
[1] 分离 target thread 与 线程对象
1] 让线程 在 后台运行
2] detach 之后, 无法再与 target thread 联系 => 不能再 join 等
[2] calling/current thread 不会 block
5 线程 可 move, 不可 copy
std::thread t(func);
std::thread t2(std::move(t) );
t.join();
t2.join();
6 获取 线程ID
threadObj.get_id()
std::this_thread::get_id()
7 获取 CPU 提示支持的 并发数
std::thread::hardware_concurrency()
8 线程休眠
std::this_thread::sleep_for( std::chrono::seconds(3) );
例
// 2-3 threadContainer.cpp
#include <iostream>
#include <vector>
#include <thread>
void func()
{
std::cout << "hello " << std::this_thread::get_id() << " end" << std::endl;
}
std::vector<std::thread> gVec;
std::vector<std::shared_ptr<std::thread> > gVec2;
void createThread()
{
std::thread t(func);
gVec.push_back(std::move(t) );
gVec2.push_back(std::make_shared<std::thread>(func) );
}
int main()
{
createThread();
for(auto& thread: gVec)
thread.join();
for(auto& thread: gVec2)
thread->join();
}
5.2 mutex
1种 同步 原语
用于 protect 多线程 同时访问 的 shared data
1
————————————————————————————————————————
独占 std::mutex
递归 std::recursive_mutex
std::timed_mutex
带超时
std::recursive_timed_mutex
————————————————————————————————————————
2 memFunc
lock()
unlock()
try_lock(): non-block, 成功返回 true
3 应用
[1] lock() 与 unlock() 要成对 -> 异常下, 难保证
mutex 多做 global var, 与 protected shared data 放一起
[2] std::lock_guard
RAII
4 递归 mutex
[1] 允许 同一线程 多次获得 该 mutex
可用于 解决 同一线程 需多次获取 mutex 时的
死锁问题
// eg
shared data + mutex
放 class
1] memFunc1 / memFunc2 / memFunc3 均想 access shared data => 均要 lock mutex
2] memFunc3 调 memFunc1 和 memFunc2
用 std::mutex => 死锁
用 std::recursive_mutex => 死锁解决
[2] 尽量不要用
1] 用 的场合, 问题往往可以简化
2] 效率低: 比 非递归 mutex
3] 可重复获得的 最大次数 未定义 => 超过 可能出错
5.3 cv
用于 wait 的 同步机制
可 block >=1 个 waiting thread, 直到 notifying thread 发出 notify, 才能唤醒 waiting thread
cv 要配合 mutex
2 种 cv
condition_variable + std::mutex
condition_variable_any + 任意带 lock/unlock 语义的 mutex
灵活 但 效率低
同步队列
(1) 思路
生产者 put()
1) notFull() 时
才能 put [2]
put 后, 必然 notEmpty() -> 应该 notify[3] notEmptyCv 上 消费者 (去 take)
|
2) else |
|
notFullCv 上 wait [1] |
|\ |
| 由谁 notify ? 答: 消费者 |
| |
|_ _ _ _ _ _ |
| |
消费者 take() | |
| |
1) notEmpty() 时 | |
| 应该 notify[3] notFullCv |
才能 take [2] | 上 生产者 (去 put) |
| |
take 后, 必然 notFull() |
| |
| |
front() |
+ pop_front() |
|
2) else |
|
notEmptyCv 上 wait [1] <- - - - - - - - - - -
(2) 成员 变量
notFullCv
notEmptyCv
mutex
que: std::list
queMaxSize
(3) std::lock_guard 与 std::unique_lock 比较
1) lock_guard : 只能在 lifetime 结束 才(必须 + 最终) unlock mutex
unique_lock: 可 自由 unlock mutex
2) lock_guard + cv.wait() 的 微妙之处
语义上看似矛盾:
1] lock_guard 出 scope 要 unlock mutex
2] cv.wait() 在 wait 时 会 (提前) unlock mutex
实际没问题
wait 时 unlock mutex + waiting
waiting thread 被 notify_one/notify_all 唤醒后, 又先 relock mutex
<=> lock_guard 的 mutex 在 释放(release) 之后 又 获得(acquire) 了
3) 对 cv.wait() 中的 locker,
用 unique_lock 从语义上看更准确
(4) cv.wait() 2 种重载版本
// 版本 1: 更简洁
std::unique_lock<std::mutex> lk(mut);
conditionSatisfiedCv <=>
wokeUpSemanticCv.wait(lk,
[this] {return wokeUpSemanticFunc(); } ); // bool 函数
// 版本 2
std::unique_lock<std::mutex> lk(mut);
while(! wokeUpSemanticFunc() )
wokeUpSemanticCv.wait(lk);
——————————————————————————————————————————————
语义 | wokeUpSemanticCv
——————————————————————————————————————————————
wait 中 | wokeUpSemanticFunc()
——————————————————————————————————————————————
wait 外 while 中 | ! wokeUpSemanticFunc()
——————————————————————————————————————————————
// eg
std::unique_lock<std::mutex> lk(mut);
notFullCv.wait(lk, [this] {return notFull(); } );
std::unique_lock<std::mutex> lk(mut);
while (! notFull() )
notFullCv.wait(lk);
// SynQue.cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <list>
template<typename T>
class SynQue
{
private:
std::condition_variable notFullCv;
std::condition_variable notEmptyCv;
std::mutex mut;
std::list<T> que;
int queMaxSize;
public:
SynQue(int maxSize): queMaxSize(maxSize) {}
~SynQue() {}
// (1) const ref: not modify the pointed obj
void put(const T& x)
{
std::unique_lock<std::mutex> lk(mut);
notFullCv.wait(lk,
[this] {return notFull(); });
que.push_back(x);
notEmptyCv.notify_one();
}
// (2) ref: modify modify the pointed obj
void take(T& x)
{
std::unique_lock<std::mutex> lk(mut);
notEmptyCv.wait(lk,
[this] {return notEmpty(); });
x = que.front();
que.pop_front();
notFullCv.notify_one();
}
// (3) func of condition sastified (woke up) on cv.wait need not use locker: locker already exist before cv.wait
bool notFull()
{
bool isFull = (que.size() == queMaxSize);
if (isFull)
std::cout << "buf full, waiting ...\n";
return !isFull;
}
bool notEmpty()
{
bool isEmpty = que.empty();
if(isEmpty)
std::cout << "buf empty, waiting ...\n";
return !isEmpty;
// return ! que.empty();
}
};
SynQue<int> synQue(100);
void f1()
{
std::cout << "thread1 start ...\n";
synQue.put(1);
std::cout << "thread1 end ...\n";
}
void f2()
{
int result = 0;
std::cout << "thread2 start ...\n";
synQue.take(result);
std::cout << "thread2 result: " << result << std::endl;
std::cout << "thread2 end ...\n";
}
int main()
{
std::thread t1(f1);
std::thread t2(f2);
t1.join();
t2.join();
}
Note: 有如下替换
unique_lock 换为 lock_guard
condition_variable 换为 condition_variable_any
cv.wait 版本1 换位 版本 2
5.4 原子变量
std::atomic<T>
用 原子变量 就不需要用 mutex 来 protect 该变量 了
// (1) 用 mutex 实现的 计时器
struct Counter
{
int value;
std::mutex mut;
void increment()
{
std::lock_guard<std::mutex> lk(mut);
++value;
}
void decrement()
{
std::lock_guard<std::mutex> lk(mut);
--value;
}
int get()
{
return value;
}
};
// (2) 用 原子变量 实现的 计时器
#include <atomic>
struct Counter
{
std::atomic<int> value;
void increment()
{
++value;
}
void decrement()
{
--value;
}
int get()
{
return value.load(); // read
}
};
5.5 call_once/once_flag
call_once
保证 多线程 下, 函数 只被调用 1 次
如, 初始化函数: 某对象 只能初始化 1 次
once_flag 作 第1参数
// callOnce.cpp
#include <iostream>
#include <thread>
#include <mutex>
std::once_flag onceFlag;
void doOnce()
{
std::call_once(onceFlag,
[]() { std::cout << "called once\n"; } );
}
int main()
{
std::thread t1(doOnce);
std::thread t2(doOnce);
t1.join();
t2.join();
}
5.6 异步操作 类
——————————————————————————————————————————————————————————————————————————————————
std::future 异步操作结果 的 传输通道
便于 get 线程函数 return value
——————————————————————————————————————————————————————————————————————————————————
std::promise 包装/保存 ( shared status's ) value
|
|
绑定 data 与 future: promise.set_value_...(dataValue)
|
|
便于 线程赋值 / 获取 线程函数中 某个 (在 promise 上 set 的) 值
——————————————————————————————————————————————————————————————————————————————————
std::package_task 包装/保存 callableObj: func / funcObject / lambda / bindExpr
绑定 callableObj 与 future
便于 异步调用 / 获取 taskFunc(callableObj) return value
——————————————————————————————————————————————————————————————————————————————————
1 std::future
引入: 线程函数 return value
threadObj.join() 无法 直接 get
(1) 间接获得 (繁琐)
[1] 定义 (global) variable
[2] 线程函数 中 给该 variable 赋值 -> variable 作 return value
[3] join
[4] get 线程函数 return value = variableValue
(2) C++ 语言级支持: thread 库 提供
future 来 `访问 异步(操作的)结果`
|
|
不能马上获取, 只能在 未来 某时 从 某地获取
=> 是 未来的期待值 => 称 future
[1] 查 std::future 的 状态 std::future_status 可知
异步任务 的 执行情况
std::future_status::
1] deferred 还没开始
2] ready 已完成
3] timeout 超时
[2] 异步操作的等待 & 异步结果 的 获取
future 的 memFunc
——————————————————————————————————————————————————————
同步方式
等待 异步操作 完成 1] wait
2] wait_for: 超时等待
并 返回/获取 异步结果 3] get
——————————————————————————————————————————————————————
2 std::promise
(1) 线程函数(callableObj 总称) `引用接收` 外传来的 promise,
对 该 promise set_value_ - - - - - - - - - -
|
(2) 线程函数 完成后, 在 外部 |
|
1] 从 promise 获取 promise 中 future |
|/
2] 从 future 获取 线程函数中 某个 (在 promise 上 set 的) 值
3 std::package_task
(1) 线程函数 `引用接收` 外传来的 packaged_task
packaged_task 保存 taskFunc
=> 线程 真正运行的函数 是保存于 packaged_task 中的 taskFunc
|
(2) 线程函数 完成后, 在 外部 |
|
1] 从 promise 获取 promise 中 future |
|
2] 从 future 获取 taskFunc return value <- - - - - - - -
4 std::future / std::promise / std::package_task 3者关系
(1) std::future 与 另 两者 关系
`异步操作结果` 的 传输通道
|
| 是
|
1] 线程中某个值 - std::promise
2] taskFunc 的 return value - std::package_task
(2) std::promise 与 std::package_task
可将 std::package_task 所 wrap 的 异步操作 的 结果 保存到 std::promise
5 future 可 move 不可 copy
想将 future 放到 容器, 用 shared_future
例
// 1 future
std::future_status status;
do
{
status = future.wait_for(std::chrono::seconds(1) );
if(status == std::future_status::deferred)
;
if(status == std::future_status::ready)
;
if(status == std::future_status::timeout)
;
} while( status != std::future_status::ready);
// 2 promise.cpp
#include <iostream>
#include <thread>
#include <future>
void threadFunc(std::promise<int>& promise)
{
promise.set_value_at_thread_exit(1);
}
int main()
{
std::promise<int> pr; // (1)
// std::thread t(threadFunc, std::ref(pr)); // also right
std::thread t([](std::promise<int>& promise)
{
promise.set_value_at_thread_exit(1); // (3)
},
std::ref(pr)); // (2) Note: pass by ref
std::future<int> fut = pr.get_future(); // (4)
auto result = fut.get(); // (5)
std::cout << "result: " << result << std::endl;
t.join(); // (6) join() thread is necessary
}
// 3 packagedTask.cpp
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
int taskFunc()
{
std::cout << "task thread start...\n";
std::this_thread::sleep_for(std::chrono::seconds(1) );
return 5; // (3)
}
int main()
{
std::packaged_task<int()> packagedTask(taskFunc); // (1)
/*
std::packaged_task<int()> packagedTask( []()
{
std::cout << "task thread start...\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
return 5;
} );
*/
std::cout << "task saved \n";
std::thread t(std::ref(packagedTask) ); // (2) Note: pass by ref
std::future<int> fut = packagedTask.get_future(); // (4) get_future()
auto result = fut.get(); // (5) get()
std::cout << "result: " << result << std::endl;
t.join(); // (6) join() thread is necessary
}
// 4 promisePackagedTask.cpp
#include <iostream>
#include <thread>
#include <future>
#include <chrono>
int packagedTaskFunc()
{
std::cout << "task thread start ...\n";
std::this_thread::sleep_for(std::chrono::seconds(1) );
return 5;
}
void promiseThreadFunc(std::promise<int>& promise, std::packaged_task<int()>& packagedTask)
{
std::cout << "promise thread start ...\n";
std::future<int> fut = packagedTask.get_future();
auto result = fut.get();
std::cout << "packagedTask result: " << result << std::endl;
promise.set_value_at_thread_exit(result);
}
int main()
{
std::packaged_task<int()> packagedTask(packagedTaskFunc);
std::promise<int> pr;
std::thread t1(std::ref(packagedTask));
std::thread t2(promiseThreadFunc, std::ref(pr), std::ref(packagedTask) );
std::future<int> fut = pr.get_future();
auto result = fut.get();
std::cout << "promise result: " << result << std::endl;
t1.join();
t2.join();
}
// 5 shared_future.cpp
#include <iostream>
#include <vector>
#include <thread>
#include <future>
#include <chrono>
// #include <utility>
int func(int x)
{
return x + 2;
}
int main()
{
// (1)
std::packaged_task<int(int)> packagedTask(func);
std::future<int> fut = packagedTask.get_future();
std::thread(std::move(packagedTask), 2 ).detach();
int value = fut.get();
std::cout << "return value: " << value << std::endl;
// (2)
std::vector<std::shared_future<int> > vec;
// Note: shared_future Ctor's arg must be rvalue
// shared_future( std::future<T>&& other ) noexcept;
// => below is error
// auto fut = std::async(std::launch::async,[](int a, int b) {return a + b; }, 2, 3);
// vec.push_back(fut); // error: there's no conversion from lvalue std::shared_future<int> to std::shared_future<int>
vec.push_back( std::async(std::launch::async,
[](int a, int b) {return a + b; }, 2, 3)
);
std::cout << vec[0].get() << std::endl;
}
5.7 异步操作 函数
std::async
比 std::promise / std::packaged_task / std::thread 更高1层 (应 优先使用),
[1] 可用于 直接创建 异步 task
异步任务的结果 保存在 async 返回的 std::future<T> 中
[2] 使 coder 不用关注 线程创建 内部细节, 就能
方便获取 异步 执行状态 和 结果
std::async(std::launch::async | std::launch::deferred,
f, args, ...)
2 种 线程创建策略
1] 默认策略 std::launch::async
调用 async 时, 创建线程
2] std::launch::deferred
直到调用了 future 的 get() / wait() 时, 才 创建线程
例
// promise.cpp
#include <iostream>
#include <vector>
#include <thread>
#include <future>
#include <chrono>
int main()
{
std::future<int> fut = std::async(std::launch::async,
[]()
{
std::this_thread::sleep_for(std::chrono::seconds(3) );
return 5;
} );
std::cout << "calling thread waiting ...\n";
std::future_status futStatus;
do
{
futStatus = fut.wait_for(std::chrono::seconds(1) );
if (futStatus == std::future_status::deferred)
std::cout << "deferred \n";
if (futStatus == std::future_status::ready)
std::cout << "ready \n";
if (futStatus == std::future_status::timeout)
std::cout << "timeout \n";
} while (futStatus != std::future_status::ready);
std::cout << "result: " << fut.get() << std::endl;
}
// possible print
calling thread waiting ...
timeout
timeout
ready
result: 5
网友评论