美文网首页
C++ concurrency in action: 1~4 章

C++ concurrency in action: 1~4 章

作者: my_passion | 来源:发表于2021-01-10 23:39 被阅读0次
        C++11 最重要的新特性: 多线程
    
            2 个好处
    
                [1] 可写 `平台无关的 多线程程序, 移植性 提高`
    
                [2] `并发` 提高性能
    

    1 概述

        1   并发
    
            (1) 含义
    
                `任务切换`: 能切出去, 也能切回来
    
                任务切得太快, 看起来像 同时执行多个任务 一样
    
            (2) 2 种方式
    
                [1] `多进程` 
    
                    `进程间通信: 信号 / 套接字 / 文件 / 管道`
    
                        `1个 进程 一套 地址空间`
    
                [2] `多线程`
    
                    `同一进程 中 所有线程 共享地址空间`
    
                        所有线程 访问到 大部分数据—全局变量仍然是全局的,
                            指针、对象的引用、数据可 在线程间传递
    
        2   并发 优势
    
            (1) 关注点分离 ( SOC )`
    
                code 分 2 part
    
                    “DVD播放”
    
                    “用户界面”
    
                [1] 单线程: 两者放一起, 以便调用
    
                [2] 多线程: 分隔关注点, 两者不用放一起
    
                    线程1:    处理“用户界面” 事件
    
                    线程2:    “DVD播放”
    
                    线程间 交互 / 关联
                        用户点击“暂停”
    
            (2) ·算法/数据 并行 -> 提高性能`
    
                [1] 数据相同 / 算法拆分
    
                [2] 计算相同 / 数据拆分
    
        3   C++11 多线程 特点
    
            (1) `线程感知` 的 `内存模型`
    
            (2) 直接支持 `原子操作`
    
                coder 无需了解 与平台相关的汇编指令
                
                    就可编写 高效/可移植 代码
    
                compiler 负责 `搞定具体平台`
    
            (3) `C++ 线程库 性能` 
                    
                    `接近 直接使用底层 API 时`
    
        4    多线程入门
    
            (1) 单线程
                #include<iostream>
                int main()
                {
                    std::cout<<"Hello World\n";
                }
    
            (2) 多线程
            
                #include <iostream>
                #include <thread>           //(1)
                void hello()                //(2)
                {
                    std::cout<<"Hello Concurrent World\n";
                }
                int main()
                {
                    std::thread t(hello);   //(3)
                    t.join();               //(4)
                }
    
                    1)  头文件 <thread>
    
                    2)  `线程 初始函数`: 新线程执行入口
    
                            [1] `主   线程: main()`
    
                            [2] `其他 线程: std::thread 对象 ctor 中 指定`
    
                    3) std:thread 对象创建 完成 -> 开始执行新线程
    
                    4) join(): 让 caller 等待 新线程 完成
    

    2 管理线程: thread/join/detach/RAAI/std::ref/std::bind/move ownership

    image.png
        线索
        
            1   启动线程, 等待它结束 or 放后台运行
            
            2   给 线程函数 传参
            
            3   transfer ownership of thread 
            
                    from current associated std::thread object to another
                    
            4   确定线程数, 识别特殊线程
        
    
        #1  基本 线程管理
    
            1.1 启动线程
    
                `线程 在 std::thread 对象创建 时 启动`
    
                1   thread 对象
        
                    (1) 一旦构造, 立即开始执行 `线程函数`
                                        
                    (2) 不可 copy, 可 move
                            
                            copy/赋值 deleted
                                
                                原因 
                                    线程对象 `copy/赋值` 可能 `弄丢已 join 的线程`
    
                    (3) 可能 `不 表示/关联 任何线程` => safe to destroy & not joinable
                
                            after 4场景之一
    
                            [1] 默认构造 
                    
                                    => 无 线程 ( 函数 ) 执行
    
                            [2] move from
                    
                                    => 线程 move 给 other thread 对象去管理
    
                            [3] detach
    
                                    => 线程 可能仍在执行
    
                            [4] join
                    
                                    => 线程 执行完成
                                        
                2   线程函数
                        
                    [1] 不与 caller 通信时, 若 抛出异常
                        
                        std::terminate 被调用
                            以终止线程函数
    
                    [2] return value 或 异常 可 传给 caller
                        
                        2种方式
                            ——————————————————————————
                            1] std::promise
                            
                            2] 共享变量 (可能需 同步)
                            ——————————————————————————
                                            
                3   std::thread ctor 的 args: 可调用对象 
    
                    (1) 入口点为 function
    
                        void f();
                        std::thread my_thread(f)
                
                    (2) 入口点为 `可调用对象`
    
                        可调用对象 copied into thread's storage
                            
                            并 `invoked` from there
    
                                原始可调用对象 可被立即销毁
            
                                    class F
                                    {
                                    public:
                                        void operator()() const { do_something(); }
                                    };
    
                                    F f;
                                    std::thread my_thread(f);
    
                        问题: `函数对象 临时无名对象 作 std::thread ctor's arg`
    
                            会被 C++ 编译器 解释为 `函数声明`
    
                                std::thread my_thread( F() );
                
                        3 种 解决
                            ————————————————————————————————————————————————————
                            [1] 函数对象对象 外再加一层小括号
                                
                                std::thread my_thread( (F() ) ); 
                            ————————————————————————————————————————————————————
                            [2] 花括号
                                
                                std::thread my_thread{ F() };
                            ————————————————————————————————————————————————————
                            [3] lambda 表达式 启动线程
                                
                                std::thread my_thread( 
                                        []{ do_something(); } 
                                    );
                            ————————————————————————————————————————————————————
                            
                4   join 还是 detach ?
    
                    (1) `必须用 + 何时用` 
                        
                        原因
                            
                            `std::thread 对象 销毁` 前线程 `没被 joined 或 detached`
    
                                线程对象 
                                    ——————————————————————————————————————
                                    [1] joinable 
                                    ——————————————————————————————————————
                                    [2] dtor 调 std::terminate() 结束程序
                                    ——————————————————————————————————————
                                    
                    (2) `detached 线程` 要在 `程序结束前 结束`
                        
                        原因
                        
                            main return 时, 正在执行的 detached 线程 
                            
                                ——————————————————————————————————————
                                [1] 被暂停
                                ——————————————————————————————————————
                                [2] 其 thread-local objects 销毁
                                ——————————————————————————————————————
                                
                    (3) 用 join 还是 detach ?
                    
                        ————————————————————————————————————————————
                        [1] join
                        ————————————————————————————————————————————
                        [2] 除非 你想 `更灵活`
                            
                                并用 `同步` 机制 去 `等待` 线程完成 
                                    
                                    此时 `detach`
                        ————————————————————————————————————————————
    
                5   std::terminate() in <exception>
    
                        被  C++ runtime 调用 
                        
                            ——————————————————————————————————————————————
                            3 种 case 
                                ——————————————————————————————————————————
                                [1] std::thread 初始函数 抛出异常
                                ——————————————————————————————————————————
                                [2] joinable 线程对象 `被销毁` 或 `被赋值`
                                ——————————————————————————————————————————
                                [3] `异常` 被抛出却未被捕获
                            ——————————————————————————————————————————————
                                
                6   lifetime  问题:   多(比单)线程 更易发生
    
                        `正在运行的 detached 子线程` 
                            
                            access `已被 destroyed 的 object` 
                            
                                =>  undefined behavior
                                
                            |
                            |   如 
                            |/
                            
                        caller 
                            ——————————————————————————————————————————————————
                            [1] local 变量 ptr/ref -> 传给 callee 线程 以 访问 
                            ——————————————————————————————————————————————————
                            [2] 已 return 
                            ——————————————————————————————————————————————————
                            [3] 相比 caller 线程, `callee 线程 lifetime 更长`
                            
                                => `ptr/ref 悬挂` ( dangling )
                                            |
                                            |/
                                    潜在的 访问 隐患
                            ——————————————————————————————————————————————————
                            |
                            |   解决 
                            |/
                                    
                        [1] 线程函数 `自包含`
                         +
                        [2] `copy data into thread`
                                    
                        struct func
                        {
                            int& i;
                            
                            func(int& i_) : i(i_) {}
    
                            void operator() ()
                            {
                                for (unsigned j=0 ; j<1000000 ; ++j)
                                {
                                    // 潜在的访问隐患: dangling reference
                                    do_something(i);        
                                }
                            }
                        };
    
                        void oops()
                        {
                            int local_state = 0;
                            func my_func(local_state);   
                            std::thread my_thread(my_func); 
                            my_thread.detach();             
                        }  
    
            1.2 `等待 线程完成`
    
                1   怎样 `等待 线程完成`
    
                    关联的 std::thread object 上调 join()
    
                2   join() 做了啥
                    
                    清理 所关联线程的 内存
                 
                        调 1 次 join 后
                            线程对象 
                                [1] 不再关联 (已完成的)线程 
                                [2] not joinable <=> joinable() == false
    
                3   等待过程中 控制线程
                        ——————————————————————
                        [1] 检查线程是否完成
                        
                            1] cv
                            
                            2] futures 机制
                        ——————————————————————  
                        [2] 等待特定时间
                        ——————————————————————
    
            1.3 异常下的 等待
    
                1   `call join() 的位置 要精挑细选`
    
                    (1) 问题
    
                        call join() 前 抛出异常
                            
                            join() 调用 被跳过
                            
                                `lifetime problem`
                     
    
                    (2) 解决 1
    
                        让 join() 不被跳过
                            
                            use try/catch, `catch 中 也调 join()`
    
                                缺点
                                    [1] try/catch 冗长
                                    [2] scope 易出错
                    
                        struct func; // defined in list2.1
                        void f()
                        {
                            int some_local_state=0;
                            func my_func(some_local_state);
                            std::thread t(my_func);
                            try
                            {
                                do_something_in_current_thread();
                            }
                            catch(...)
                            {
                                t.join(); // 1
                                throw;
                            }
                            t.join();     // 2
                        }
    
                    (3) 解决 2
    
                        RAII
    
                            本质含义
                            
                                `让 资源管理对象 own 资源, 在 析构时 释放资源
    
                                    => `资源的 lifetime 管理` 就和 `资源管理对象 lifetime 管理` 统一起来
    
                                        => 只要 `不泄漏 资源管理对象, 就能保证 不泄漏资源`
    
                                            至于是 `让 ctor 自己 创建资源`,
                                                
                                                `还是 把资源创建好 再交给 ctor 保管`,
                                                    
                                                    没有优劣之分
    
                                                        可用 `static Factory Method 创建资源对象`
                                                            `client 就不用管 里边怎么实现了`
                            实现 
                            
                                线程对象 
                                    |
                                    |   设 
                                    |/
                                保护类 thread_guard
    
                                    [1] explicit ctor: 左值引用 para = std::thread& t_
                                                                        |
                                                                        |   `线程对象` 作 arg 引用传递 构造 `线程保护对象`  
                                                                        |/
                                    [2] init. list  初始化         /       绑定 
                                                                        |
                                                                        |/
                                    [3] 左值引用 成员                  =  std::thread& t
    
    
                                        => `线程 的 ownership 没有转移`
    
                                            => `caller 可能 已对线程 调 join()` 
                                                    
                                                => [4] thread_guard `dtor 中 调 join() 前 必须 先判线程是否可 joinable`
    
                                                        =>  线程 `caller 函数 执行 }`
                                                            
                                                                即 ret 指令 `弹栈 逆序销毁` local 对象 时 
                                                                    
                                                                    先 `销毁 保护对象`
    
                                                                        `调 保护对象 dtor 中 可能 调 join()`
    
                                    [5] copy 和 赋值 delete
                                        
                                            因 `资源管理对象` 可能 `outlive 其管理的 `线程对象` 的 `scope`
    
                            //list2.3 用 RAII 等待线程完成
                            class thread_guard
                            {
                            private:
                                std::thread& t;                        // [3]
                            public:
                                explicit thread_guard(std::thread& t_) // [1]
                                    :t(t_){}                           // [2]
    
                                ~thread_guard()
                                {
                                    // [4] 先 test joinable(), 再 join()
                                    if( t.joinable() )
                                    {
                                        t.join();     
                                    }
                                }
    
                                // [5] copy 和 赋值 delete  
                                thread_guard(thread_guard const&)=delete; 
                                thread_guard& operator=(thread_guard const&)=delete;
                            };
    
                            struct func; // defined in list2.1
    
                            void f()
                            {
                                int state = 0;
    
                                func f(state);
    
                                std::thread t(f);
                                
                                // `新线程对象` 作 arg 引用传递 构造 `线程保护对象`  
                                thread_guard g(t); 
                                
                                do_something_in_current_thread();
                            } 
                        
                    (3) 解决 3
    
                        若 不必 等待线程结束
    
                            `detach & copy data into thread`
     
                                可避免 `异常安全` 问题 
    
                                    `detach` 会 `打破 线程 与 std::thread 对象` 间 `关联`
                                        
                                        只要 detach 线程 先于 main 函数 退出, 
                    
            1.4 后台 / background 运行线程
    
                1   detach 线程: std::thread instance上调 detach()`
    
                    `detached 线程` 特点
    
                    [1] 在后台(typically 长时间)运行
    
                        称 `守护线程`
                        
                        应用
                            1> 监控文件系统
                            
                            2> 实现发后即忘/fire and forget work: 
                                只管消息发送, 不管消息接收
    
                    [2] 无法 获得 关联它的 std::thread object 
    
                        => 无法直接 communicate with it
                                it can `no longer be joined 
                        => 无法等待它完成`
    
                    [3] `ownership 和 control 被传给 C++ runtime 库` 
                    
                        保证了 线程关联资源 
                            
                            在线程结束时 被正确回收
    
                2   application : word processor
                
                    `可一次编辑多个文档`
    
                    UI级和内部 处理的一种方法:
    
                    `1 个窗口 -> 1 个被编辑文档`
    
                    `edit_document (编辑文档) 作 线程函数` + 其中 通过 `与用户交互 获取 用户欲执行动作`
                
                    若为 打开新文档 
                        
                        `起 新线程 & 递归调 线程函数 edit_document` 
            
                    else
                        执行相应 操作
    
                    //List 2.4 Detach a thread to handle other document
    
                    //para: filename
                    void edit_document(std::string const& filename)
                    {
                        //1. open file
                        open_document_and_display_gui(filename);
                        
                        //2. Loop, until edit is done/finished
                        while( !done_editing() ) 
                        {
                            // 3. get user's input(cmd)     
                            user_command cmd=get_user_input();
                            
                            //(1) if user's cmd is open new file
                            if(cmd.type==open_new_document)
                            {
                                //(2) get filename from user
                                std::string const new_name=get_filename_from_user();
                            
                                //(3) start new thread to open: 
                                //the operations of new and current thread are the same.
                                //So, entry function reuse the caller function 
                                std::thread t(edit_document,new_name); 
    
                                //(4) detech new thread
                                t.detach();                            
                            }
                            else // 5. otherwise, process according to user's choise
                            {
                                process_user_input(cmd);
                            }
                        }
                    }
    
        #2  传参 给 线程函数
    
            2.1 thread ctor 接口
    
                    ——————————————————————————————————————————————————————————————————————————————————————————  
                    线程函数    |       thread ctor 的 paraList
                    ——————————————————————————————————————————————————————————————————————————————————————————
                    非成员函数  |    [1] callable obj    [2] 相应 `函数调用运算符` 的 para... 
                    ——————————————————————————————————————————————————————————————————————————————————————————
                    成员函数    |   [1] `成员函数指针`    [2] 相应 对象的指针  [3] 相应 `成员函数` 的 para... 
                    ——————————————————————————————————————————————————————————————————————————————————————————
                    
            2.2 内部机制
    
                1   `默认下` 
                    
                    std::thread Ctor 实参  
                    
                        [1] copied
                            
                            副本 在新线程 context 中 `隐式转换 为 相应参数类型`
                    
                        [2] 除非 用 `std::ref 等`
    
                            void f(std::string const& s);
                            std::thread t(f, "hello");
                    
                    如
                        实参 `字符串 字面量` 
                            
                            [0] 是 ptr:  char const*
                            
                            [1] copied
                            
                            [2] 副本 converted 为 std::string
                                
                            [3] 表面上 引用传递 
                                
                                实际是 ptr copy 作 arg 调 string Ctor
                                
                            问题 
                                
                                caller 函数 
                                
                                    在 thread ctor 的 `ptr/ref 型 arg` 的 
                                        
                                        `副本 转换为 相应参数前 退出`
                                            
                                            `所指对象销毁` 
                                                    
                                                `副本指针 悬挂`
                                                
                                                    => undefined behavior
                    
                            解决
                                [1] 先 强转 arg, 再 传给 thread ctor`
                                
                                [2] ptr 用 具有 `移动语义` 的 对象 管理
                                
                                    std::thread t(f, std::string(buffer) ); 
                    
                2   std::ref() / std::bind wrap 想 `引用传递` 的 `arg`
                                                    |
                                                    |   想 
                                                    |/
                                                更新数据 
                    [1] std::ref()
                    
                        void f(Data& data);
                        Data data;
                        std::thread t(f, std::ref(data) );
    
                    [2] std::bind   <functional>
    
                        std::thread ctor 与 std::bind 机制相同
    
                            X x;
                            std::thread t(&X::f, &x);   
    
            #3  转移 线程 ownership
                
                1   为什么要引出 move of std::thread
    
                        应对 2 种场景
    
                            线程 ownership 转交给  
                                ——————————————
                                [1] caller 
                                
                                [2] 其他 函数
                                ——————————————
                                
                    (1) `ownership moved out of a function`
    
                        `形似 pass by value, 实际却是 move 语义`
    
                            std::thread caller()
                            {
                                void callee();
                                return std::thread(callee);
                            }
    
                    (2) `ownership moved into a function`
    
                        void g(std::thread t);
    
                        void f()
                        {
                            void thread_func();             
                            g( std::thread(thread_func) ); 
                        }
                        
                2   move 机制 
                
                    —————————————————————————————————————————
                    [1] 隐含自动 move 
                            
                            左 或 右运算对象 至少一侧为 右值
                    —————————————————————————————————————————
                    [2] 显式 调 std::move()
                    —————————————————————————————————————————
                    
                2   thread_guard 与 scoped_thread
    
                        ————————————————————————————————————————————————————————————————————————————
                        是否转移线程 ownship  |   否                       |       是 
                        ————————————————————————————————————————————————————————————————————————————
                        成员                  |   左值引用 std::thread& t |   值  std::thread t
                        ————————————————————————————————————————————————————————————————————————————    
                        args 传递方式           |   左值引用 传递         |   值传递 + 移动语义 
                        ————————————————————————————————————————————————————————————————————————————
                        dtor 中 call join 前  |                           |
                            是否 check joinable | 是                       |       否 
                        ————————————————————————————————————————————————————————————————————————————
                        ctor 中              |                           |
                            是否 check joinable | 否: 啥也不干             |       是 
                        ————————————————————————————————————————————————————————————————————————————
                        
                        class scoped_thread
                        {
                        private:
                            std::thread t;
                        public:
                            explicit scoped_thread(std::thread t_)  // 隐式 move 
                                : t( std::move(t_) )                // 显式 move
                            {
                                if( !t.joinable() ) // Note
                                    throw std::logic_error(“No thread”);
                            }
                            ~scoped_thread()
                            {
                                t.join(); 
                            }
                            
                            scoped_thread(scoped_thread const&)=delete;
                            scoped_thread& operator=(scoped_thread const&)=delete;
                        };
    
                        struct F; 
    
                        void f()
                        {
                            int local_state;
                            scoped_thread st( std::thread( F(local_state) ) ); 
                            
                            do_something_in_current_thread();
                        }
    
            #4  runtime 时 选择 动态数量的线程 : 用 线程 groups 来 divide work
    
                std::thread::hardware_concurrency()
    
                    硬件支持的并发线程数 的 hint
                    
                    int hardware_threads = std::thread::hardware_concurrency();
                        
                    int n_threads = std::min(hardware_threads != 0 ? hardware_threads : 2);
                        
                    [1] 线程 vector 
                        
                        std::vector<std::thread> threads(n_threads);
                    
                    [2] 启动 n_threads 个子线程  
                        for i = 0 ~ n_threads-1
                            threads[i] = std::thread(
                                                f, block_start, block_end, std::ref(results[i] )
                                            );
                            
                    [3] wait 子线程 
                    
                        std::for_each(threads.begin(), threads.end(),
                            std::mem_fn(&std::thread::join) );      
                            
            #5  识别线程:目的 `把 data 或 behavior 与 线程 关联起来`
    
                1   线程 identifier 是 std::thread::id type
    
                    std::thread::id master_thread;
                    
                    void f()
                    {
                        if( std::this_thread::get_id() == master_thread )
                        {
                            do_master_thread_work();
                        }
                        
                        do_common_work();
                    }
            
                2   线程ID 可作 关联容器 中 `key`
    

    3 在线程间 共享 data

    image.png image.png
        2个目标 
            
            [1] 避免潜在问题 
            
            [2] 最大化收益
    
        `并发 好处`
    
            线程间 `共享数据` 容易、直接 
    
        1.1 线程间 共享 数据
    
            双向链表 delete node == update next + update prev
            
                只 update next 时, `invariant 被 打破了`
                
                    若此时另一线程 正 access this node & 没做保护处理
                    
                        => 该 `race condition` 会 引起 bug
    
            1   问题: race condition 竞争条件 
    
                `结果取决于 多个线程` 上 `操作` 执行的 `相对顺序` 
                    
                    多个线程 竞争地去执行 各自的 `操作`
                                                    |
                                                    |/
                                                  抢票
                
                data race 
                    
                    对象 并发修改 对象 
                        
                        => undefined behavior
    
            2   解决
            
                (1) `用 mutex wrap 数据结构`
                    
                        以保证 `只有 真正执行修改的那个线程 可看到 invariants 被打破的状态`
    
                (2) lock-free 编程
    
                        修改 数据结构的设计 及其 invariant
                        
                            `分离 变化` 成 `n 个 不可分割的 更小变化`
                            
                                每个小变化 保护 其 invariant
    
                (3) STM / software transactional(事务) memory
    
                        数据库更新
                        
                            update 数据结构 == process a transaction/事务
    
                                `store` series of `数据修改和读` to a `事务log` 
                                    
                                    commit it in a step
    
                                        `commit 失败, 事务重启`
    
        2.1 mutex 机制
                
            1   `mut`ually `ex`clusive 互斥
                
                保护 共享数据
                
                    access 前    lock    其关联的 `mutex` 
                    
                    access 后    unlock
    
            2   mutex 并不是 silver bullet 银弹
    
                更重要的是 
                
                    [1] `设计代码结构` 来 `保护数据`       2.2节 
                    
                    [2] `接口中内在地避免 race conditions`  2.3节
    
            3   mutex 问题
                
                    [1] 死锁 
                    
                    [2] 保护范围 要多大 
    
        2.2 mutexes 使用 
    
            1   mutex 创建/lock/unlock
    
                创建   
    
                上/解 锁
                
                    成员函数 lock()/unlock()
    
            2   对 mutex 不推荐 手工 调 lock()
                    
                    原因: mutex 是一种 资源 
                    
                        与 对 thread 不推荐手工调 join() 一样 
                            ———————————————————————————————————————————————
                                资源  |   资源管理类 / RAII 类 
                            ———————————————————————————————————————————————
                            thread 对象 | thread_guard 或 scoped_thread    
                            ———————————————————————————————————————————————
                            mutex  对象 | std::lock_guard
                            ———————————————————————————————————————————————
                            
                                //list3.1 Protect list with mutex
                                #include <list>
                                #include <mutex>
                                #include <algorithm>
    
                                std::list<int> lst;     //1 单个 全局变量
                                std::mutex mut;         //2 用 相应 全局 std::mutex 保护
    
                                void add_to_list(int val)
                                {
                                    std::lock_guard<std::mutex> guard(mut); 
                                    lst.push_back(val);
                                }
                                bool list_contains(int val) 
                                {
                                    std::lock_guard<std::mutex> guard(mut); 
                                    return std::find(lst.begin(),lst.end(),val) != lst.end();
                                }
    
                |
                |
                |   重构
                |
                |       [1] 函数              作 public  成员 
                |
                |       [2] 被保护数据 + mutex  作 private 成员
                |/          
        2.3 Structuring code 以 保护 共享数据
    
            1   问题
                
                `迷途( stray ) ptr/ref` 和 `后门`
            
                    若 成员函数 将 被保护数据 的 `ref/ptr 传出` lock 的 scope
                                                            |
                                                            |   如 
                                                            |/      
                                ——————————————————————————————————————————————————————————————————————————————————
                                [1] `隐晦地 传出` 
                                        
                                        成员函数 para 为 `( 用户策略 ) 函数对象` 
                                                                        |
                                                                        |   函数调用运算符 operator() 的 para 为 
                                                                        |/
                                                                ref / ptr 参数 
                                ——————————————————————————————————————————————————————————————————————————————————      
                                [2] 显而易见 的 传出 
                                
                                        return
                                ——————————————————————————————————————————————————————————————————————————————————
                                
                        => 留了 `后门 ( backdoor )`
    
                            => 任何能 访问 ptr/ref 的 code 可
                                
                                    `绕过(bypass) mutex 的 保护` access `被保护数据`
                                            |
                                            |/
                                        不用 lock mutex
                                        
                    (1) code 结构 
                        
                        ——————————————————————————————————————————————————————————
                        [1] ProtectedData + mutex  作 其 管理类(DataWrapper) 成员 
                        ——————————————————————————————————————————————————————————
                        [2] 成员函数 para 为 `函数对象`
                                
                                接受 `用户(恶意 malicious)策略函数`
                                                |
                                                |   [3]
                                                |/
                                    ——————————————————————————————————————————————
                                    1] `引用参数` => 可 `取 被保护数据 的 ptr`
                                    
                                    2] 绕过 mutex 保护 access 被保护数据
                        ——————————————————————————————————————————————————————————
                        
                    (2) 根因
                        
                            没有把 access 数据 的 所有代码段 标记为 `互斥` 
    
                                `漏掉了 保护 传出的 被保护数据 的 ref`
    
                    (3) 更深的 陷阱 ( pitfall )
                        
                        接口函数间 调用顺序( 接口设计 ) 导致 race conditions — 2.3节
                
                class Data  // [] ProtectedData        
                {
                    int a;
                    std::string b;
                public:
                    void mf();
                };
                
                class DataWrapper      
                {
                private:
                    Data data;      // [1] ProtectedData + mutex  作 其 管理类(DataWrapper) 成员 
                    std::mutex m;           
                public:     
                    // [2] 成员函数 para 为 `函数对象`
                    //      接受 `用户(恶意 malicious)策略函数`
                    template<typename F>
                    void processData(F f)   
                    {
                        std::lock_guard<std::mutex> l(m); 
                        f(data); 
                    }
                    // 略去 ctor
                };
    
                // === client 
                Data* gpd; 
    
                // [3] 用户(恶意 malicious)策略函数: 1] `引用参数` => 可 `取 被保护数据 的 ptr` 
                void maliciousFunc(Data& protected_data)  
                {
                    gpd = &protected_data; 
                    
                    //  2] 绕过 mutex 保护 access 被保护数据
                }
    
                DataWrapper x;
    
                void f()
                {
                    x.processData(maliciousFunc); 
                    gpd->mf();
                }
    
        2.4 在接口本身中 发现 ( Spotting ) race conditions
    
            1    双向 list 
                    
                    删除 1个 node
                        
                        线程安全
                            
                            阻止 前中后共 3个节点 的 并发 accesses
    
                问题 
    
                    单独保护 每个节点
                        仍有 race conditions
    
                解决 
                    
                    `单个 mutex` 保护 `整个 数据结构(list)`
    
            2   线程安全 的 栈
    
                    // List 3.3 std::stack 的 接口
                    template<typename T, typename Container = std::deque<T> >
                    class stack
                    {
                    public:
                        explicit stack(const Container&);
                        explicit stack(Container&& = Container() );
                        template <class Alloc> explicit stack(const Alloc&);
                        template <class Alloc> stack(const Container&, const Alloc&);
                        template <class Alloc> stack(Container&&, const Alloc&);
                        template <class Alloc> stack(stack&&, const Alloc&);
    
                        size_t size() const;
                        
                        // 
                        bool empty() const;
                        
                        T& top();
                        T const& top() const;
    
                        void pop();
                        
                        void push(T const&);
                        void push(T&&);
                        
                        void swap(stack&&);
                    };
    
                (1) 问题 
                
                        调用顺序 为 empty() -> top() -> pop() 时, `not 线程安全`
    
                    [1] 线程1 empty() top() 间 线程2 pop()
    
                            empty() 判非空 -> pop() 使栈 空 -> top()
    
                                原因: 接口设计导致
                            
                                    stack<int> s;
                                    if(!s.empty() ) 
                                    {
                                        int const value = s.top(); 
                                        s.pop(); 
                                        do_something(value);
                                    }
    
                    [2] 线程1 top() pop() 间 线程2 top()
    
                            `2个线程 本打算 分别 处理 顶值和次顶值, 实际 都 处理 顶值` 
    
                                次顶值 没处理 就被移除了
    
                (2) 解决 
                
                                ( 有 隐患 )
                                    |\
                                    |
                    [1] mutex + 联合调 top 和 pop
    
                            若 stack 对象 copy ctor 可能 抛出异常 
    
                                Herb Sutter 给出 solution
    
                    [2] pop() = std::stack 的 empty() + top() + pop()
                    
                        ————————————————————————————————————————————————
                        1]  `构造 栈元素类型 新值` + pass by reference
                        +
                        2]  赋值 给 构造的新值 = `引用 实参/形参`
                        ————————————————————————————————————————————————
                        
                        缺点 
                            
                            1]  构造 代价太高 for some type
                            
                            2]  popped value type 必须 `可 赋值`
                                
                                    许多 用户定义类型 不支持 assignment
            
                    [3] 用 copy ctor 或 move ctor 不抛出异常 的类型
    
                            C++11 的 `右值引用`
                                            
                                使 更多类型 move ctor 不会抛出异常, 虽然 copy ctor 会
    
                        缺点
                        
                            不通用
                                
                                用户自定义类型 
                                    1]  copy ctor 可能 抛出异常
                                    
                                    2]  没有 move ctor
            
                    [4] `return ptr` 指向 popped item
    
                            `ptr 可安全 copy, 不会 抛出异常`
    
                        缺点
                            
                            要内存管理, 简单类型时 开销大
                                
                                用 std::shared_ptr
    
                    [5] 解决 [2] + [3] 或 [4]
                            
                        接口简化
                            
                            例 [2] + [4]
                                5个操作 变 3个 好处
                                        empty   empty
                                        top
                                        pop     pop: 2 个 重载版本 [2] [4]
                                        push    push
                                        swap 
                                
                                    mutex 保护 std::stack 上 `完整 3 操作` empty() -> top() -> pop()
                        
                                        ——————————————————————————————————————————————————————————————————————————————————————————————————
                                                                  | [4]                                     |   [2]
                                        ——————————————————————————————————————————————————————————————————————————————————————————————————          
                                        函数原型                  | std::shared_ptr<T> pop()                |   void pop(T& value)
                                        ——————————————————————————————————————————————————————————————————————————————————————————————————      
                                        para                      | 空                                       |   模板参数 T `引用类型`   
                                        ——————————————————————————————————————————————————————————————————————————————————————————————————                          
                                        实现关键                  | 似 placement new + args 转发           |   1] pop 的 caller 负责 构造
                                            取出 top 元素         | pop() 负责 构造                         |   2] 赋值 给 构造的新值 = `引用 实参/形参`  Note: 只是 引用参数, 不是 取出/top 栈中 元素的引用
                                                怎么办 ?         | std::shared_ptr<T> const res(           |   value = stk.top(); // T 可 赋值                            
                                        用于`构造 栈元素类型 新值`|        std::make_shared<T>( stk.top() ) ); |   
                                        ——————————————————————————————————————————————————————————————————————————————————————————————————  
                                        记住 解决 [4] 易 => 解决 [2] 等 
                                        ——————————————————————————————————————————————————————————————————————————————————————————————————  
                        
                                    template< class T, class... Args >
                                        shared_ptr<T> make_shared( Args&&... args )             //  <memory>
                
                                            T 非 数组 
                                             |\
                                             |  Note
                                             |
                                        构造 T 型 object
                                            
                                            wrap 进 std::shared_ptr
    
                                                就像 
                                                    ::new (pv) T(std::forward<Args>(args)...)   // placement new + args 转发 
                                                            |
                                                            |   internal 
                                                            |/
                                                        internal void*
                                                        
                                                        
                        // 解决 [4]: 记住 解决 [4] => 解决 [2] 等 
                        template<typename T>
                        std::shared_ptr<T> threadsafe_stack::pop() 
                        {
                            std::lock_guard<std::mutex> lock(m);
                            
                            if( stk.empty() ) 
                                throw empty_stack(); 
                            
                            std::shared_ptr<T> const res( 
                                std::make_shared<T>( stk.top() ) ); 
                                
                            stk.pop(); 
                            
                            return res;
                        }
                                
                                
                            // 用 sloution [2] + [4] 实现 接口 无 race conditions
                            #include <exception>
                            #include <memory>
                            #include <mutex>
                            #include <stack>
    
                            struct empty_stack: std::exception
                            {
                                const char* what() const throw();
                            };
    
                            template<typename T>
                            class threadsafe_stack
                            {
                            private:
                                std::stack<T> stk;   // [1] internal std::stack
                                mutable std::mutex m;   
                            public:
                                threadsafe_stack(){}
                                
                                threadsafe_stack(const threadsafe_stack& rhs)
                                {
                                    // [2] copy ctor: under mutex
                                    std::lock_guard<std::mutex> lock(rhs.m);
                                    
                                    stk = rhs.stk; 
                                }
                                
                                // [3] Assignment deleted & have no swap()
                                threadsafe_stack& operator=(const threadsafe_stack&) = delete;
    
                                bool 
                                empty() const
                                {
                                    std::lock_guard<std::mutex> lock(m);
                                    return stk.empty();
                                }
    
                                // 解决 [4]: 记住 解决 [4] => 解决 [2] 等 
                                std::shared_ptr<T> 
                                pop() 
                                {
                                    std::lock_guard<std::mutex> lock(m);
                                    
                                    if( stk.empty() ) 
                                        throw empty_stack(); 
                                    
                                    std::shared_ptr<T> const res( 
                                        std::make_shared<T>( stk.top() ) ); 
                                        
                                    stk.pop(); 
                                    
                                    return res;
                                }
    
                                // 解决 [2] : two overloads all empty()->top()->pop()
                                void 
                                pop(T& value)
                                {
                                    std::lock_guard<std::mutex> lock(m);
                                    
                                    if(stk.empty() ) 
                                        throw empty_stack();
                                        
                                    value = stk.top(); // T 可 赋值 // dif
                                     
                                    stk.pop();
                                }
    
                                void push(T val)
                                {
                                    std::lock_guard<std::mutex> lock(m);
                                    stk.push(val);
                                }
                            };
    
            3   lock 粒度/范围
                    ————————————————————————————————————————————————————————————————————
                    [1] 大 + 1个 mutex        |   并发增益小
                    ————————————————————————————————————————————————————————————————————
                    [2] 小 + 1个 mutex        |   没有完全保护期望的操作 => race condition
                    ————————————————————————————————————————————————————————————————————
                    [3] 更小 + 多个 mutex   |   死锁: 两线程相互等待, 结果谁也不往前走
                    ————————————————————————————————————————————————————————————————————
                    
        2.5 死锁 解决
    
            1   死锁: 2个线程, 2个mutex
            
                2线程 都要 lock 2个 mutex 才能执行
                    2个线程 各锁 1个mutex 时, 都在 等对方锁住的 mutex 被释放
                        2个线程都在死等
                        
                        
                场景: 无锁 case 下的 死锁
    
                    `2 个 thread func` 在 对方 `线程对象` 上 调 join()
                            |
                            |                                   std::thread t1;
                            |/                                  std::thread t2(f2, std::ref(t1) );
                        para 为 `线程对象 ref`                   t1 = std::thread(f1, std::ref(t2) );
                                                                void f1(std::thread& t_) { t_.join(); }
                            死锁: 都等 对方结束             void f2(std::thread& t_) { /* 休眠 5秒  */ t_.join(); }
                        
        2.6 避免死锁 的方法
    
            思想 
                线程1 可能等 线程2 时, 线程2 就不要 等线程 1 了
    
                1   避免 嵌套锁
    
                2   当 持有1个锁 时, 避免调 `用户代码`
                                                |
                                                |/
                                        可能 获得 另一个 锁 
                                            
                                            => 可能会 嵌套锁
    
                3   每个线程 以 相同顺序 获得多个锁
                    
                    总在 mutex1 前 锁住 mutex2
                        
                    [1]   适用于: 不同 mutexes for 不同 purposes
                            |
                            |    [2] 问题:  不能 解决 的 死锁
                            |/
                        2个 mutex 保护 同 class 2个 instance
                                swap    
                            |
                            |   解决 
                            |/
                        std::lock() + std::lock_guard + std::adopt_lock 作 第 2 参数 
                            |                                   |
                            |                                   |/
                        1次 lock 多个 mutex                    告诉 std::lock_guard Ctor 
                                                                    
                                                                [1] 其 第1参数 mutex 已上锁
                                                                
                                                                [2] 构造时只需 接收 mutex 上 mutex 的 ownership
    
                            |   [3] 问题: 不能 解决 的 死锁
                            | 
                            |       多个锁 被 `分开 获得`
                            |/
                            
                                    //List 3.6 swap 操作中用 std::lock() + std::lock_guard + std::adopt_lock
                                    class A;
    
                                    void swap(A& lhs, A& rhs);
    
                                    class X
                                    {
                                    private:
                                        A a;
                                        std::mutex m;
                                    public:
                                        X(A const& sd):a(sd){}
                                        friend void swap(X& lhs, X& rhs)
                                        {
                                            if(&lhs==&rhs) 
                                                return; 
                                                
                                            std::lock(lhs.m,rhs.m); //一次锁住2个mutex
                                            std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); 
                                            std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock); 
                                            
                                            swap(lhs.a, rhs.a);
                                        }
                                    };
                                    
                    
                4   锁层次
    
                    [1] application 分 多层
    
                    [2] 某层 已拥有 某 mutex 上的锁
                            
                            就 `不允许 上一层 lock 另一 mutex`
    
                        // List 3.7 用层次锁避免死锁
                        
                        1]  调 high_level_func()
                        
                                lock high_level_mutex 
                                -> 调 low_level_func(): 
                                lock low_level_mutex, 返回值作为high_level_stuff() 参数 
                                => 遵守层次锁规则
    
                        2]  lock other_mutex(低层次值)
                        
                            -> 调 high_level_func():
                            lock 高层次值的 mutex 
                            => 不 遵守层次锁规则, 
                            hierarchical_mutex 报告错误, runtime 时失败
                            
                        3]  用户自定义 mutex 类 hierarchical_mutex 
                                
                                实现 符合 mutex 概念的 
                                    
                                    3个成员函数: lock()      unlock()    try_lock()  
                                
                                        => 可用于 std::lock_guard
    
                                // 1. hierarchy value of 10000
                                hierarchical_mutex high_level_mutex(10000);
    
                                //2. 
                                hierarchical_mutex low_level_mutex(5000);
    
                                int do_low_level_stuff();
    
                                int low_level_func()
                                {
                                    std::lock_guard<hierarchical_mutex> lk(low_level_mutex); 
                                    return do_low_level_stuff();
                                }
                                void high_level_stuff(int some_param);
                                void high_level_func()
                                {
                                    std::lock_guard<hierarchical_mutex> lk(high_level_mutex); 
                                    high_level_stuff( low_level_func() ); 
                                }
    
                                // ---------------------
                                void thread_a()  // 遵守 层次锁 规则
                                {
                                    high_level_func();
                                }
    
                                // ---------------------
                                hierarchical_mutex other_mutex(100); 
                                void do_other_stuff();
                                void other_stuff()
                                {
                                    high_level_func(); 
                                    do_other_stuff();
                                }
    
                                void thread_b() // 不遵守 层次锁 规则
                                {
                                    std::lock_guard<hierarchical_mutex> lk(other_mutex); 
                                    other_stuff();
                                }
    
                                
                                //List 3.8 hierarchical mutex
    
                                class hierarchical_mutex
                                {
                                    // 4个成员变量
                                    //1. 内部标准 mutex, 被 委托 真正 调 lock() unlock() try_lock()
                                    std::mutex internal_mutex; 
                                        
                                    //2. 该 mutex 当前层次值
                                    // 1)ctor 中 初始化 为指定值
                                    // 2)比 本线程 层次值 小时, 检查通过
                                    // 3)更新时, 最后 赋值 给 本线程层次值
                                    unsigned long const hierarchy_value;
    
                                    //3.该mutex 先前层次值
                                    // 1)ctor 中 初始化 为0, 先不用
                                    // 2)更新时, 先被 赋值 为 本线程层次值
                                    // 3)unlock()中 用于 恢复 本线程层次值
                                    unsigned long previous_hierarchy_value; 
                                    
                                    /* 4. 本线程 层次值
                                    1) 1个线程 可能 lock 多个 层次mutex, 
                                    保存1个 本线程层次值, 
                                    以保证 `lock 的 层次 mutex 链`中,
                                    `各 层次 mutex 的 层次值 依次减小`; 否则, 抛出 逻辑错误 异常
                                    => 用 thread_local 
                                    2) 又因为 属于class 而是 instance 
                                    => 用 static
                                    */
                                    static thread_local unsigned long this_thread_hierarchy_value; 
                                    
                                    // 2 个 私有函数: 检查 & 更新
                                    void check_for_hierarchy_violation()
                                    {
                                        if(hierarchy_value >= this_thread_hierarchy_value)
                                        {
                                            throw std::logic_error(“mutex hierarchy violated”);
                                        }
                                    }
    
                                    void update_hierarchy_value()
                                    {
                                        // 1)记录 次新的 本线程层次值, 以 
                                        // `沿着 已 lock 的 各 层次 mutex`, 
                                        // `逐层恢复出 原来的 本线程层次值`
                                        previous_hierarchy_value = this_thread_hierarchy_value; 
    
                                        // 2)更新 最新的 本线程层次值, 
                                        // 以 lock 下一层上的 层次mutex       
                                        this_thread_hierarchy_value = hierarchy_value; 
                                    }
    
                                public: // 4 个 接口函数
                                    explicit hierarchical_mutex(unsigned long value):
                                        hierarchy_value(value), previous_hierarchy_value(0){}
                                        
                                    // 检查 层次锁->委托 标准 mutex lock-> 更新 线程层次值
                                    void lock() 
                                    {
                                        check_for_hierarchy_violation();
                                        internal_mutex.lock(); 
                                        update_hierarchy_value(); 
                                    }
    
                                    // 恢复 本线程 次新层次值 -> unlock
                                    void unlock() 
                                    {        
                                        this_thread_hierarchy_value = previous_hierarchy_value; 
                                        internal_mutex.unlock();
                                    }
    
                                    // 与 lock() 区别: mutex 的lock已被另一线程拥有时,返回false
                                    bool try_lock() 
                                    {
                                        check_for_hierarchy_violation();
    
                                        if( !internal_mutex.try_lock() ) 
                                            return false;
    
                                        update_hierarchy_value();
                                        return true;
                                    }
                                };
    
                                //本线程 层次值 初始化为 最大值
                                thread_local unsigned long
                                    hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
    
        2.7 std::unique_lock 实现更灵活的 locking
    
            1   思想
    
                    松弛 invariants
    
                        并不总是拥有 mutex
    
                代价
                    内部 要存/更新 1个 flag 来标识是否拥有 mutex
                        
            2   std::defer_lock 作 第2参数, 告诉 std::unique_lock Ctor, 构造时 `mutex unlocked`
    
            3   std::unique_lock 对象
    
                2种方法 加锁
    
                    1) 其 上调用 lock()
    
                    2) 传给 std::lock() 
                            
                            因为 std::unique_lock 提供 3 个成员函数
                            
                                lock() try_lock() unlock() 
    
            4   std::lock_guard & std::unique_lock
    
                同 都可 
            
                    RAII 锁管理
    
                异
                    1) 成员函数
                    
                        前者  只有 ctor & dtor: ctor 中 加锁 + dtor 中 解锁
    
                        后者  还有 3 个成员函数 lock() try_lock() unlock()  => 更灵活
    
                    2) 是否可 `管理 mutex 的 lifetime` 
                            
                            否/是
    
                                =>前者 最好将 mutex 设为 global 变量
                
                swap 的  std::unique_lock 版本 
                    
                    std::unique_lock + std::defer_lock 作 第2参数 + std::lock(uniLkm1, uniLkm2); 
                                            |
                                            |/
                                        告诉 std::unique_lock Ctor, 构造时 `mutex unlocked`
                    
                    class A;
                    void swap(A& lhs,A& rhs);
    
                    class X
                    {
                    private:
                        A a;
                        std::mutex m;
                    public: 
                        X(A const& a_) : a(a_){}
                        
                        friend void swap(X& lhs, X& rhs)
                        {
                            if(&lhs == &rhs) 
                                return;
    
                            std::unique_lock<std::mutex> uniLkm1(lhs.m, std::defer_lock); 
                            std::unique_lock<std::mutex> uniLkm2(rhs.m, std::defer_lock); 
                            
                            std::lock(uniLkm1, uniLkm2); 
                            
                            swap(lhs.a,rhs.a);
                        }
                    };
    
        2.8 转移 mutex ownership between scope
    
            1   mutex 的 ownership 可在 std::unique_lock 间 move
    
                std::unique_lock: movable 但 not copyable
    
            2   std::unique_lock 3 种灵活应用
    
                (1) 函数 lock mutex, transfer 该 mutex 的 ownership 给其 caller 的 lock
                
                    std::unique_lock<std::mutex> 
                    get_lock()
                    {
                        extern std::mutex mut; // extern mutex
                        std::unique_lock<std::mutex> lk(mut);
                        prepare_data();
    
                         // 目的是右值 => implicitly move
                        return lk;
                    }
                    void processData()
                    {
                        //  源是右值 => implicitly move
                        std::unique_lock<std::mutex> lk(get_lock()); 
                        do_something();
                    }
    
                (2) lock 不直接返回, 而是作 gateway class 的 mem, 
                        
                        所有 access to data 都通过 gateway 类
    
                            gateway 
                            
                                [1] destory 时, releases lock
                                [2] movable
                        
                (3) std::unique_lock 可在 `销毁前 释放 其 管理的 mutex` 
                                            |
                                            |
                                            |/
                                        unlock() 中
                    => 提高性能
    
        2.9 合适粒度下 上锁
    
            `真正 访问 共享数据 时 再 lock` mutex 
                
                左侧 和 右侧 在不同时刻相等
    
                    (可能有场景需要得到这种信息)
                        两次 read 之间, 两侧值可能已经被改了
            
                void get_and_processData()
                {
                    // 加锁 -> 获取1块处理数据 -> proces 前 解锁  
                    // -> process -> process 后 再加锁 -> process 结果 写到result
                    std::unique_lock<std::mutex> my_lock(the_mutex);
                    some_class data_to_process = get_next_T();
                    my_lock.unlock(); 
                    result_type result=process(data_to_process);
                    my_lock.lock();
                    write_result(data_to_process,result);
                }
    
                //List3.10 Lock one mutex at a time 
                // in a comparison operator
                class Y
                {
                private:
                    int a;
                    mutable std::mutex m;
                    
                    int get_detail() const
                    {
                        std::lock_guard<std::mutex> lock_a(m); 
                        return a;
                    }
                public:
                    Y(int sd):a(sd){}
                    
                    friend bool operator==(Y const& lhs, Y const& rhs)
                    {
                        if(&lhs==&rhs)
                            return true;
                        int const lhs_value=lhs.get_detail();       
                        int const rhs_value=rhs.get_detail(); 
                        return lhs_value==rhs_value; 
                    }
                };
    
        3.1 `初始化期间` 保护 共享数据
    
            1   延迟 初始化 
            
                    共享数据 只在 `初始化 并发 access 阶段` 才用 mutex 保护 
                        |
                        |   如: 打开 数据库连接 
                        |/
                      构造代价                  
                        
                    (1) 单检测: `资源 (指针)` 是否 已初始化
                        
                        1)  思想
                        
                            资源 使用前检测
                                若 没初始化  
                                    先 初始化, 再 解锁, 再 使用
                                        |
                                        |/
                                        spr.reset(new R);
                        
                        2)  code 结构
                            —————————————————————————————————————————————————
                            [1] 加锁 => 所有线程 串行
                            [2] 检测
                            [3] 初始化
                            [4] 解锁
                            [5] 使用 资源指针
                            —————————————————————————————————————————————————   
                                加/解 锁 
                                    std::unique_lock + 其 成员函数 unlock()
                            —————————————————————————————————————————————————
                            
                        3)  问题
                        
                            第1次检测 前 加锁 => 检测处 `串行` => `并发性差`
                                    
                            |
                            |   (半)解决
                            |/
                     
                    (2) 双检测锁 ( Double-Checked Locking ) 
                        
                        1)  思想 
                            
                            `第2次检查 前 才加锁`   
                        
                        2) code 结构 
                            
                            —————————————————————————————————————————————————
                            [1] 第1次检测
                            [2] 加锁
                            [3] 第2次检测 
                            —————————————————————————————————————————————————   
                                加/解锁
                                    std::lock_guard + scope 结束 Dtor 解锁 
                            —————————————————————————————————————————————————   
                        3)  问题
                        
                                1条 new 语句 分解为 3 条底层语句
                                    ——————————-————
                                    [1] 分配 内存 
                                    [2] 构造 
                                    [3] 指针赋值 
                                    ————————————————
                                        compiler 可能重排执行顺序为 [1][3][2]
    
                                            线程 1 资源 `构造未完成 -> 资源 ptr 就 已赋值`
    
                                                线程 2 检测 资源 ptr 非空 
                                                
                                                    `访问 初始化未完成` 的 资源
                                                        
                                                        data race 型 race condition
                                                            
                                                            undefined behavior
    
                                
                        
                            // 单检测      
                            std::shared_ptr<R> spr; 
                            std::mutex mut;
                            void f()
                            {
                                std::unique_lock<std::mutex> lk(mut); 
                                if(!spr)
                                {   
                                    spr.reset(new R); 
                                }
                                lk.unlock(); 
                                spr->mf();
                            }
    
                            // 双检测 
                            void f() 
                            {
                                if(!spr)
                                {  
                                    std::lock_guard<std::mutex> lk(mut);  
                                    if(!spr) 
                                    {
                                        spr.reset(new R); 
                                    }
                                } 
                                spr->mf(); 
                            }   
                
                解决
                    
                    std::call_once(std::once_flag, 资源初始化函数, func_args)
    
                        [1] 多线程 `并发进入` std::call_once 函数 + `同步` 
                            
                                但 `只有1个线程 ( active 线程 ) 真正执行` 第 2 arg: (资源初始化) `函数`
                                    
                                    其他线程 ( passive/被动 线程 ) 进入 std::call_once 后 
                                        
                                        `等待` active 线程 完成初始化 + 返回 后, 再返回
                                            |
                                            |/
                                            同步 
                                                        
                        [2] Note    
                            
                            当前线程 从 std::call_once 返回时, 
                            
                                资源初始化 已完成, 但可能由 另一线程 完成
    
                        [3] std::call_once 参数           
                            
                            template< class Callable, class... Args >
                            void call_once( std::once_flag& flag, Callable&& f, Args&&... args ) // <mutex>             
                            
                            ——————————————————————————————————————————————————————
                            1] 第1参数     std::once_flag 
                            ——————————————————————————————————————————————————————
                            2] 其余参数 与 std::thread Ctor
                                ——————————————————————————————————————————————————  
                                1> 形式 同     `右值引用 args` 
                                ——————————————————————————————————————————————————  
                                2> 目的 不同    `避免 copy, 而不是 实现 move 语义`
                                
                                                    因为 不必转移到 另一线程 去执行
                            ——————————————————————————————————————————————————————      
    
                        [4] std::once_flag
                            
                                功能相当于 std::mutex
                                
                                    同 `不能 copy`
    
                        1) 作 namespace-scope
    
                            std::shared_ptr<R> pr;        // 资源指针
                            std::once_flag rFlag;         //std::once_flag
    
                            void initResource() { pr.reset(new R); }
                            void f()
                            {
                                std::call_once(rFlag, initResource); 
                                pr->mf();
                            }
    
                        2) 作 类的 (private)成员
                                    
                            class X
                            {
                            private:
                                connection_info connection_details;
                                connection_handle connection; // resource handle
                                std::once_flag connection_init_flag;
    
                                void 
                                open_connection()
                                {
                                    connection=connection_manager.open(connection_details);
                                }
    
                            public:
                                X(connection_info const& connection_details_):
                                    connection_details(connection_details_) {}
    
                                void 
                                send_data(data_packet const& data)
                                {
                                    std::call_once(connection_init_flag, &X::open_connection, this); 
                                    connection.send_data(data);
                                }
    
                                data_packet 
                                receive_data()
                                {
                                    std::call_once(connection_init_flag,&X::open_connection, this);
                                    return connection.receive_data();
                                }
                            };
    
            2   `static local 变量` 初始化 时 race condition
    
                (1) static 局部变量 初始化
    
                    [1] 可能 真正在多个线程上 进行(C++11 前编译器) => problematic race condition
    
                    [2] 只能 真正在1个线程上 进行 (C++11 compiler 解决)
    
                        => race condition 只是说 `哪个线程 去 初始化`
    
                (2) 应用 
    
                    `单例` 
                    
                        需要 `单个 全局 instance` 时
                            
                            实现 `像 std::call_once 一样` 的 功能
    
                            // 单例
                            class A;
    
                            A& get_my_class_instance()
                            {
                                static A a; 
                                return a;
                            }
    
        3.2 很少更新 的 数据结构 的 保护
    
                boost 库 boost::shared_mutex
    

    第4章 同步并发操作 cv & future & async & packaged_task & promise

    image.png image.png image.png image.png image.png
        多线程 2个要点
            
            [1] 保护 `共享数据`
                    
                    第 3 章 
                
            [2] 线程 `同步`
    
                线程 需 `等待` 另一线程 完成某任务后, 再去完成 自己的任务
    
                    实现
                    
                        cv & futures
                        |
                        |   cv: 类 std::condition_variable
                        |/
                        条件变量/condition variables 
    
    #1 循环等待 事件/条件: cv
    
        ##1.1   最简单的 方法 
        
            (1) code 结构 // 等待线程
    
                        |—— ——> `标志变量 flag` = true /false 表示条件 是/否满足
                加锁  |    - - - - -
                        |            |
                check 条件 是否满足  |
                                     |  => 周期性 ( 即 循环 )
                    否                |          +
                                     |          等待
                        解锁 - - - - -            |\
                                                |
                        sleep 固定时间 // Note: sleep 期间, other 线程 可获得锁 并 `modify 条件 使满足`
            
                    是
                        函数退出 => 自动解锁 
                        
            (2) 问题
            
                sleep 多久 难确定
    
                    太短: 浪费太多处理时间
    
                    太长: 事件早已发生, 等待线程还在 sleep
                                
                                1]  丢帧              快节奏游戏 中 
                                
                                2]  过多消耗 时间片        实时应用   中 
    
            (3) 解决
            
                cv
        
                    bool flag; 
                    std::mutex m;
    
                    void wait_for_flag() 
                    {
                        std::unique_lock<std::mutex> lk(m); 
                        
                        while(!flag)
                        {
                            lk.unlock(); 
                            std::this_thread::sleep_for(std::chrono::milliseconds(100) ); 
    
                            lk.lock();  
                        }
                    }
                
        ##1.2 cv class
    
                1   `同步` 基础
                
                    可用于 同时 `block/阻塞 多个线程`
                    
                        until 通知线程 
                        
                            1]  修改 a `shared variable` 
                                            |
                                            |   用于表示 
                                            |/
                                            条件 
                                                    
                            2]  notifies cv
    
                        ——————————————————————————————
                        通知线程 modify & notify cv
                                    |
                            shared variable
                                    |
                        等待线程 check  & wait on cv 
                        ——————————————————————————————
    
                2   2 个问题   
                    
                    (1) 问题1 
                    
                        精确调度事件
                        
                            => 可能 `notify called on destroyed cv`
                        
                        解决1
                            
                            notify under lock 可能必要
            
                    (2) 问题2 
                        
                        notify 发送时, 等待线程 没有 waiting on cv
                        
                            => `错过 notification`
    
                        原因 
                        
                            ————————————————
                            shared variable 
                            ————————————————
                                1] `atomic`     
                            ————————————————        
                                2] 未加锁 
                            ————————————————
                                Note 
                                
                                    atomic copy delete 
                                        => 
                                            不能用 cv 上 predicated wait 版本
                                
                                                即 不能为 
                                                    
                                                    cv.wait(lk, [] { return proceed; } ); //  std::atomic<bool> proceed(false);
                                                
                        解决 
                            
                            1) 用 cv 上 non-predicated wait 版本 
                                            
                                    while ( !proceed ){ cv.wait(lk); }
    
                            2) shared variable 即使是 atomic 
                                
                                修改 也要 加锁 
                
                3   cv 类 
                    
                    [1] copy deleted
    
                    [2] notify_one & notify_all
    
                    [3] wait 2个重载
                        
                        1] non-predicated 版本        (如) while ( !proceed ){ cv.wait(lk); }  
                            
                            非循环
                            
                                `释放` lock 
                                    
                                    `阻塞` current thread & 加到 cv 上的 `等待线程 队列`
                                        
                                        notified 或 spuriously wake up 
                
                                            `解阻塞` 
                                                
                                                `relock` + `wait exits`
                                    
                        2] predicated wait  版本      (如) cv.wait(lk, [] { return proceed; } );
                            
                            循环 + 条件
                                    |   
                                    |/
                                    满足 = p() == true 时, 返回 
                            
                                ————————————————————————————————————————————
                                与 non-predicated 版本  
                                    ————————————————————————————————————————    
                                    区别 
                                        
                                        仅多了1步
                                        
                                        relock 后, (循环) 检测 条件是否满足 
                                    ————————————————————————————————————————
                                    联系 
                                        
                                        视为 循环调 non-predicated 版本
                                ————————————————————————————————————————————        
                                        
                                    template<typename Pred>
                                    void std::condition_variable::wait(unique_lock<mutex>& uniLk, Pred pred)
                                    {
                                        while ( !pred() )
                                            wait(uniLk);    // 视为 non-predicated 版本
                                    }
    
                                    //等价于
                                    while ( !pred() ) 
                                    {
                                        wait(uniLk);
                                    } 
            
                4   等待线程
    
                    (1) notify_one
            
                        通知线程 + 多个等待线程 
                                
                            没必要 持有 the same mutex
    
                                否则
                                    
                                    `悲观锁` 
    
                                        hurry up and wait
                        
                                    原因
                                        
                                        awakened 后 可能无法 获得 mutex ( 被 通知线程 占着 )
                                |
                                |   解决 
                                |/
                            pthreads 实现  
                                
                                1]  `识别` 悲观锁场景 
                            
                                2]  `notify 内` 把 等待线程   
                                        
                                        从 `cv 上的 队列` 转移到 `mutex 上的 队列` 
                                            
                                            而 `不 唤醒它`
    
                    (2) notify_all
    
                        `惊群现象`
                            
                            事件发生
                                
                                `唤醒所有` 所有等待线程
                                
                                    但 `只能有1个线程 获得事件处理权`
                                    
                                        `其他线程 重新 陷入等待`          
                    
                5   spurious wakeup / 假唤醒 
                            
                        notify_one/notify_all 之外的因素导致的 
                            
                            `wait 被唤醒, 但 条件不满足`
                                
                                |
                                |   解决  
                                |/  
                             predicate wait 版本 
                 
                    [1] 只影响 non-predicated wait
    
    
                    [2] 不影响 predicated wait
                        
                6   应用
    
                    //------ App
                    threadsafe_queue<T> que; 
    
                    void producer()
                    {
                        while( more_data_to_prepare() )
                        {
                            T const data = prepare_data(); 
                            que.push(data);  // [1] 真正 push 前 加锁: que.push() 中 internal std::queue 真正 push 前  
                        }
                    }
    
                    void consumer()
                    {
                        while(true)
                        {
                            T data;
                            que.wait_and_pop(data);
                            process(data); // [2] process 前 解锁: wait_and_pop() scope 结束 + 自动解锁
                            if( is_last_chunk(data) )
                                break;
                        }
                    }
                        
                    // 记住这 1个 足够 => others 
                    template<typename T>
                    std::shared_ptr<T> 
                    threadsafe_queue::wait_and_pop()
                    {
                        std::unique_lock<std::mutex> lk(mut);
                        
                        // 捕获 this 目的: 可在 lambda 中用 当前类的 成员 
                        cv.wait(lk, 
                                    [this]{ return !que.empty();} );
                                    
                        std::shared_ptr<T> res( 
                            std::make_shared<T>( que.front() ) );
                            
                        que.pop();
                        
                        return res;
                    }
            
                    (1) List 4.1 等待 data 被处理: 用 std::condition_variable
    
                        std::mutex mut;
                        std::queue<T> que;               
                        std::condition_variable cv; 
    
                        void producer() 
                        {
                            while(more_data_to_prepare())
                            {
                                T const data=prepare_data(); 
                                
                                std::lock_guard<std::mutex> lk(mut); 
                                
                                que.push(data);              
                                cv.notify_one();             
                            }
                        }
    
                        void consumer()  
                        {   
                            while(true) 
                            {       
                                std::unique_lock<std::mutex> lk(mut); 
                                cv.wait(
                                    lk, []{ return !que.empty(); } ); 
                                    
                                T data = que.front(); 
                                que.pop();  
                                     
                                lk.unlock();  // unlock mutex                  
                                process(data);
                                
                                if( is_last_chunk(data) )
                                    break;
                            }
                        }
                            |
                            |   (2) 重构: 同步被限制在 queue 本身, 可 极大减少 同步问题 和 竞争条件 
                            |
                            |   提取  thread-safe queue: using cv
                            |   
                            |   借鉴 chapter3 thread-safe stack's thought:联合 front() and pop()
                            |/  
                    
                        //------ App
                        threadsafe_queue<T> que; 
    
                        void producer()
                        {
                            while( more_data_to_prepare() )
                            {
                                T const data = prepare_data(); 
                                que.push(data);  // [1] 真正 push 前 加锁: que.push() 中 internal std::queue 真正 push 前  
                            }
                        }
    
                        void consumer()
                        {
                            while(true)
                            {
                                T data;
                                que.wait_and_pop(data);
                                process(data); // [2] process 前 解锁: wait_and_pop() scope 结束 + 自动解锁
                                if( is_last_chunk(data) )
                                    break;
                            }
                        }
                        
                        // ------ threadsafe_queue
                        #include <queue>
                        #include <memory>
                        #include <mutex>
                        #include <condition_variable>
    
                        template<typename T>
                        class threadsafe_queue
                        {
                        private:
                            mutable std::mutex mut;   
                            std::queue<T> que; 
                            std::condition_variable cv;
                        public:
                            threadsafe_queue(){}
                            threadsafe_queue(threadsafe_queue const& rhs) 
                            {
                                std::lock_guard<std::mutex> lk(rhs.mut);
                                que = rhs.que;
                            }
                            
                            void 
                            push(T new_value)
                            {
                                std::lock_guard<std::mutex> lk(mut);
                                
                                que.push(new_value);
                                
                                cv.notify_one();
                            }
    
                            // 记住这 1个 足够 => others 
                            std::shared_ptr<T> 
                            wait_and_pop()
                            {
                                
                                std::unique_lock<std::mutex> lk(mut);
                                
                                // 捕获 this 
                                cv.wait(lk, 
                                            [this]{ return !que.empty();} );
                                            
                                std::shared_ptr<T> res( 
                                    std::make_shared<T>( que.front() ) );
                                    
                                que.pop();
                                
                                return res;
                            }
                            
                            void 
                            wait_and_pop(T& value)
                            {
                                std::unique_lock<std::mutex> lk(mut);
                                
                                cv.wait(lk, 
                                            [this]{return !que.empty();});
                                            
                                value = que.front(); 
                                
                                que.pop();
                            }
    
                            std::shared_ptr<T> 
                            try_pop()
                            {
                                std::lock_guard<std::mutex> lk(mut);
                                
                                if(que.empty())  // dif
                                    return std::shared_ptr<T>(); // empty std::shared_ptr
                                    
                                std::shared_ptr<T> res(
                                        std::make_shared<T>(que.front() ) );
                                        
                                que.pop();
                                
                                return res;
                            }
                            
                            bool 
                            try_pop(T& value)
                            {
                                std::lock_guard<std::mutex> lk(mut);
                                
                                if( que.empty() )
                                    return false;
                                    
                                value = que.front(); // dif 
                                
                                que.pop();
                                
                                return true;
                            }
    
                            bool empty() const //7. const function
                            {
                                std::lock_guard<std::mutex> lk(mut);
                                return que.empty();
                            }
                        };
    
                            Since locking a mutex is a mutating operation, 
                            the `mutex object must be` marked `mutable`
    
    #2 等待 `1次性事件`: future
    
        线程 以某种方式
            
            获得 future -> 表示事件
            
        场景: 等待航班
    
        ##2.0 std::future
    
            `等待 被 异步设定 的 value`
    
            template< class T > class future;       (1)
            template< class T > class future<T&>;   (2)
            template<>          class future<void>; (3)
    
            1   异步 机制
                              准备好
                异步操作     - - - -> result  + 修改 shared state
                    |                 /\                 |
                    | 提供             /  hold             |  link
                    |/              /                    |
                    std::future 对象 —— —— —— —— —— —— ——  
                    |       |\
                    |  给   |    query / wait for / extract result: 阻塞, 直到 result 准备好 
                    |/      |
                     creator
                        std::async / std::packaged_task / std::promise
    
                future shared_future async packaged_task promise
    
                除 async 为 为 function template 外, 其余为 class template
    
        ##2.1 std::async
    
            `异步 runs a function` ( 可能 in a new thread)
                
                返回 std::future
                    
                     future 上调 get(), 当前线程 阻塞, 直到 future ready, 然后 return result
            
            传参
                与 std::thread 传参方式相同
            
            
        ##2.2 std::packaged_task
        
            std::packaged_task<> obj 绑定 future to callable object
    
            work.get_future() 返回 future 对象
            
        ##2.3 std::promise
            
            1   针对2种场景
    
            [1] work 不能被表达为 函数调用
            
            [2] 结果 来自多个地方
    
            2   机制
    
                // 3大成员函数
                
                [1] get_future()
                
                    返回 promised 结果 关联的 future
    
                [2] set_value() : 主动线程
                
                        会 make state/future ready
    
                [3] fut.get()   : 被动线程
    
    
            #include <iostream>
            #include <numeric> // std::accumulate
            #include <vector>
            #include <thread>
            #include <future>
    
            void accumulate(std::vector<int>::iterator first,
                std::vector<int>::iterator last,
                std::promise<int> prom)
            {
                int sum = std::accumulate(first, last, 0);
                prom.set_value(sum);
            }
    
            int main()
            {
                // 演示用 promise<int> 在线程间传递结果
                std::vector<int> numbers = { 1, 2, 3, 4, 5, 6 };
    
                //1. 构建 promise obj
                std::promise<int> prom;
    
                //2. get_future()
                std::future<int> futu = prom.get_future();
    
                //3. 启动新线程, 线程间 move promise
                std::thread work_thread(accumulate, 
                     numbers.begin(), 
                     numbers.end(),
                     std::move(prom) );
                
                //4. future 上 get()
                std::cout << "result = " << futu.get() << '\n';
                
                work_thread.join(); 
            }
    
        ##2.4 async & packaged_task & promise 机制 & 应用
    
        1   async
        
        
            //List4.6 用 std::future 得到一个 异步任务的返回值
    
            #include <future>
            #include <iostream>
    
            int find_the_answer_to_ltuae(); // long time 
            void do_other_stuff();
    
            int main()
            {
                // std::async 可以增加 参数, 作为 异步任务 函数的 args 
                std::future<int> the_answer=std::async(find_the_answer_to_ltuae); 
                do_other_stuff();
                std::cout<<"The answer is "<< the_answer.get() <<std::endl;
            }
    
            //List 4.7 std::async 中 传参数给 异步函数
    
            #include <string>
            #include <future>
    
            //-------class1
            struct X
            {
                void foo(int,std::string const&);
                std::string bar(std::string const&);
            };
            X x;
    
            //1. 通过 指针 p/&x 间接 提供对象:
            //   调 p->foo(42, "hello"), p 是 &x
            auto f1=std::async(&X::foo, &x, 42, "hello"); 
    
            //2. 直接 提供 (左值)对象 x: 
            //   调 tmpx.bar("goodbye"), tmpx is a copy of x
            auto f2=std::async(&X::bar, x, "goodbye");    
    
            //-------class2
            struct Y
            {
                double operator()(double);
            };
            Y y;
    
            //3. 调 tmpy(3.141), tmpy is move-constructed from Y()
            //   <=> tmpy is constructed from std::move( Y() )
            //(1 )参数 Y(): 调 Y 的 ctor, 产生 右值(无名对象) tmp
            //(2) 右值 Y() 被 wrap 成 std::move( Y() ), 
            //   作为 实参 调 Y 的 移动ctor, 产生 tmpy
            //(3) 通过 std::async 调 operator(), tmpy(3.141)
            auto f3=std::async( Y(), 3.141);    
                 
            //4. std::ref (左值对象 y) 提供对象 & 调 y 的 operator() 
            //   => 调 y(2.718)
            auto f4=std::async(std::ref(y), 2.718);
    
            //5. 调 baz(x)
            // 异步 非 成员函数 & ref(类的对象)
            X baz(X&); 
            std::async(baz, std::ref(x) ); 
    
            //-------class3
            class move_only
            {
            public:
                move_only();
                move_only(move_only&&)
                move_only(move_only const&) = delete;
    
                move_only& operator=(move_only&&);
                move_only& operator=(move_only const&) = delete;
    
                void operator()();
            };
    
            //6. 调 tmp() , tmp is constructed from std::move( move_only() )
            // 与 3 同理
            auto f5=std::async( move_only() ); 
    
    
            (3)可用第1参数 `std::launch` 指定, `是让 std::async 起一个新线程; 还是 在 future 上等待时, 才让 任务运行`
    
            //1) 函数调用被 延迟, 直到 在 future 调用 wait()/get()
            std::launch::deferred
    
            //2) 函数调用 必须运行在 自己的线程上, 即 起一个新线程
            std::launch::async
    
            // 就像调用
            std::thread(std::forward<F>(f), std::forward<Args>(args)...)
    
            //3) 由实现决定 <=> 默认选择
            std::launch::deferred | std::launch::async
    
            // 新线程中 运行
            auto f6=std::async(std::launch::async, Y(), 1.2); 
    
            //2.1 future 上 wait() 或 get() 时 运行
            auto f7=std::async(std::launch::deferred, baz, std::ref(x) ); 
    
            //3. 实现选择
            //(1)默认
            auto f9=std::async(baz, std::ref(x) );     
            //(2)显式写出
            auto f8=std::async(                             
                            std::launch::deferred | std::launch::async,
                            baz, std::ref(x) );
    
    
            //2.2 唤起 延迟函数 / deferred function
            // 此时才调 work f7 的 函数 baz(x)
            f7.wait();                                       
    
        2   用 future 关联 任务 std::packaged_task
    
            (1) std::packaged_task<> 机制
    
            实参及实际返回类型 只要能正确地 隐式转换为 
            函数标签中的 参数类型 和 返回类型 即可
    
    
            1) 创建 std::packaged_task<> obj ( work )
    
            std::packaged_task<> ties a future to a function 或 callable object
    
            2) work.get_future() 返回 future 对象, 存 work函数 返回值/抛出的异常
    
            3) `唤起` std::packaged_task<> 对象时, 
    
                1> 以 相应的 `args 作实参 调该对象的 operator()`, 该 operator() 中再 `转调 work function 或 callable object 的 operator()`
    
                2> makes the future ready
    
            4) future对象 .get() 获得 返回值
    
            (2) std::packaged_task< > 特化版 部分接口
    
        //List 4.8 std::packaged_task< > 特化的  类(部分)定义
            template<>
            class packaged_task< std::string(std::vector<char>*, int) > 
            {
            public:
                template<typename Callable>
                explicit packaged_task(Callable&& f);
    
                std::future<std::string> get_future();
                void operator()(std::vector<char>*,int);
            };
            => `std::packaged_task 对象 是 callable 对象:` 可以
            1)被 `wrap` 进 std::function  对象
            2)作为 `线程函数` 传给 std::thread ctor
            3)传给需要 callable object 的函数
            4)直接唤起
    
        3   std::packaged_task 3种用法
            // std::packaged_task 3种用法
            #include <iostream>
            #include <cmath>
            #include <thread>
            #include <future>
            #include <functional>
             
            int f(int x, int y) { return std::pow(x,y); }
             
            void work_lambda()
            {
                std::packaged_task<int(int,int)> work([](int a, int b) {
                    return std::pow(a, b); 
                });
                std::future<int> result = work.get_future();
             
                work(2, 9);
             
                std::cout << "work_lambda:\t" << result.get() << '\n';
            }
             
            void work_bind()
            {
                std::packaged_task<int()> work(std::bind(f, 2, 11));
                std::future<int> result = work.get_future();
             
                work();
             
                std::cout << "work_bind:\t" << result.get() << '\n';
            }
             
            void work_thread()
            {
                std::packaged_task<int(int,int)> work(f);
                std::future<int> fut = work.get_future();
                std::thread work_td(std::move(work), 2, 10);
                work_td.join();
                std::cout << "work_thread:\t" << fut.get() << '\n';
            }
             
            int main()
            {
                work_lambda();
                work_bind();
                work_thread();
            }
    
        4   2个应用
    
            4.1 若 任务 可 划分` 为多个 self-contained `子任务`
    
                -> 每个std::packaged_task<> instance `wrap` 1个子任务
    
                -> 该 instance 被 `传进 任务调度器 或 线程池` 
    
                => 好处: `调度器 只需要处理 各std::packaged_task<> instance`, 而不是 各函数
    
            4.2 `线程间 传递works`
    
                    `GUI 框架: 用 指定的线程 更新 GUI`
    
                    方法1: 指定一个线程 更新GUI, 某线程想 更新GUI, 给它发消息即可
    
                    方法2: 
    
                    共享/全局 std::packaged_task 任务队列: 放 gui work
                    std::deque<std::packaged_task<void()> > works; 
    
    
                    全局 mutex:
                    1   何时 lock mutex?
                    access 共享数据(结构) 时, lock
    
                    2   何时 释放/unlock mutex
                    access 完, 让 std::lock_guard<std::mutex> 对象 作用域 结束, 自动释放
    
                    => 代码中 锁的范围 就很好确定
    
                    `std::packaged_task 任务`:
                    //1. 创建 obj : wrap Func
                    std::packaged_task<void()> work(f);
    
                    //2. push work to deque : 用 std::move()
                    works.push_back(std::move(work));
    
                    //默认 ctor : 对象 用于接收 提取的 work
                    std::packaged_task<void()>work;
    
                    //3. 提取 work: 用 std::move()
                    work=std::move( works.front() ); 
    
                    //4. invoke work 函数: with 函数参数(本例无参)
                    //通过 调用 work 的 operator(),  转调 work 函数的 operator()
                    work()
    
                    gui 线程:
                    循环 : 直到 收到 gui关闭消息 时 跳出
                        轮询(polling for) 要处理的 GUI messages, 比如 用户 clicks
                        根据队列是否为空 来决定, 
                            是从 work队列 front  返回 1个 gui work; 还是跳到下一循环
                        pop work -> 释放锁
                        执行 work
                        
                    push 任务的线程:
                    用接收的 函数 创建1个 work
                    获得 fut = work.get_future()
                    push work到 任务队列
                    返回 fut
    
                    调度线程:
                    以 work 函数 启动 push任务的线程
    
                    放 gui 消息 到 GUI 消息队列
                    启动 gui 线程
    
                    从 push线程 返回的 future: 调 get 获取 work 函数 返回值; 
                        或  丢掉 future
    
                    发gui 结束消息 给 gui 线程
    
                    // 合成完整的list4.9 用 std::packaged_task 在 GUI 线程上 运行代码
                    #include <deque>
                    #include <mutex>
                    #include <future>
                    #include <thread>
                    #include <utility>
    
                    std::mutex m;
                    std::deque<std::packaged_task<void()> > works; 
    
                    bool gui_shutdown_message_received();
                    void get_and_process_gui_message();
    
                    //-----1 GUI 线程
                    void gui_thread()      
                    {
                        while( !gui_shutdown_message_received() )
                        {
                            get_and_process_gui_message();     
                            
                            // local work: 接收 从 任务队列提取出的 任务
                            std::packaged_task<void()> work; 
    
                            {
                                std::lock_guard<std::mutex> lk(m);
                                if( works.empty() )          
                                    continue;
                                    
                                // 从work队列中 提取 front 任务         
                                work=std::move( works.front() ); 
                                works.pop_front();       
                            }// 释放 锁 => 用 块作用域 {}
    
                            work();    //  运行任务
                        }
                    }
                    std::thread gui_bg_thread(gui_thread);
    
                    //-----2 post_work_for_gui_ 的线程
                    template<typename Func>
                    std::future<void> post_work_for_gui_thread(Func f)
                    {
                        std::packaged_task<void()> work(f);    
                        std::future<void> res=work.get_future();
    
                        std::lock_guard<std::mutex> lk(m);
    
                        works.push_back( std::move(work) );  
                        return res;                    
                    }
    
                    // -----3 调度线程
                    //(1)在 future 上等待 : 可 知道 任务是否已完成
                    //(2)丢掉 future
    
    
        ###2.6.3 promises
    
        1   针对2种场景
    
            [1] work 不能被表达为 函数调用
            
            [2] 结果 来自多个地方
    
        2   机制
    
            // 3大成员函数
            
            1   get_future()
            
                返回 promised 结果 关联的 future
    
            2   set_value() : 主动线程
            
                    会 make state/future ready
    
            3   fut.get()   : 被动线程
    
    
        3   应用: `网络连接 数 n > 1`
    
            `网络侧 起多线程, 单个线程上 处理 单连接:` 
    
            问题:连接数过多时, 消耗 OS 资源过多, 上下文切换过多(线程数超过硬件支持的并发数)
    
            解决: `网络侧 起多线程, 单个线程上 处理 多连接 & 用 promise / future structure`
    
    
            网络侧 循环处理 来自终端的 多连接 中 每个连接, 连接分2种
    
            (1)上传数据的连接
                1)取出 接收data
                2)用 接收data的ID 获得 连接关联的 promise
                3)promise 结果 : set_value为 data 的 payload/有效载荷
    
            (2)下载数据的连接
                1)数据发送队列 中 取出 发送data(与上传线程中 接收data 类型不同)
                2)发送 data
                3)data关联的 promise 结果 : set_value 为 true, 以告诉  对方线程 发送成功
    
            //List 4.10 用 promises 处理 来自1个线程的 多个 connections
            #include <future>
            void process_connections(connection_set& connections)
            {
                //1 循环, 直到 处理完所有连接, 即 done() 返回 true
                while( !done(connections) ) 
                {
                    for(connection_iterator connection=connections.begin(),end=connections.end();
                        connection!=end;
                        ++connection)
                    {
                        if( connection->has_incoming_data() )    
                        {
                            data_packet data=connection->incoming(); 
                            std::promise<payload_type>& p=
                                connection->get_promise(data.id); 
                            p.set_value(data.payload);            
                        }
                        
                        if( connection->has_outgoing_data() )   
                        {
                            outgoing_packet data=                
                                connection->top_of_outgoing_queue();
    
                            connection->send(data.payload);      
                            data.promise.set_value(true);  
                        }
                    }
                }
            }
    
        ###2.6.4  用 future 保存 exception
    
            1   std::async 抛出异常 时, future 内部 存储 异常 & future become ready
            -> fut.get() 会再次抛出异常`
    
                double square_root(double x)
                {
                    if(x<0)
                    {
                        throw std::out_of_range(“x<0”);
                    }
                    return sqrt(x);
                }
                
                std::future<double> f=std::async(square_root,-1);
    
                double y=f.get();
    
            2   用 promise 的 set_exception() 来 make future ready`
    
                extern std::promise<double> some_promise;
    
                try
                {
                    some_promise.set_value(calculate_value());
                }
                catch(...)
                {
                    some_promise.set_exception(std::current_exception()); // std::current_exception(): 恢复异常
                }
    
        ###2.6.5 从多线程中 等待 
    
            1   多线程 access 1个 std::future/std::shared_future 对象时, `data race` 
    
                `对future, 并发 access 没有意义; 对shared_future, 用1个 锁来 保护 并发 access`
    
    
                第1次调用get()之后, future 就没有值可以获取
                -> 只有1个线程可以恢复future中的值
                -> 并发 access 没有意义
                
                std::promise<int> p;
                std::future<int> f( p.get_future() );
                assert(f.valid());  // future f is valid
    
                std::shared_future<int> sf( std::move(f) );
                assert(!f.valid()); // f 不再 valid
                assert(sf.valid()); // sf 是 valid
    
                // shared_future
                std::promise<std::string> prom;
    
                //Implicit transfer of ownership 
                //  for rvalue of type std::future<std::string>
                std::shared_future<std::string> sf( prom.get_future() ); 
    
                std::future::`share(): creates a new std::shared_future and transfers ownership to it directly`
    
            2   总结
    
                `future 可以 关联 或 不关联 数据`
    
                `一旦 事件发生, future 就不能被重置`
    
                //(1)类模板: <> 内 参数为 关联的数据类型 
    
                //(2)不 关联数据
                std:future<void> /std::shared_future<void> 
    
        ##4.2 `消息传递` + `同步`
    
            1   CSP 通信序列处理
    
                若 `无 shared state, 则 每个线程 可以 完全独立, 是一个 状态机: 
    
                收到 1个消息 
                -> 更新 自身状态 & 发消息 给其他线程 & 处理 是基于初始状态 
                => 有限状态机模型`
    
            2   代码分成 3个独立线程
    
                (1) 设备处理 线程
                (2) ATM 逻辑 线程
                (3) 与银行 communicate
    
                仅 `通过 `传递消息`, 而不是 `共享数据`, `3个线程就可以 communicate—消息传递风格`
    
                如: 当用户 插入卡 / 按下按键 时,
                设备处理线程 给 逻辑线程 发一个消息; 
                而 逻辑线程 给 设备处理线程 发消息 提示取多少钱
    
                // List 4.15  simple implementation of ATM logic class
                struct card_inserted
                {
                    std::string account;
                };
    
                class atm
                {
                    messaging::receiver incoming;
                    messaging::sender bank;
                    messaging::sender interface_hardware;
                    
                    // function pointer : void (*state)(); 
                    // => member function pointer: void (atm::*state)(); 
                    void (atm::*state)(); 
                    
                    std::string account;
                    std::string pin;
                    
                    void waiting_for_card() //1
                    {
                        //2 send “waiting insert card” message to interface
                        interface_hardware.send( display_enter_card() ); 
                        
                        incoming.wait()     //3
                            .handle<card_inserted>(
                                [&](card_inserted const& msg) //4
                                {
                                    account = msg.account;
                                    pin="";
                                    
                                    // send "input PIN" message to interface
                                    interface_hardware.send( display_enter_pin() ); 
                                    
                                    // member function pointer state reassigned:  
                                    state = &atm::getting_pin; 
                                }
                            );
                    }
                    
                    void getting_pin(); 
                    
                public:
                    void run()    //5
                    {
                        state = &atm::waiting_for_card; //6
                        
                        try
                        {
                            for( ; ; )
                            {
                                //7. call function ( signature: void (), return void, no Args ) 
                                //   by member function pointer atm::*state 
                                //   firstly call waiting_for_card() circularly, then call getting_pin()
                                (this->*state)(); 
                            }
                        }
                        catch(messaging::close_queue const&) { }
                    }
                };
    

    相关文章

      网友评论

          本文标题:C++ concurrency in action: 1~4 章

          本文链接:https://www.haomeiwen.com/subject/cohlaktx.html