C++11 最重要的新特性: 多线程
2 个好处
[1] 可写 `平台无关的 多线程程序, 移植性 提高`
[2] `并发` 提高性能
1 概述
1 并发
(1) 含义
`任务切换`: 能切出去, 也能切回来
任务切得太快, 看起来像 同时执行多个任务 一样
(2) 2 种方式
[1] `多进程`
`进程间通信: 信号 / 套接字 / 文件 / 管道`
`1个 进程 一套 地址空间`
[2] `多线程`
`同一进程 中 所有线程 共享地址空间`
所有线程 访问到 大部分数据—全局变量仍然是全局的,
指针、对象的引用、数据可 在线程间传递
2 并发 优势
(1) 关注点分离 ( SOC )`
code 分 2 part
“DVD播放”
“用户界面”
[1] 单线程: 两者放一起, 以便调用
[2] 多线程: 分隔关注点, 两者不用放一起
线程1: 处理“用户界面” 事件
线程2: “DVD播放”
线程间 交互 / 关联
用户点击“暂停”
(2) ·算法/数据 并行 -> 提高性能`
[1] 数据相同 / 算法拆分
[2] 计算相同 / 数据拆分
3 C++11 多线程 特点
(1) `线程感知` 的 `内存模型`
(2) 直接支持 `原子操作`
coder 无需了解 与平台相关的汇编指令
就可编写 高效/可移植 代码
compiler 负责 `搞定具体平台`
(3) `C++ 线程库 性能`
`接近 直接使用底层 API 时`
4 多线程入门
(1) 单线程
#include<iostream>
int main()
{
std::cout<<"Hello World\n";
}
(2) 多线程
#include <iostream>
#include <thread> //(1)
void hello() //(2)
{
std::cout<<"Hello Concurrent World\n";
}
int main()
{
std::thread t(hello); //(3)
t.join(); //(4)
}
1) 头文件 <thread>
2) `线程 初始函数`: 新线程执行入口
[1] `主 线程: main()`
[2] `其他 线程: std::thread 对象 ctor 中 指定`
3) std:thread 对象创建 完成 -> 开始执行新线程
4) join(): 让 caller 等待 新线程 完成
2 管理线程: thread/join/detach/RAAI/std::ref/std::bind/move ownership
image.png
线索
1 启动线程, 等待它结束 or 放后台运行
2 给 线程函数 传参
3 transfer ownership of thread
from current associated std::thread object to another
4 确定线程数, 识别特殊线程
#1 基本 线程管理
1.1 启动线程
`线程 在 std::thread 对象创建 时 启动`
1 thread 对象
(1) 一旦构造, 立即开始执行 `线程函数`
(2) 不可 copy, 可 move
copy/赋值 deleted
原因
线程对象 `copy/赋值` 可能 `弄丢已 join 的线程`
(3) 可能 `不 表示/关联 任何线程` => safe to destroy & not joinable
after 4场景之一
[1] 默认构造
=> 无 线程 ( 函数 ) 执行
[2] move from
=> 线程 move 给 other thread 对象去管理
[3] detach
=> 线程 可能仍在执行
[4] join
=> 线程 执行完成
2 线程函数
[1] 不与 caller 通信时, 若 抛出异常
std::terminate 被调用
以终止线程函数
[2] return value 或 异常 可 传给 caller
2种方式
——————————————————————————
1] std::promise
2] 共享变量 (可能需 同步)
——————————————————————————
3 std::thread ctor 的 args: 可调用对象
(1) 入口点为 function
void f();
std::thread my_thread(f)
(2) 入口点为 `可调用对象`
可调用对象 copied into thread's storage
并 `invoked` from there
原始可调用对象 可被立即销毁
class F
{
public:
void operator()() const { do_something(); }
};
F f;
std::thread my_thread(f);
问题: `函数对象 临时无名对象 作 std::thread ctor's arg`
会被 C++ 编译器 解释为 `函数声明`
std::thread my_thread( F() );
3 种 解决
————————————————————————————————————————————————————
[1] 函数对象对象 外再加一层小括号
std::thread my_thread( (F() ) );
————————————————————————————————————————————————————
[2] 花括号
std::thread my_thread{ F() };
————————————————————————————————————————————————————
[3] lambda 表达式 启动线程
std::thread my_thread(
[]{ do_something(); }
);
————————————————————————————————————————————————————
4 join 还是 detach ?
(1) `必须用 + 何时用`
原因
`std::thread 对象 销毁` 前线程 `没被 joined 或 detached`
线程对象
——————————————————————————————————————
[1] joinable
——————————————————————————————————————
[2] dtor 调 std::terminate() 结束程序
——————————————————————————————————————
(2) `detached 线程` 要在 `程序结束前 结束`
原因
main return 时, 正在执行的 detached 线程
——————————————————————————————————————
[1] 被暂停
——————————————————————————————————————
[2] 其 thread-local objects 销毁
——————————————————————————————————————
(3) 用 join 还是 detach ?
————————————————————————————————————————————
[1] join
————————————————————————————————————————————
[2] 除非 你想 `更灵活`
并用 `同步` 机制 去 `等待` 线程完成
此时 `detach`
————————————————————————————————————————————
5 std::terminate() in <exception>
被 C++ runtime 调用
——————————————————————————————————————————————
3 种 case
——————————————————————————————————————————
[1] std::thread 初始函数 抛出异常
——————————————————————————————————————————
[2] joinable 线程对象 `被销毁` 或 `被赋值`
——————————————————————————————————————————
[3] `异常` 被抛出却未被捕获
——————————————————————————————————————————————
6 lifetime 问题: 多(比单)线程 更易发生
`正在运行的 detached 子线程`
access `已被 destroyed 的 object`
=> undefined behavior
|
| 如
|/
caller
——————————————————————————————————————————————————
[1] local 变量 ptr/ref -> 传给 callee 线程 以 访问
——————————————————————————————————————————————————
[2] 已 return
——————————————————————————————————————————————————
[3] 相比 caller 线程, `callee 线程 lifetime 更长`
=> `ptr/ref 悬挂` ( dangling )
|
|/
潜在的 访问 隐患
——————————————————————————————————————————————————
|
| 解决
|/
[1] 线程函数 `自包含`
+
[2] `copy data into thread`
struct func
{
int& i;
func(int& i_) : i(i_) {}
void operator() ()
{
for (unsigned j=0 ; j<1000000 ; ++j)
{
// 潜在的访问隐患: dangling reference
do_something(i);
}
}
};
void oops()
{
int local_state = 0;
func my_func(local_state);
std::thread my_thread(my_func);
my_thread.detach();
}
1.2 `等待 线程完成`
1 怎样 `等待 线程完成`
关联的 std::thread object 上调 join()
2 join() 做了啥
清理 所关联线程的 内存
调 1 次 join 后
线程对象
[1] 不再关联 (已完成的)线程
[2] not joinable <=> joinable() == false
3 等待过程中 控制线程
——————————————————————
[1] 检查线程是否完成
1] cv
2] futures 机制
——————————————————————
[2] 等待特定时间
——————————————————————
1.3 异常下的 等待
1 `call join() 的位置 要精挑细选`
(1) 问题
call join() 前 抛出异常
join() 调用 被跳过
`lifetime problem`
(2) 解决 1
让 join() 不被跳过
use try/catch, `catch 中 也调 join()`
缺点
[1] try/catch 冗长
[2] scope 易出错
struct func; // defined in list2.1
void f()
{
int some_local_state=0;
func my_func(some_local_state);
std::thread t(my_func);
try
{
do_something_in_current_thread();
}
catch(...)
{
t.join(); // 1
throw;
}
t.join(); // 2
}
(3) 解决 2
RAII
本质含义
`让 资源管理对象 own 资源, 在 析构时 释放资源
=> `资源的 lifetime 管理` 就和 `资源管理对象 lifetime 管理` 统一起来
=> 只要 `不泄漏 资源管理对象, 就能保证 不泄漏资源`
至于是 `让 ctor 自己 创建资源`,
`还是 把资源创建好 再交给 ctor 保管`,
没有优劣之分
可用 `static Factory Method 创建资源对象`
`client 就不用管 里边怎么实现了`
实现
线程对象
|
| 设
|/
保护类 thread_guard
[1] explicit ctor: 左值引用 para = std::thread& t_
|
| `线程对象` 作 arg 引用传递 构造 `线程保护对象`
|/
[2] init. list 初始化 / 绑定
|
|/
[3] 左值引用 成员 = std::thread& t
=> `线程 的 ownership 没有转移`
=> `caller 可能 已对线程 调 join()`
=> [4] thread_guard `dtor 中 调 join() 前 必须 先判线程是否可 joinable`
=> 线程 `caller 函数 执行 }`
即 ret 指令 `弹栈 逆序销毁` local 对象 时
先 `销毁 保护对象`
`调 保护对象 dtor 中 可能 调 join()`
[5] copy 和 赋值 delete
因 `资源管理对象` 可能 `outlive 其管理的 `线程对象` 的 `scope`
//list2.3 用 RAII 等待线程完成
class thread_guard
{
private:
std::thread& t; // [3]
public:
explicit thread_guard(std::thread& t_) // [1]
:t(t_){} // [2]
~thread_guard()
{
// [4] 先 test joinable(), 再 join()
if( t.joinable() )
{
t.join();
}
}
// [5] copy 和 赋值 delete
thread_guard(thread_guard const&)=delete;
thread_guard& operator=(thread_guard const&)=delete;
};
struct func; // defined in list2.1
void f()
{
int state = 0;
func f(state);
std::thread t(f);
// `新线程对象` 作 arg 引用传递 构造 `线程保护对象`
thread_guard g(t);
do_something_in_current_thread();
}
(3) 解决 3
若 不必 等待线程结束
`detach & copy data into thread`
可避免 `异常安全` 问题
`detach` 会 `打破 线程 与 std::thread 对象` 间 `关联`
只要 detach 线程 先于 main 函数 退出,
1.4 后台 / background 运行线程
1 detach 线程: std::thread instance上调 detach()`
`detached 线程` 特点
[1] 在后台(typically 长时间)运行
称 `守护线程`
应用
1> 监控文件系统
2> 实现发后即忘/fire and forget work:
只管消息发送, 不管消息接收
[2] 无法 获得 关联它的 std::thread object
=> 无法直接 communicate with it
it can `no longer be joined
=> 无法等待它完成`
[3] `ownership 和 control 被传给 C++ runtime 库`
保证了 线程关联资源
在线程结束时 被正确回收
2 application : word processor
`可一次编辑多个文档`
UI级和内部 处理的一种方法:
`1 个窗口 -> 1 个被编辑文档`
`edit_document (编辑文档) 作 线程函数` + 其中 通过 `与用户交互 获取 用户欲执行动作`
若为 打开新文档
`起 新线程 & 递归调 线程函数 edit_document`
else
执行相应 操作
//List 2.4 Detach a thread to handle other document
//para: filename
void edit_document(std::string const& filename)
{
//1. open file
open_document_and_display_gui(filename);
//2. Loop, until edit is done/finished
while( !done_editing() )
{
// 3. get user's input(cmd)
user_command cmd=get_user_input();
//(1) if user's cmd is open new file
if(cmd.type==open_new_document)
{
//(2) get filename from user
std::string const new_name=get_filename_from_user();
//(3) start new thread to open:
//the operations of new and current thread are the same.
//So, entry function reuse the caller function
std::thread t(edit_document,new_name);
//(4) detech new thread
t.detach();
}
else // 5. otherwise, process according to user's choise
{
process_user_input(cmd);
}
}
}
#2 传参 给 线程函数
2.1 thread ctor 接口
——————————————————————————————————————————————————————————————————————————————————————————
线程函数 | thread ctor 的 paraList
——————————————————————————————————————————————————————————————————————————————————————————
非成员函数 | [1] callable obj [2] 相应 `函数调用运算符` 的 para...
——————————————————————————————————————————————————————————————————————————————————————————
成员函数 | [1] `成员函数指针` [2] 相应 对象的指针 [3] 相应 `成员函数` 的 para...
——————————————————————————————————————————————————————————————————————————————————————————
2.2 内部机制
1 `默认下`
std::thread Ctor 实参
[1] copied
副本 在新线程 context 中 `隐式转换 为 相应参数类型`
[2] 除非 用 `std::ref 等`
void f(std::string const& s);
std::thread t(f, "hello");
如
实参 `字符串 字面量`
[0] 是 ptr: char const*
[1] copied
[2] 副本 converted 为 std::string
[3] 表面上 引用传递
实际是 ptr copy 作 arg 调 string Ctor
问题
caller 函数
在 thread ctor 的 `ptr/ref 型 arg` 的
`副本 转换为 相应参数前 退出`
`所指对象销毁`
`副本指针 悬挂`
=> undefined behavior
解决
[1] 先 强转 arg, 再 传给 thread ctor`
[2] ptr 用 具有 `移动语义` 的 对象 管理
std::thread t(f, std::string(buffer) );
2 std::ref() / std::bind wrap 想 `引用传递` 的 `arg`
|
| 想
|/
更新数据
[1] std::ref()
void f(Data& data);
Data data;
std::thread t(f, std::ref(data) );
[2] std::bind <functional>
std::thread ctor 与 std::bind 机制相同
X x;
std::thread t(&X::f, &x);
#3 转移 线程 ownership
1 为什么要引出 move of std::thread
应对 2 种场景
线程 ownership 转交给
——————————————
[1] caller
[2] 其他 函数
——————————————
(1) `ownership moved out of a function`
`形似 pass by value, 实际却是 move 语义`
std::thread caller()
{
void callee();
return std::thread(callee);
}
(2) `ownership moved into a function`
void g(std::thread t);
void f()
{
void thread_func();
g( std::thread(thread_func) );
}
2 move 机制
—————————————————————————————————————————
[1] 隐含自动 move
左 或 右运算对象 至少一侧为 右值
—————————————————————————————————————————
[2] 显式 调 std::move()
—————————————————————————————————————————
2 thread_guard 与 scoped_thread
————————————————————————————————————————————————————————————————————————————
是否转移线程 ownship | 否 | 是
————————————————————————————————————————————————————————————————————————————
成员 | 左值引用 std::thread& t | 值 std::thread t
————————————————————————————————————————————————————————————————————————————
args 传递方式 | 左值引用 传递 | 值传递 + 移动语义
————————————————————————————————————————————————————————————————————————————
dtor 中 call join 前 | |
是否 check joinable | 是 | 否
————————————————————————————————————————————————————————————————————————————
ctor 中 | |
是否 check joinable | 否: 啥也不干 | 是
————————————————————————————————————————————————————————————————————————————
class scoped_thread
{
private:
std::thread t;
public:
explicit scoped_thread(std::thread t_) // 隐式 move
: t( std::move(t_) ) // 显式 move
{
if( !t.joinable() ) // Note
throw std::logic_error(“No thread”);
}
~scoped_thread()
{
t.join();
}
scoped_thread(scoped_thread const&)=delete;
scoped_thread& operator=(scoped_thread const&)=delete;
};
struct F;
void f()
{
int local_state;
scoped_thread st( std::thread( F(local_state) ) );
do_something_in_current_thread();
}
#4 runtime 时 选择 动态数量的线程 : 用 线程 groups 来 divide work
std::thread::hardware_concurrency()
硬件支持的并发线程数 的 hint
int hardware_threads = std::thread::hardware_concurrency();
int n_threads = std::min(hardware_threads != 0 ? hardware_threads : 2);
[1] 线程 vector
std::vector<std::thread> threads(n_threads);
[2] 启动 n_threads 个子线程
for i = 0 ~ n_threads-1
threads[i] = std::thread(
f, block_start, block_end, std::ref(results[i] )
);
[3] wait 子线程
std::for_each(threads.begin(), threads.end(),
std::mem_fn(&std::thread::join) );
#5 识别线程:目的 `把 data 或 behavior 与 线程 关联起来`
1 线程 identifier 是 std::thread::id type
std::thread::id master_thread;
void f()
{
if( std::this_thread::get_id() == master_thread )
{
do_master_thread_work();
}
do_common_work();
}
2 线程ID 可作 关联容器 中 `key`
3 在线程间 共享 data
image.png
image.png
2个目标
[1] 避免潜在问题
[2] 最大化收益
`并发 好处`
线程间 `共享数据` 容易、直接
1.1 线程间 共享 数据
双向链表 delete node == update next + update prev
只 update next 时, `invariant 被 打破了`
若此时另一线程 正 access this node & 没做保护处理
=> 该 `race condition` 会 引起 bug
1 问题: race condition 竞争条件
`结果取决于 多个线程` 上 `操作` 执行的 `相对顺序`
多个线程 竞争地去执行 各自的 `操作`
|
|/
抢票
data race
对象 并发修改 对象
=> undefined behavior
2 解决
(1) `用 mutex wrap 数据结构`
以保证 `只有 真正执行修改的那个线程 可看到 invariants 被打破的状态`
(2) lock-free 编程
修改 数据结构的设计 及其 invariant
`分离 变化` 成 `n 个 不可分割的 更小变化`
每个小变化 保护 其 invariant
(3) STM / software transactional(事务) memory
数据库更新
update 数据结构 == process a transaction/事务
`store` series of `数据修改和读` to a `事务log`
commit it in a step
`commit 失败, 事务重启`
2.1 mutex 机制
1 `mut`ually `ex`clusive 互斥
保护 共享数据
access 前 lock 其关联的 `mutex`
access 后 unlock
2 mutex 并不是 silver bullet 银弹
更重要的是
[1] `设计代码结构` 来 `保护数据` 2.2节
[2] `接口中内在地避免 race conditions` 2.3节
3 mutex 问题
[1] 死锁
[2] 保护范围 要多大
2.2 mutexes 使用
1 mutex 创建/lock/unlock
创建
上/解 锁
成员函数 lock()/unlock()
2 对 mutex 不推荐 手工 调 lock()
原因: mutex 是一种 资源
与 对 thread 不推荐手工调 join() 一样
———————————————————————————————————————————————
资源 | 资源管理类 / RAII 类
———————————————————————————————————————————————
thread 对象 | thread_guard 或 scoped_thread
———————————————————————————————————————————————
mutex 对象 | std::lock_guard
———————————————————————————————————————————————
//list3.1 Protect list with mutex
#include <list>
#include <mutex>
#include <algorithm>
std::list<int> lst; //1 单个 全局变量
std::mutex mut; //2 用 相应 全局 std::mutex 保护
void add_to_list(int val)
{
std::lock_guard<std::mutex> guard(mut);
lst.push_back(val);
}
bool list_contains(int val)
{
std::lock_guard<std::mutex> guard(mut);
return std::find(lst.begin(),lst.end(),val) != lst.end();
}
|
|
| 重构
|
| [1] 函数 作 public 成员
|
| [2] 被保护数据 + mutex 作 private 成员
|/
2.3 Structuring code 以 保护 共享数据
1 问题
`迷途( stray ) ptr/ref` 和 `后门`
若 成员函数 将 被保护数据 的 `ref/ptr 传出` lock 的 scope
|
| 如
|/
——————————————————————————————————————————————————————————————————————————————————
[1] `隐晦地 传出`
成员函数 para 为 `( 用户策略 ) 函数对象`
|
| 函数调用运算符 operator() 的 para 为
|/
ref / ptr 参数
——————————————————————————————————————————————————————————————————————————————————
[2] 显而易见 的 传出
return
——————————————————————————————————————————————————————————————————————————————————
=> 留了 `后门 ( backdoor )`
=> 任何能 访问 ptr/ref 的 code 可
`绕过(bypass) mutex 的 保护` access `被保护数据`
|
|/
不用 lock mutex
(1) code 结构
——————————————————————————————————————————————————————————
[1] ProtectedData + mutex 作 其 管理类(DataWrapper) 成员
——————————————————————————————————————————————————————————
[2] 成员函数 para 为 `函数对象`
接受 `用户(恶意 malicious)策略函数`
|
| [3]
|/
——————————————————————————————————————————————
1] `引用参数` => 可 `取 被保护数据 的 ptr`
2] 绕过 mutex 保护 access 被保护数据
——————————————————————————————————————————————————————————
(2) 根因
没有把 access 数据 的 所有代码段 标记为 `互斥`
`漏掉了 保护 传出的 被保护数据 的 ref`
(3) 更深的 陷阱 ( pitfall )
接口函数间 调用顺序( 接口设计 ) 导致 race conditions — 2.3节
class Data // [] ProtectedData
{
int a;
std::string b;
public:
void mf();
};
class DataWrapper
{
private:
Data data; // [1] ProtectedData + mutex 作 其 管理类(DataWrapper) 成员
std::mutex m;
public:
// [2] 成员函数 para 为 `函数对象`
// 接受 `用户(恶意 malicious)策略函数`
template<typename F>
void processData(F f)
{
std::lock_guard<std::mutex> l(m);
f(data);
}
// 略去 ctor
};
// === client
Data* gpd;
// [3] 用户(恶意 malicious)策略函数: 1] `引用参数` => 可 `取 被保护数据 的 ptr`
void maliciousFunc(Data& protected_data)
{
gpd = &protected_data;
// 2] 绕过 mutex 保护 access 被保护数据
}
DataWrapper x;
void f()
{
x.processData(maliciousFunc);
gpd->mf();
}
2.4 在接口本身中 发现 ( Spotting ) race conditions
1 双向 list
删除 1个 node
线程安全
阻止 前中后共 3个节点 的 并发 accesses
问题
单独保护 每个节点
仍有 race conditions
解决
`单个 mutex` 保护 `整个 数据结构(list)`
2 线程安全 的 栈
// List 3.3 std::stack 的 接口
template<typename T, typename Container = std::deque<T> >
class stack
{
public:
explicit stack(const Container&);
explicit stack(Container&& = Container() );
template <class Alloc> explicit stack(const Alloc&);
template <class Alloc> stack(const Container&, const Alloc&);
template <class Alloc> stack(Container&&, const Alloc&);
template <class Alloc> stack(stack&&, const Alloc&);
size_t size() const;
//
bool empty() const;
T& top();
T const& top() const;
void pop();
void push(T const&);
void push(T&&);
void swap(stack&&);
};
(1) 问题
调用顺序 为 empty() -> top() -> pop() 时, `not 线程安全`
[1] 线程1 empty() top() 间 线程2 pop()
empty() 判非空 -> pop() 使栈 空 -> top()
原因: 接口设计导致
stack<int> s;
if(!s.empty() )
{
int const value = s.top();
s.pop();
do_something(value);
}
[2] 线程1 top() pop() 间 线程2 top()
`2个线程 本打算 分别 处理 顶值和次顶值, 实际 都 处理 顶值`
次顶值 没处理 就被移除了
(2) 解决
( 有 隐患 )
|\
|
[1] mutex + 联合调 top 和 pop
若 stack 对象 copy ctor 可能 抛出异常
Herb Sutter 给出 solution
[2] pop() = std::stack 的 empty() + top() + pop()
————————————————————————————————————————————————
1] `构造 栈元素类型 新值` + pass by reference
+
2] 赋值 给 构造的新值 = `引用 实参/形参`
————————————————————————————————————————————————
缺点
1] 构造 代价太高 for some type
2] popped value type 必须 `可 赋值`
许多 用户定义类型 不支持 assignment
[3] 用 copy ctor 或 move ctor 不抛出异常 的类型
C++11 的 `右值引用`
使 更多类型 move ctor 不会抛出异常, 虽然 copy ctor 会
缺点
不通用
用户自定义类型
1] copy ctor 可能 抛出异常
2] 没有 move ctor
[4] `return ptr` 指向 popped item
`ptr 可安全 copy, 不会 抛出异常`
缺点
要内存管理, 简单类型时 开销大
用 std::shared_ptr
[5] 解决 [2] + [3] 或 [4]
接口简化
例 [2] + [4]
5个操作 变 3个 好处
empty empty
top
pop pop: 2 个 重载版本 [2] [4]
push push
swap
mutex 保护 std::stack 上 `完整 3 操作` empty() -> top() -> pop()
——————————————————————————————————————————————————————————————————————————————————————————————————
| [4] | [2]
——————————————————————————————————————————————————————————————————————————————————————————————————
函数原型 | std::shared_ptr<T> pop() | void pop(T& value)
——————————————————————————————————————————————————————————————————————————————————————————————————
para | 空 | 模板参数 T `引用类型`
——————————————————————————————————————————————————————————————————————————————————————————————————
实现关键 | 似 placement new + args 转发 | 1] pop 的 caller 负责 构造
取出 top 元素 | pop() 负责 构造 | 2] 赋值 给 构造的新值 = `引用 实参/形参` Note: 只是 引用参数, 不是 取出/top 栈中 元素的引用
怎么办 ? | std::shared_ptr<T> const res( | value = stk.top(); // T 可 赋值
用于`构造 栈元素类型 新值`| std::make_shared<T>( stk.top() ) ); |
——————————————————————————————————————————————————————————————————————————————————————————————————
记住 解决 [4] 易 => 解决 [2] 等
——————————————————————————————————————————————————————————————————————————————————————————————————
template< class T, class... Args >
shared_ptr<T> make_shared( Args&&... args ) // <memory>
T 非 数组
|\
| Note
|
构造 T 型 object
wrap 进 std::shared_ptr
就像
::new (pv) T(std::forward<Args>(args)...) // placement new + args 转发
|
| internal
|/
internal void*
// 解决 [4]: 记住 解决 [4] => 解决 [2] 等
template<typename T>
std::shared_ptr<T> threadsafe_stack::pop()
{
std::lock_guard<std::mutex> lock(m);
if( stk.empty() )
throw empty_stack();
std::shared_ptr<T> const res(
std::make_shared<T>( stk.top() ) );
stk.pop();
return res;
}
// 用 sloution [2] + [4] 实现 接口 无 race conditions
#include <exception>
#include <memory>
#include <mutex>
#include <stack>
struct empty_stack: std::exception
{
const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> stk; // [1] internal std::stack
mutable std::mutex m;
public:
threadsafe_stack(){}
threadsafe_stack(const threadsafe_stack& rhs)
{
// [2] copy ctor: under mutex
std::lock_guard<std::mutex> lock(rhs.m);
stk = rhs.stk;
}
// [3] Assignment deleted & have no swap()
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
bool
empty() const
{
std::lock_guard<std::mutex> lock(m);
return stk.empty();
}
// 解决 [4]: 记住 解决 [4] => 解决 [2] 等
std::shared_ptr<T>
pop()
{
std::lock_guard<std::mutex> lock(m);
if( stk.empty() )
throw empty_stack();
std::shared_ptr<T> const res(
std::make_shared<T>( stk.top() ) );
stk.pop();
return res;
}
// 解决 [2] : two overloads all empty()->top()->pop()
void
pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(stk.empty() )
throw empty_stack();
value = stk.top(); // T 可 赋值 // dif
stk.pop();
}
void push(T val)
{
std::lock_guard<std::mutex> lock(m);
stk.push(val);
}
};
3 lock 粒度/范围
————————————————————————————————————————————————————————————————————
[1] 大 + 1个 mutex | 并发增益小
————————————————————————————————————————————————————————————————————
[2] 小 + 1个 mutex | 没有完全保护期望的操作 => race condition
————————————————————————————————————————————————————————————————————
[3] 更小 + 多个 mutex | 死锁: 两线程相互等待, 结果谁也不往前走
————————————————————————————————————————————————————————————————————
2.5 死锁 解决
1 死锁: 2个线程, 2个mutex
2线程 都要 lock 2个 mutex 才能执行
2个线程 各锁 1个mutex 时, 都在 等对方锁住的 mutex 被释放
2个线程都在死等
场景: 无锁 case 下的 死锁
`2 个 thread func` 在 对方 `线程对象` 上 调 join()
|
| std::thread t1;
|/ std::thread t2(f2, std::ref(t1) );
para 为 `线程对象 ref` t1 = std::thread(f1, std::ref(t2) );
void f1(std::thread& t_) { t_.join(); }
死锁: 都等 对方结束 void f2(std::thread& t_) { /* 休眠 5秒 */ t_.join(); }
2.6 避免死锁 的方法
思想
线程1 可能等 线程2 时, 线程2 就不要 等线程 1 了
1 避免 嵌套锁
2 当 持有1个锁 时, 避免调 `用户代码`
|
|/
可能 获得 另一个 锁
=> 可能会 嵌套锁
3 每个线程 以 相同顺序 获得多个锁
总在 mutex1 前 锁住 mutex2
[1] 适用于: 不同 mutexes for 不同 purposes
|
| [2] 问题: 不能 解决 的 死锁
|/
2个 mutex 保护 同 class 2个 instance
swap
|
| 解决
|/
std::lock() + std::lock_guard + std::adopt_lock 作 第 2 参数
| |
| |/
1次 lock 多个 mutex 告诉 std::lock_guard Ctor
[1] 其 第1参数 mutex 已上锁
[2] 构造时只需 接收 mutex 上 mutex 的 ownership
| [3] 问题: 不能 解决 的 死锁
|
| 多个锁 被 `分开 获得`
|/
//List 3.6 swap 操作中用 std::lock() + std::lock_guard + std::adopt_lock
class A;
void swap(A& lhs, A& rhs);
class X
{
private:
A a;
std::mutex m;
public:
X(A const& sd):a(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::lock(lhs.m,rhs.m); //一次锁住2个mutex
std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock);
swap(lhs.a, rhs.a);
}
};
4 锁层次
[1] application 分 多层
[2] 某层 已拥有 某 mutex 上的锁
就 `不允许 上一层 lock 另一 mutex`
// List 3.7 用层次锁避免死锁
1] 调 high_level_func()
lock high_level_mutex
-> 调 low_level_func():
lock low_level_mutex, 返回值作为high_level_stuff() 参数
=> 遵守层次锁规则
2] lock other_mutex(低层次值)
-> 调 high_level_func():
lock 高层次值的 mutex
=> 不 遵守层次锁规则,
hierarchical_mutex 报告错误, runtime 时失败
3] 用户自定义 mutex 类 hierarchical_mutex
实现 符合 mutex 概念的
3个成员函数: lock() unlock() try_lock()
=> 可用于 std::lock_guard
// 1. hierarchy value of 10000
hierarchical_mutex high_level_mutex(10000);
//2.
hierarchical_mutex low_level_mutex(5000);
int do_low_level_stuff();
int low_level_func()
{
std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
return do_low_level_stuff();
}
void high_level_stuff(int some_param);
void high_level_func()
{
std::lock_guard<hierarchical_mutex> lk(high_level_mutex);
high_level_stuff( low_level_func() );
}
// ---------------------
void thread_a() // 遵守 层次锁 规则
{
high_level_func();
}
// ---------------------
hierarchical_mutex other_mutex(100);
void do_other_stuff();
void other_stuff()
{
high_level_func();
do_other_stuff();
}
void thread_b() // 不遵守 层次锁 规则
{
std::lock_guard<hierarchical_mutex> lk(other_mutex);
other_stuff();
}
//List 3.8 hierarchical mutex
class hierarchical_mutex
{
// 4个成员变量
//1. 内部标准 mutex, 被 委托 真正 调 lock() unlock() try_lock()
std::mutex internal_mutex;
//2. 该 mutex 当前层次值
// 1)ctor 中 初始化 为指定值
// 2)比 本线程 层次值 小时, 检查通过
// 3)更新时, 最后 赋值 给 本线程层次值
unsigned long const hierarchy_value;
//3.该mutex 先前层次值
// 1)ctor 中 初始化 为0, 先不用
// 2)更新时, 先被 赋值 为 本线程层次值
// 3)unlock()中 用于 恢复 本线程层次值
unsigned long previous_hierarchy_value;
/* 4. 本线程 层次值
1) 1个线程 可能 lock 多个 层次mutex,
保存1个 本线程层次值,
以保证 `lock 的 层次 mutex 链`中,
`各 层次 mutex 的 层次值 依次减小`; 否则, 抛出 逻辑错误 异常
=> 用 thread_local
2) 又因为 属于class 而是 instance
=> 用 static
*/
static thread_local unsigned long this_thread_hierarchy_value;
// 2 个 私有函数: 检查 & 更新
void check_for_hierarchy_violation()
{
if(hierarchy_value >= this_thread_hierarchy_value)
{
throw std::logic_error(“mutex hierarchy violated”);
}
}
void update_hierarchy_value()
{
// 1)记录 次新的 本线程层次值, 以
// `沿着 已 lock 的 各 层次 mutex`,
// `逐层恢复出 原来的 本线程层次值`
previous_hierarchy_value = this_thread_hierarchy_value;
// 2)更新 最新的 本线程层次值,
// 以 lock 下一层上的 层次mutex
this_thread_hierarchy_value = hierarchy_value;
}
public: // 4 个 接口函数
explicit hierarchical_mutex(unsigned long value):
hierarchy_value(value), previous_hierarchy_value(0){}
// 检查 层次锁->委托 标准 mutex lock-> 更新 线程层次值
void lock()
{
check_for_hierarchy_violation();
internal_mutex.lock();
update_hierarchy_value();
}
// 恢复 本线程 次新层次值 -> unlock
void unlock()
{
this_thread_hierarchy_value = previous_hierarchy_value;
internal_mutex.unlock();
}
// 与 lock() 区别: mutex 的lock已被另一线程拥有时,返回false
bool try_lock()
{
check_for_hierarchy_violation();
if( !internal_mutex.try_lock() )
return false;
update_hierarchy_value();
return true;
}
};
//本线程 层次值 初始化为 最大值
thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
2.7 std::unique_lock 实现更灵活的 locking
1 思想
松弛 invariants
并不总是拥有 mutex
代价
内部 要存/更新 1个 flag 来标识是否拥有 mutex
2 std::defer_lock 作 第2参数, 告诉 std::unique_lock Ctor, 构造时 `mutex unlocked`
3 std::unique_lock 对象
2种方法 加锁
1) 其 上调用 lock()
2) 传给 std::lock()
因为 std::unique_lock 提供 3 个成员函数
lock() try_lock() unlock()
4 std::lock_guard & std::unique_lock
同 都可
RAII 锁管理
异
1) 成员函数
前者 只有 ctor & dtor: ctor 中 加锁 + dtor 中 解锁
后者 还有 3 个成员函数 lock() try_lock() unlock() => 更灵活
2) 是否可 `管理 mutex 的 lifetime`
否/是
=>前者 最好将 mutex 设为 global 变量
swap 的 std::unique_lock 版本
std::unique_lock + std::defer_lock 作 第2参数 + std::lock(uniLkm1, uniLkm2);
|
|/
告诉 std::unique_lock Ctor, 构造时 `mutex unlocked`
class A;
void swap(A& lhs,A& rhs);
class X
{
private:
A a;
std::mutex m;
public:
X(A const& a_) : a(a_){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs == &rhs)
return;
std::unique_lock<std::mutex> uniLkm1(lhs.m, std::defer_lock);
std::unique_lock<std::mutex> uniLkm2(rhs.m, std::defer_lock);
std::lock(uniLkm1, uniLkm2);
swap(lhs.a,rhs.a);
}
};
2.8 转移 mutex ownership between scope
1 mutex 的 ownership 可在 std::unique_lock 间 move
std::unique_lock: movable 但 not copyable
2 std::unique_lock 3 种灵活应用
(1) 函数 lock mutex, transfer 该 mutex 的 ownership 给其 caller 的 lock
std::unique_lock<std::mutex>
get_lock()
{
extern std::mutex mut; // extern mutex
std::unique_lock<std::mutex> lk(mut);
prepare_data();
// 目的是右值 => implicitly move
return lk;
}
void processData()
{
// 源是右值 => implicitly move
std::unique_lock<std::mutex> lk(get_lock());
do_something();
}
(2) lock 不直接返回, 而是作 gateway class 的 mem,
所有 access to data 都通过 gateway 类
gateway
[1] destory 时, releases lock
[2] movable
(3) std::unique_lock 可在 `销毁前 释放 其 管理的 mutex`
|
|
|/
unlock() 中
=> 提高性能
2.9 合适粒度下 上锁
`真正 访问 共享数据 时 再 lock` mutex
左侧 和 右侧 在不同时刻相等
(可能有场景需要得到这种信息)
两次 read 之间, 两侧值可能已经被改了
void get_and_processData()
{
// 加锁 -> 获取1块处理数据 -> proces 前 解锁
// -> process -> process 后 再加锁 -> process 结果 写到result
std::unique_lock<std::mutex> my_lock(the_mutex);
some_class data_to_process = get_next_T();
my_lock.unlock();
result_type result=process(data_to_process);
my_lock.lock();
write_result(data_to_process,result);
}
//List3.10 Lock one mutex at a time
// in a comparison operator
class Y
{
private:
int a;
mutable std::mutex m;
int get_detail() const
{
std::lock_guard<std::mutex> lock_a(m);
return a;
}
public:
Y(int sd):a(sd){}
friend bool operator==(Y const& lhs, Y const& rhs)
{
if(&lhs==&rhs)
return true;
int const lhs_value=lhs.get_detail();
int const rhs_value=rhs.get_detail();
return lhs_value==rhs_value;
}
};
3.1 `初始化期间` 保护 共享数据
1 延迟 初始化
共享数据 只在 `初始化 并发 access 阶段` 才用 mutex 保护
|
| 如: 打开 数据库连接
|/
构造代价
(1) 单检测: `资源 (指针)` 是否 已初始化
1) 思想
资源 使用前检测
若 没初始化
先 初始化, 再 解锁, 再 使用
|
|/
spr.reset(new R);
2) code 结构
—————————————————————————————————————————————————
[1] 加锁 => 所有线程 串行
[2] 检测
[3] 初始化
[4] 解锁
[5] 使用 资源指针
—————————————————————————————————————————————————
加/解 锁
std::unique_lock + 其 成员函数 unlock()
—————————————————————————————————————————————————
3) 问题
第1次检测 前 加锁 => 检测处 `串行` => `并发性差`
|
| (半)解决
|/
(2) 双检测锁 ( Double-Checked Locking )
1) 思想
`第2次检查 前 才加锁`
2) code 结构
—————————————————————————————————————————————————
[1] 第1次检测
[2] 加锁
[3] 第2次检测
—————————————————————————————————————————————————
加/解锁
std::lock_guard + scope 结束 Dtor 解锁
—————————————————————————————————————————————————
3) 问题
1条 new 语句 分解为 3 条底层语句
——————————-————
[1] 分配 内存
[2] 构造
[3] 指针赋值
————————————————
compiler 可能重排执行顺序为 [1][3][2]
线程 1 资源 `构造未完成 -> 资源 ptr 就 已赋值`
线程 2 检测 资源 ptr 非空
`访问 初始化未完成` 的 资源
data race 型 race condition
undefined behavior
// 单检测
std::shared_ptr<R> spr;
std::mutex mut;
void f()
{
std::unique_lock<std::mutex> lk(mut);
if(!spr)
{
spr.reset(new R);
}
lk.unlock();
spr->mf();
}
// 双检测
void f()
{
if(!spr)
{
std::lock_guard<std::mutex> lk(mut);
if(!spr)
{
spr.reset(new R);
}
}
spr->mf();
}
解决
std::call_once(std::once_flag, 资源初始化函数, func_args)
[1] 多线程 `并发进入` std::call_once 函数 + `同步`
但 `只有1个线程 ( active 线程 ) 真正执行` 第 2 arg: (资源初始化) `函数`
其他线程 ( passive/被动 线程 ) 进入 std::call_once 后
`等待` active 线程 完成初始化 + 返回 后, 再返回
|
|/
同步
[2] Note
当前线程 从 std::call_once 返回时,
资源初始化 已完成, 但可能由 另一线程 完成
[3] std::call_once 参数
template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args ) // <mutex>
——————————————————————————————————————————————————————
1] 第1参数 std::once_flag
——————————————————————————————————————————————————————
2] 其余参数 与 std::thread Ctor
——————————————————————————————————————————————————
1> 形式 同 `右值引用 args`
——————————————————————————————————————————————————
2> 目的 不同 `避免 copy, 而不是 实现 move 语义`
因为 不必转移到 另一线程 去执行
——————————————————————————————————————————————————————
[4] std::once_flag
功能相当于 std::mutex
同 `不能 copy`
1) 作 namespace-scope
std::shared_ptr<R> pr; // 资源指针
std::once_flag rFlag; //std::once_flag
void initResource() { pr.reset(new R); }
void f()
{
std::call_once(rFlag, initResource);
pr->mf();
}
2) 作 类的 (private)成员
class X
{
private:
connection_info connection_details;
connection_handle connection; // resource handle
std::once_flag connection_init_flag;
void
open_connection()
{
connection=connection_manager.open(connection_details);
}
public:
X(connection_info const& connection_details_):
connection_details(connection_details_) {}
void
send_data(data_packet const& data)
{
std::call_once(connection_init_flag, &X::open_connection, this);
connection.send_data(data);
}
data_packet
receive_data()
{
std::call_once(connection_init_flag,&X::open_connection, this);
return connection.receive_data();
}
};
2 `static local 变量` 初始化 时 race condition
(1) static 局部变量 初始化
[1] 可能 真正在多个线程上 进行(C++11 前编译器) => problematic race condition
[2] 只能 真正在1个线程上 进行 (C++11 compiler 解决)
=> race condition 只是说 `哪个线程 去 初始化`
(2) 应用
`单例`
需要 `单个 全局 instance` 时
实现 `像 std::call_once 一样` 的 功能
// 单例
class A;
A& get_my_class_instance()
{
static A a;
return a;
}
3.2 很少更新 的 数据结构 的 保护
boost 库 boost::shared_mutex
第4章 同步并发操作 cv & future & async & packaged_task & promise
image.png image.png image.png image.png image.png 多线程 2个要点
[1] 保护 `共享数据`
第 3 章
[2] 线程 `同步`
线程 需 `等待` 另一线程 完成某任务后, 再去完成 自己的任务
实现
cv & futures
|
| cv: 类 std::condition_variable
|/
条件变量/condition variables
#1 循环等待 事件/条件: cv
##1.1 最简单的 方法
(1) code 结构 // 等待线程
|—— ——> `标志变量 flag` = true /false 表示条件 是/否满足
加锁 | - - - - -
| |
check 条件 是否满足 |
| => 周期性 ( 即 循环 )
否 | +
| 等待
解锁 - - - - - |\
|
sleep 固定时间 // Note: sleep 期间, other 线程 可获得锁 并 `modify 条件 使满足`
是
函数退出 => 自动解锁
(2) 问题
sleep 多久 难确定
太短: 浪费太多处理时间
太长: 事件早已发生, 等待线程还在 sleep
1] 丢帧 快节奏游戏 中
2] 过多消耗 时间片 实时应用 中
(3) 解决
cv
bool flag;
std::mutex m;
void wait_for_flag()
{
std::unique_lock<std::mutex> lk(m);
while(!flag)
{
lk.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100) );
lk.lock();
}
}
##1.2 cv class
1 `同步` 基础
可用于 同时 `block/阻塞 多个线程`
until 通知线程
1] 修改 a `shared variable`
|
| 用于表示
|/
条件
2] notifies cv
——————————————————————————————
通知线程 modify & notify cv
|
shared variable
|
等待线程 check & wait on cv
——————————————————————————————
2 2 个问题
(1) 问题1
精确调度事件
=> 可能 `notify called on destroyed cv`
解决1
notify under lock 可能必要
(2) 问题2
notify 发送时, 等待线程 没有 waiting on cv
=> `错过 notification`
原因
————————————————
shared variable
————————————————
1] `atomic`
————————————————
2] 未加锁
————————————————
Note
atomic copy delete
=>
不能用 cv 上 predicated wait 版本
即 不能为
cv.wait(lk, [] { return proceed; } ); // std::atomic<bool> proceed(false);
解决
1) 用 cv 上 non-predicated wait 版本
while ( !proceed ){ cv.wait(lk); }
2) shared variable 即使是 atomic
修改 也要 加锁
3 cv 类
[1] copy deleted
[2] notify_one & notify_all
[3] wait 2个重载
1] non-predicated 版本 (如) while ( !proceed ){ cv.wait(lk); }
非循环
`释放` lock
`阻塞` current thread & 加到 cv 上的 `等待线程 队列`
notified 或 spuriously wake up
`解阻塞`
`relock` + `wait exits`
2] predicated wait 版本 (如) cv.wait(lk, [] { return proceed; } );
循环 + 条件
|
|/
满足 = p() == true 时, 返回
————————————————————————————————————————————
与 non-predicated 版本
————————————————————————————————————————
区别
仅多了1步
relock 后, (循环) 检测 条件是否满足
————————————————————————————————————————
联系
视为 循环调 non-predicated 版本
————————————————————————————————————————————
template<typename Pred>
void std::condition_variable::wait(unique_lock<mutex>& uniLk, Pred pred)
{
while ( !pred() )
wait(uniLk); // 视为 non-predicated 版本
}
//等价于
while ( !pred() )
{
wait(uniLk);
}
4 等待线程
(1) notify_one
通知线程 + 多个等待线程
没必要 持有 the same mutex
否则
`悲观锁`
hurry up and wait
原因
awakened 后 可能无法 获得 mutex ( 被 通知线程 占着 )
|
| 解决
|/
pthreads 实现
1] `识别` 悲观锁场景
2] `notify 内` 把 等待线程
从 `cv 上的 队列` 转移到 `mutex 上的 队列`
而 `不 唤醒它`
(2) notify_all
`惊群现象`
事件发生
`唤醒所有` 所有等待线程
但 `只能有1个线程 获得事件处理权`
`其他线程 重新 陷入等待`
5 spurious wakeup / 假唤醒
notify_one/notify_all 之外的因素导致的
`wait 被唤醒, 但 条件不满足`
|
| 解决
|/
predicate wait 版本
[1] 只影响 non-predicated wait
[2] 不影响 predicated wait
6 应用
//------ App
threadsafe_queue<T> que;
void producer()
{
while( more_data_to_prepare() )
{
T const data = prepare_data();
que.push(data); // [1] 真正 push 前 加锁: que.push() 中 internal std::queue 真正 push 前
}
}
void consumer()
{
while(true)
{
T data;
que.wait_and_pop(data);
process(data); // [2] process 前 解锁: wait_and_pop() scope 结束 + 自动解锁
if( is_last_chunk(data) )
break;
}
}
// 记住这 1个 足够 => others
template<typename T>
std::shared_ptr<T>
threadsafe_queue::wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
// 捕获 this 目的: 可在 lambda 中用 当前类的 成员
cv.wait(lk,
[this]{ return !que.empty();} );
std::shared_ptr<T> res(
std::make_shared<T>( que.front() ) );
que.pop();
return res;
}
(1) List 4.1 等待 data 被处理: 用 std::condition_variable
std::mutex mut;
std::queue<T> que;
std::condition_variable cv;
void producer()
{
while(more_data_to_prepare())
{
T const data=prepare_data();
std::lock_guard<std::mutex> lk(mut);
que.push(data);
cv.notify_one();
}
}
void consumer()
{
while(true)
{
std::unique_lock<std::mutex> lk(mut);
cv.wait(
lk, []{ return !que.empty(); } );
T data = que.front();
que.pop();
lk.unlock(); // unlock mutex
process(data);
if( is_last_chunk(data) )
break;
}
}
|
| (2) 重构: 同步被限制在 queue 本身, 可 极大减少 同步问题 和 竞争条件
|
| 提取 thread-safe queue: using cv
|
| 借鉴 chapter3 thread-safe stack's thought:联合 front() and pop()
|/
//------ App
threadsafe_queue<T> que;
void producer()
{
while( more_data_to_prepare() )
{
T const data = prepare_data();
que.push(data); // [1] 真正 push 前 加锁: que.push() 中 internal std::queue 真正 push 前
}
}
void consumer()
{
while(true)
{
T data;
que.wait_and_pop(data);
process(data); // [2] process 前 解锁: wait_and_pop() scope 结束 + 自动解锁
if( is_last_chunk(data) )
break;
}
}
// ------ threadsafe_queue
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue
{
private:
mutable std::mutex mut;
std::queue<T> que;
std::condition_variable cv;
public:
threadsafe_queue(){}
threadsafe_queue(threadsafe_queue const& rhs)
{
std::lock_guard<std::mutex> lk(rhs.mut);
que = rhs.que;
}
void
push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
que.push(new_value);
cv.notify_one();
}
// 记住这 1个 足够 => others
std::shared_ptr<T>
wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
// 捕获 this
cv.wait(lk,
[this]{ return !que.empty();} );
std::shared_ptr<T> res(
std::make_shared<T>( que.front() ) );
que.pop();
return res;
}
void
wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
cv.wait(lk,
[this]{return !que.empty();});
value = que.front();
que.pop();
}
std::shared_ptr<T>
try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if(que.empty()) // dif
return std::shared_ptr<T>(); // empty std::shared_ptr
std::shared_ptr<T> res(
std::make_shared<T>(que.front() ) );
que.pop();
return res;
}
bool
try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if( que.empty() )
return false;
value = que.front(); // dif
que.pop();
return true;
}
bool empty() const //7. const function
{
std::lock_guard<std::mutex> lk(mut);
return que.empty();
}
};
Since locking a mutex is a mutating operation,
the `mutex object must be` marked `mutable`
#2 等待 `1次性事件`: future
线程 以某种方式
获得 future -> 表示事件
场景: 等待航班
##2.0 std::future
`等待 被 异步设定 的 value`
template< class T > class future; (1)
template< class T > class future<T&>; (2)
template<> class future<void>; (3)
1 异步 机制
准备好
异步操作 - - - -> result + 修改 shared state
| /\ |
| 提供 / hold | link
|/ / |
std::future 对象 —— —— —— —— —— —— ——
| |\
| 给 | query / wait for / extract result: 阻塞, 直到 result 准备好
|/ |
creator
std::async / std::packaged_task / std::promise
future shared_future async packaged_task promise
除 async 为 为 function template 外, 其余为 class template
##2.1 std::async
`异步 runs a function` ( 可能 in a new thread)
返回 std::future
future 上调 get(), 当前线程 阻塞, 直到 future ready, 然后 return result
传参
与 std::thread 传参方式相同
##2.2 std::packaged_task
std::packaged_task<> obj 绑定 future to callable object
work.get_future() 返回 future 对象
##2.3 std::promise
1 针对2种场景
[1] work 不能被表达为 函数调用
[2] 结果 来自多个地方
2 机制
// 3大成员函数
[1] get_future()
返回 promised 结果 关联的 future
[2] set_value() : 主动线程
会 make state/future ready
[3] fut.get() : 被动线程
#include <iostream>
#include <numeric> // std::accumulate
#include <vector>
#include <thread>
#include <future>
void accumulate(std::vector<int>::iterator first,
std::vector<int>::iterator last,
std::promise<int> prom)
{
int sum = std::accumulate(first, last, 0);
prom.set_value(sum);
}
int main()
{
// 演示用 promise<int> 在线程间传递结果
std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
//1. 构建 promise obj
std::promise<int> prom;
//2. get_future()
std::future<int> futu = prom.get_future();
//3. 启动新线程, 线程间 move promise
std::thread work_thread(accumulate,
numbers.begin(),
numbers.end(),
std::move(prom) );
//4. future 上 get()
std::cout << "result = " << futu.get() << '\n';
work_thread.join();
}
##2.4 async & packaged_task & promise 机制 & 应用
1 async
//List4.6 用 std::future 得到一个 异步任务的返回值
#include <future>
#include <iostream>
int find_the_answer_to_ltuae(); // long time
void do_other_stuff();
int main()
{
// std::async 可以增加 参数, 作为 异步任务 函数的 args
std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
do_other_stuff();
std::cout<<"The answer is "<< the_answer.get() <<std::endl;
}
//List 4.7 std::async 中 传参数给 异步函数
#include <string>
#include <future>
//-------class1
struct X
{
void foo(int,std::string const&);
std::string bar(std::string const&);
};
X x;
//1. 通过 指针 p/&x 间接 提供对象:
// 调 p->foo(42, "hello"), p 是 &x
auto f1=std::async(&X::foo, &x, 42, "hello");
//2. 直接 提供 (左值)对象 x:
// 调 tmpx.bar("goodbye"), tmpx is a copy of x
auto f2=std::async(&X::bar, x, "goodbye");
//-------class2
struct Y
{
double operator()(double);
};
Y y;
//3. 调 tmpy(3.141), tmpy is move-constructed from Y()
// <=> tmpy is constructed from std::move( Y() )
//(1 )参数 Y(): 调 Y 的 ctor, 产生 右值(无名对象) tmp
//(2) 右值 Y() 被 wrap 成 std::move( Y() ),
// 作为 实参 调 Y 的 移动ctor, 产生 tmpy
//(3) 通过 std::async 调 operator(), tmpy(3.141)
auto f3=std::async( Y(), 3.141);
//4. std::ref (左值对象 y) 提供对象 & 调 y 的 operator()
// => 调 y(2.718)
auto f4=std::async(std::ref(y), 2.718);
//5. 调 baz(x)
// 异步 非 成员函数 & ref(类的对象)
X baz(X&);
std::async(baz, std::ref(x) );
//-------class3
class move_only
{
public:
move_only();
move_only(move_only&&)
move_only(move_only const&) = delete;
move_only& operator=(move_only&&);
move_only& operator=(move_only const&) = delete;
void operator()();
};
//6. 调 tmp() , tmp is constructed from std::move( move_only() )
// 与 3 同理
auto f5=std::async( move_only() );
(3)可用第1参数 `std::launch` 指定, `是让 std::async 起一个新线程; 还是 在 future 上等待时, 才让 任务运行`
//1) 函数调用被 延迟, 直到 在 future 调用 wait()/get()
std::launch::deferred
//2) 函数调用 必须运行在 自己的线程上, 即 起一个新线程
std::launch::async
// 就像调用
std::thread(std::forward<F>(f), std::forward<Args>(args)...)
//3) 由实现决定 <=> 默认选择
std::launch::deferred | std::launch::async
// 新线程中 运行
auto f6=std::async(std::launch::async, Y(), 1.2);
//2.1 future 上 wait() 或 get() 时 运行
auto f7=std::async(std::launch::deferred, baz, std::ref(x) );
//3. 实现选择
//(1)默认
auto f9=std::async(baz, std::ref(x) );
//(2)显式写出
auto f8=std::async(
std::launch::deferred | std::launch::async,
baz, std::ref(x) );
//2.2 唤起 延迟函数 / deferred function
// 此时才调 work f7 的 函数 baz(x)
f7.wait();
2 用 future 关联 任务 std::packaged_task
(1) std::packaged_task<> 机制
实参及实际返回类型 只要能正确地 隐式转换为
函数标签中的 参数类型 和 返回类型 即可
1) 创建 std::packaged_task<> obj ( work )
std::packaged_task<> ties a future to a function 或 callable object
2) work.get_future() 返回 future 对象, 存 work函数 返回值/抛出的异常
3) `唤起` std::packaged_task<> 对象时,
1> 以 相应的 `args 作实参 调该对象的 operator()`, 该 operator() 中再 `转调 work function 或 callable object 的 operator()`
2> makes the future ready
4) future对象 .get() 获得 返回值
(2) std::packaged_task< > 特化版 部分接口
//List 4.8 std::packaged_task< > 特化的 类(部分)定义
template<>
class packaged_task< std::string(std::vector<char>*, int) >
{
public:
template<typename Callable>
explicit packaged_task(Callable&& f);
std::future<std::string> get_future();
void operator()(std::vector<char>*,int);
};
=> `std::packaged_task 对象 是 callable 对象:` 可以
1)被 `wrap` 进 std::function 对象
2)作为 `线程函数` 传给 std::thread ctor
3)传给需要 callable object 的函数
4)直接唤起
3 std::packaged_task 3种用法
// std::packaged_task 3种用法
#include <iostream>
#include <cmath>
#include <thread>
#include <future>
#include <functional>
int f(int x, int y) { return std::pow(x,y); }
void work_lambda()
{
std::packaged_task<int(int,int)> work([](int a, int b) {
return std::pow(a, b);
});
std::future<int> result = work.get_future();
work(2, 9);
std::cout << "work_lambda:\t" << result.get() << '\n';
}
void work_bind()
{
std::packaged_task<int()> work(std::bind(f, 2, 11));
std::future<int> result = work.get_future();
work();
std::cout << "work_bind:\t" << result.get() << '\n';
}
void work_thread()
{
std::packaged_task<int(int,int)> work(f);
std::future<int> fut = work.get_future();
std::thread work_td(std::move(work), 2, 10);
work_td.join();
std::cout << "work_thread:\t" << fut.get() << '\n';
}
int main()
{
work_lambda();
work_bind();
work_thread();
}
4 2个应用
4.1 若 任务 可 划分` 为多个 self-contained `子任务`
-> 每个std::packaged_task<> instance `wrap` 1个子任务
-> 该 instance 被 `传进 任务调度器 或 线程池`
=> 好处: `调度器 只需要处理 各std::packaged_task<> instance`, 而不是 各函数
4.2 `线程间 传递works`
`GUI 框架: 用 指定的线程 更新 GUI`
方法1: 指定一个线程 更新GUI, 某线程想 更新GUI, 给它发消息即可
方法2:
共享/全局 std::packaged_task 任务队列: 放 gui work
std::deque<std::packaged_task<void()> > works;
全局 mutex:
1 何时 lock mutex?
access 共享数据(结构) 时, lock
2 何时 释放/unlock mutex
access 完, 让 std::lock_guard<std::mutex> 对象 作用域 结束, 自动释放
=> 代码中 锁的范围 就很好确定
`std::packaged_task 任务`:
//1. 创建 obj : wrap Func
std::packaged_task<void()> work(f);
//2. push work to deque : 用 std::move()
works.push_back(std::move(work));
//默认 ctor : 对象 用于接收 提取的 work
std::packaged_task<void()>work;
//3. 提取 work: 用 std::move()
work=std::move( works.front() );
//4. invoke work 函数: with 函数参数(本例无参)
//通过 调用 work 的 operator(), 转调 work 函数的 operator()
work()
gui 线程:
循环 : 直到 收到 gui关闭消息 时 跳出
轮询(polling for) 要处理的 GUI messages, 比如 用户 clicks
根据队列是否为空 来决定,
是从 work队列 front 返回 1个 gui work; 还是跳到下一循环
pop work -> 释放锁
执行 work
push 任务的线程:
用接收的 函数 创建1个 work
获得 fut = work.get_future()
push work到 任务队列
返回 fut
调度线程:
以 work 函数 启动 push任务的线程
放 gui 消息 到 GUI 消息队列
启动 gui 线程
从 push线程 返回的 future: 调 get 获取 work 函数 返回值;
或 丢掉 future
发gui 结束消息 给 gui 线程
// 合成完整的list4.9 用 std::packaged_task 在 GUI 线程上 运行代码
#include <deque>
#include <mutex>
#include <future>
#include <thread>
#include <utility>
std::mutex m;
std::deque<std::packaged_task<void()> > works;
bool gui_shutdown_message_received();
void get_and_process_gui_message();
//-----1 GUI 线程
void gui_thread()
{
while( !gui_shutdown_message_received() )
{
get_and_process_gui_message();
// local work: 接收 从 任务队列提取出的 任务
std::packaged_task<void()> work;
{
std::lock_guard<std::mutex> lk(m);
if( works.empty() )
continue;
// 从work队列中 提取 front 任务
work=std::move( works.front() );
works.pop_front();
}// 释放 锁 => 用 块作用域 {}
work(); // 运行任务
}
}
std::thread gui_bg_thread(gui_thread);
//-----2 post_work_for_gui_ 的线程
template<typename Func>
std::future<void> post_work_for_gui_thread(Func f)
{
std::packaged_task<void()> work(f);
std::future<void> res=work.get_future();
std::lock_guard<std::mutex> lk(m);
works.push_back( std::move(work) );
return res;
}
// -----3 调度线程
//(1)在 future 上等待 : 可 知道 任务是否已完成
//(2)丢掉 future
###2.6.3 promises
1 针对2种场景
[1] work 不能被表达为 函数调用
[2] 结果 来自多个地方
2 机制
// 3大成员函数
1 get_future()
返回 promised 结果 关联的 future
2 set_value() : 主动线程
会 make state/future ready
3 fut.get() : 被动线程
3 应用: `网络连接 数 n > 1`
`网络侧 起多线程, 单个线程上 处理 单连接:`
问题:连接数过多时, 消耗 OS 资源过多, 上下文切换过多(线程数超过硬件支持的并发数)
解决: `网络侧 起多线程, 单个线程上 处理 多连接 & 用 promise / future structure`
网络侧 循环处理 来自终端的 多连接 中 每个连接, 连接分2种
(1)上传数据的连接
1)取出 接收data
2)用 接收data的ID 获得 连接关联的 promise
3)promise 结果 : set_value为 data 的 payload/有效载荷
(2)下载数据的连接
1)数据发送队列 中 取出 发送data(与上传线程中 接收data 类型不同)
2)发送 data
3)data关联的 promise 结果 : set_value 为 true, 以告诉 对方线程 发送成功
//List 4.10 用 promises 处理 来自1个线程的 多个 connections
#include <future>
void process_connections(connection_set& connections)
{
//1 循环, 直到 处理完所有连接, 即 done() 返回 true
while( !done(connections) )
{
for(connection_iterator connection=connections.begin(),end=connections.end();
connection!=end;
++connection)
{
if( connection->has_incoming_data() )
{
data_packet data=connection->incoming();
std::promise<payload_type>& p=
connection->get_promise(data.id);
p.set_value(data.payload);
}
if( connection->has_outgoing_data() )
{
outgoing_packet data=
connection->top_of_outgoing_queue();
connection->send(data.payload);
data.promise.set_value(true);
}
}
}
}
###2.6.4 用 future 保存 exception
1 std::async 抛出异常 时, future 内部 存储 异常 & future become ready
-> fut.get() 会再次抛出异常`
double square_root(double x)
{
if(x<0)
{
throw std::out_of_range(“x<0”);
}
return sqrt(x);
}
std::future<double> f=std::async(square_root,-1);
double y=f.get();
2 用 promise 的 set_exception() 来 make future ready`
extern std::promise<double> some_promise;
try
{
some_promise.set_value(calculate_value());
}
catch(...)
{
some_promise.set_exception(std::current_exception()); // std::current_exception(): 恢复异常
}
###2.6.5 从多线程中 等待
1 多线程 access 1个 std::future/std::shared_future 对象时, `data race`
`对future, 并发 access 没有意义; 对shared_future, 用1个 锁来 保护 并发 access`
第1次调用get()之后, future 就没有值可以获取
-> 只有1个线程可以恢复future中的值
-> 并发 access 没有意义
std::promise<int> p;
std::future<int> f( p.get_future() );
assert(f.valid()); // future f is valid
std::shared_future<int> sf( std::move(f) );
assert(!f.valid()); // f 不再 valid
assert(sf.valid()); // sf 是 valid
// shared_future
std::promise<std::string> prom;
//Implicit transfer of ownership
// for rvalue of type std::future<std::string>
std::shared_future<std::string> sf( prom.get_future() );
std::future::`share(): creates a new std::shared_future and transfers ownership to it directly`
2 总结
`future 可以 关联 或 不关联 数据`
`一旦 事件发生, future 就不能被重置`
//(1)类模板: <> 内 参数为 关联的数据类型
//(2)不 关联数据
std:future<void> /std::shared_future<void>
##4.2 `消息传递` + `同步`
1 CSP 通信序列处理
若 `无 shared state, 则 每个线程 可以 完全独立, 是一个 状态机:
收到 1个消息
-> 更新 自身状态 & 发消息 给其他线程 & 处理 是基于初始状态
=> 有限状态机模型`
2 代码分成 3个独立线程
(1) 设备处理 线程
(2) ATM 逻辑 线程
(3) 与银行 communicate
仅 `通过 `传递消息`, 而不是 `共享数据`, `3个线程就可以 communicate—消息传递风格`
如: 当用户 插入卡 / 按下按键 时,
设备处理线程 给 逻辑线程 发一个消息;
而 逻辑线程 给 设备处理线程 发消息 提示取多少钱
// List 4.15 simple implementation of ATM logic class
struct card_inserted
{
std::string account;
};
class atm
{
messaging::receiver incoming;
messaging::sender bank;
messaging::sender interface_hardware;
// function pointer : void (*state)();
// => member function pointer: void (atm::*state)();
void (atm::*state)();
std::string account;
std::string pin;
void waiting_for_card() //1
{
//2 send “waiting insert card” message to interface
interface_hardware.send( display_enter_card() );
incoming.wait() //3
.handle<card_inserted>(
[&](card_inserted const& msg) //4
{
account = msg.account;
pin="";
// send "input PIN" message to interface
interface_hardware.send( display_enter_pin() );
// member function pointer state reassigned:
state = &atm::getting_pin;
}
);
}
void getting_pin();
public:
void run() //5
{
state = &atm::waiting_for_card; //6
try
{
for( ; ; )
{
//7. call function ( signature: void (), return void, no Args )
// by member function pointer atm::*state
// firstly call waiting_for_card() circularly, then call getting_pin()
(this->*state)();
}
}
catch(messaging::close_queue const&) { }
}
};
网友评论