美文网首页
第5章: 并发与实用功能

第5章: 并发与实用功能

作者: my_passion | 来源:发表于2022-06-20 23:05 被阅读0次

    5.0 建议

        1 用 资源句柄 管理资源
        2 用 unique_ptr 访问 多态类型的对象
        3 用 shared_ptr 访问 共享对象
        4 用 类型安全的机制 处理 并发
        5 最好避免 shared data
        6 从 并发执行任务, 而不是 thread 角度 思考
    

    5.1 资源管理

    1 unique_ptr 与 shared_ptr : <memory>

        资源: 符合 先获取后释放 的 东西
            内存/锁/套接字/线程句柄/文件句柄
            |
            |   不及时释放
            |/
         资源泄露
            |
            |
            |/
        标准库 组件 不会出现 资源泄露
            用 RAII 确保 `资源 依存于 其所属对象, 而 不会超过 其对象的 lifetime`
        
         
        (1) SP: unique_ptr vs. shared_ptr
            unique_ptr/shared_ptr 所有权 唯一/共享
                移动/copy 
                
            unique_ptr  
                可以把 自由存储 上的对象 传递给/出 函数
                
            shared_ptr
                1) copy 而非 移动
                2) 最后1个 shared_ptr 销毁时, 所管理的 资源 才被销毁
                3) 使对象的 lifetime 变得不那么容易掌控
                4) 没有指明 哪个 owner 有权读写 资源
                    数据竞争 / 数据混淆 依然存在
                    
    (2) `何时应该用 SP / ` vector 等 资源句柄(vector / thread)`
            
    答: 指针语义 / 对象集合
    

    5.2 并发

    1 task(任务) 和 thread(线程)

        (1) task
        
            可视为与 caller 并发执行的 function
                只需 传参 / 获取结果 / 保证 2个 task 不竞争使用 共享数据 
            
            形式: func / funcObj
            
        (2) std::thread 对象: <thread>
            task 作 实参
            
            线程间通信
            
            [1] 同步
            
                1] 调用-返回 
                    传参
                    返回结果
                    
                2] 共享数据 
                
                3] cv
                
            [2] 异步
    

    2 传参

    thread ctor: 可变参数模板
    

    3 返回结果

        (1) 通过参数: 不优雅
                
            默认 值传递 copy
            std::ref() 才引用传递
                    
            注意2种 handle 参数: 指针 / 容器
                值传递 -> copy 的是 handle, resource 不 copy  
    
        (2) 异步
        
        // eg: 传参 + 返回结果(通过参数)  
        void f(vector<double>& v, double* res); // 从 v 获取输入, 结果放 *res
    
        class F
        {
        private:
            vector<double>& v; // 输入 源
            duoble* res;       // 输出 目标
        public
            F(vector<double>& vv, double* p): v(vv), res(p){}
            void operator()(); 
        };
        
        duoble res1;
        double res2;
        thread t1 {f, vec1, &res1};
        thread t2 {F(vec2), &res2};
    

    4 shared data

        |
        | 访问 必须 
        |    
    (1) `同步`
            以确保 同一时刻 最多有1个 task 能访问
        |
        | 机制
        |/
        
     mutex / 锁: <mutex>
        
        thread 
            lock()/unlock() 操作 可 获取/释放 1个 mutex
        
        unique_lock: 对 mutex 的 RAII 
            
            若 other 线程 已获取 mutex
                当前线程 `等待/阻塞`, 直到 that 线程 完成对共享数据的访问
    
            
        mutex m; // 控制 共享数据 访问 的 mutex
        int sh;  // 共享数据
        
        void f()
        {
            unique_lock<mutex> lck {m}; // 获取 mutex
            sh++;                       // 处理 共享数据
        } // 隐式 释放 mutex
    
        | 共享对象 与 mutex 间 对应关系, 靠 程序员 去维护, 易出错
        |
        | 解决: 
        |/
                
        1)  mutex 与 shared dada 关联, 形成类, 访问 shared data 前 先 lock mutex  
            
            class Record
            {
            public:
                mutex rm;
                int sh;
            };
        
            同时 访问多个资源 来 执行1个操作
                |
                | 可能导致
                |/
              死锁
                    thread1 获取了 mutex1 然后 试图获取 mutex2, 而同时 thread2 已获取了 mutex2 然后 试图获取 mutex1
                    => 2 个任务 都无法继续执行了
                |
                |   解决
                |       同时获取多个锁
                |/
        
        2) 推迟加锁 defer_lock + 同时获取 全部锁 lock(lck1, lck2);
        
            void f()
            {
                // ...
                unique_lock<mutex> lck1 {m1, defer_lock};
                unique_lock<mutex> lck2 {m2, defer_lock};
                // ...
                lock(lck1, lck2);
                // ... 处理 共享数据 ...
            }                               // 隐式 释放所有 mutex    
    
    (2) cv
    
        condition_variable 机制
                
        thread1 等待 条件/事件(线程2完成某工作后)  发生
    
        // === 2个 thread 通过 queue 传递消息
        #include <condition_variable>
        class Message{
            // ...
        };
        
        queue<Message> mq;
        condition_variable cv;
        mutex mut;
        
        // 1)
        void producer()
        {
            while(true)
            {
                Message msg;
                // fill msg
                
                unique_lock<mutex> lck {mut}; // === lock 
                mq.push(msg);
                cv.notify_one();              // 通知
            }                                 // === unlock implicitly 
        }
        
        // 2)
        void consumer()
        {
            while(true)
            {
                unique_lock<mutex> lck {mut};
                while (cv.wait(lck) ); // 释放 lck 并 等待, 被唤醒后 重新获取 lck
                                        
                auto msg = mq.front(); // extract msg
                mq.pop();
                
                lck.unlock();          // 释放 lck: explicitly
                // 处理 msg
            }
        }
    

    5 任务通信

        在 task (抽象层), 而不是 线程和锁(底层) 上操作
        
        user 不用 care 锁 
        
        <future>
        
        (1) future + promise
        
            task 间 value/exception 传递
                2个 task 运行于 独立 线程
                
                step
                    [1] task1 建 promise, 对 promise 调 get_future 获取 promise 内部的 future
                    
                    [2] task1 将 promise 引用传递 给 task2
                    
                    [3] task2 中 对 promise 调 set_value(...)/set_exception(...): 
                            内部 将 value/exception 设给 promise 的 future
                    
                    [4] task1 对 future 调 get(), block, 直到 future 为 ready 态/抛出异常, get() 返回 value /exception
            
            另1种形式 
                [1] caller 建 promise, 对 promise 调 get_future() 获得 promise 内部的 future 
                
                [2] caller 将 promise 引用传到 task1
                
                [3] task1 中 对 promise   调 set_value()/set_exception()
                
                [4] caller 将 future 引用传到 task2 
                
                [5] task2 调 future.get() 获得 value 
                
        (2) packaged_task
            
            1) task2 的 return value 传给 task1   
                    |
            转化为 |  wrap/封装 task2 到 packaged_task
                    |  packaged_task 把 task 的 return value/异常 放入 packaged_task 内部的 future
                    |/
            task1 与 packaged_task 间 value/exception 传递 
                task1 对 packaged_task 调 get_future()
                task1 对 获得的 future 调 get()
            
            task1 与 packaged_task 运行于 独立 线程 
            
            2) 类模板
                    
                [1] 模板参数: task 类型
                        
                    ctor 的参数: task
                        
                [2] note
                    packaged_task 不能 copy 
                    为 packaged_task 启动线程时, 必须 move(packaged_task_obj)
                                                
        (3) async()
        
            不必再操心 `锁 和 线程`
            
            1) 思路
            
                类似 调用函数 的方式 启动任务
            
                `将 task 当作` 可以与 other task 并发执行 的 `函数` 来处理
            
            2) 机制: 2个分离
                
                [1] 分离 函数调用的 `调用部分` 和 `获取结果部分`
                
                [2] 分离 这2部分 与 任务的实际执行
        
            3) 优势
            
                简单 又不失其强大性
                    能满足广泛的需求
            
            4) 限制
            
                对 share data 且 需要锁机制 的 task 不要用 async()
                
                用 async() 内部到底启用 几个 thread, 由 async()实现 决定, user 不知道
            
            template< class Function, class... Args >
            std::future<typename std::result_of<
                typename std::decay<Function>::type(
                    typename std::decay<Args>::type...)
                        >::type>
                async( std::launch policy, Function&& f, Args&&... args );
                
                ArgsType 
                FunctionType
                FunctionReturnType
    
        // ===(1)
        #include <future>
        #include <thread>
        #include <chrono>
        #include <iostream>
    
        void f(std::promise<int>& pr)
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            int res = 1;
            pr.set_value(res);
        }
    
        void g(std::future<int>& fut)
        {
    
            int v = fut.get();
            std::cout << v << std::endl;
        }
    
        void test()
        {
            std::promise<int> pr;
            std::future<int> fut = pr.get_future();
    
            std::thread t1(f, std::ref(pr));
            std::thread t2(g, std::ref(fut) );
            t1.join();
            t2.join();
        }
        int main()
        {
            test();
        }
    
        // ===
        void f(promise<X>& px) // promise 任务: 将 结果 放 promise 中
        {
            // ...
            try
            {
                X res;
                // ... 计算 1个 值, 保存在 res 中
                px.set_value(res);
            }
            catch(...)
            {
                // 将异常传给 future 的线程
                px.set_exception( current_exception() );
            }
        }
        
        void g(future<X>& fx) // future 任务: 从 future 获取 结果
        {
            // ...
            try
            {
                X v = fx.get(); // 如必要, 等待 值 准备好
                // ... 使用 v
            }
            catch(...)
            {
                // ...
            }
        }
    
        // === (2)
        #include <iostream>
        #include <thread>
        #include <future>
    
        int f(int x, int y) { return x+y; }
    
        // 1] 任务 类型: 任务 / 函数 signature
        using TaskType = int(int, int);
    
        void task_thread()
        {
           
            // 2] wrap task
            std::packaged_task<TaskType> pt(f);
    
            // 3] 请求 future
            std::future<int> fut = pt.get_future();
    
            // 4] 为 packaged_task0 / 1 启动线程
            std::thread t(std::move(pt), 1, 2);
            t.join();
    
            // 5] 获得 结果
            std::cout << fut.get() << '\n';
        }
    
        void task_lambda()
        {
            std::packaged_task<TaskType> pt(
                [](int a, int b) {return a + b; });
    
            std::future<int> fut = pt.get_future();
    
            pt(1, 2);
    
            std::cout << fut.get() << '\n';
        }
    
        void task_bind()
        {
            // Note: bind 哪个参数后, TaskType 中 哪个参数去掉  
            std::packaged_task<int()> pt( std::bind(f, 1, 2) );
    
            std::future<int> fut = pt.get_future();
    
            pt();
    
            std::cout << fut.get() << '\n';
        }
    
        int main()
        {
            task_thread();
            task_lambda();
            task_bind();
        }                           
        
        // === (3)
        #include <iostream>
        #include <future>
        #include <thread>
    
        int f(int x) 
        {
            x++;
            return x;
        }
    
        int main()
        {
            // std::launch::deferred 当执行到 fut.get() 时, 才开始 创建线程
            std::future<int> fut = std::async(std::launch::deferred, f, 1);
            std::cout << fut.get() << std::endl; 
        }
    
    promise-future.jpg

    5.3 小工具 组件

    1 类型函数

    `编译时求值` 的 函数, 它 接受 `类型实参` 或 返回 `类型结果`
        
    (1) iterator_traits 机制: <iterator>
    
        `容器类型` 
            |   template <typename C>
            |   using Iterator_type = typename C::iterator;     
            |
            |   迭代器类型 再用别名 封装1层
            |   // Note: Iterator_type<C> 前没有 typename, 编译器已知道 template <typename C> Iterator_type<C> 为类型     
            |   template <typename C>    
            |   using Iter = Iterator_type<C>; 
            |/
        `容器迭代器类型`
            |           
            |   template <typename Ite>     
            |   using Iterator_category = typename     
            |       std::iterator_traits<Ite>::iterator_category;    
            |/      
        迭代器 category 
            |   
            |   对象: 构建 `标签值`, 指示 迭代器 的 category 
            |       Iterator_category<Iter>{}    
            |/      
        标签分发
        
            形参: vector / list 的 迭代器 category 为 random_access_iterator_tag / forward_iterator_tag
            
                对 list 排序
                    list copy 到 vector -> vector 上 sort -> vector copy 回 list
                
            `类型魔法` 发生在 选择 helper 函数时
    
        // template< class RandomIt >
        // void sort(RandomIt first, RandomIt last);
    
        #include <iostream>
        #include <iterator> // std::bidirectional_iterator_tag / std::random_access_iterator_tag
        #include <vector>
        #include <list>
        #include <algorithm>    // copy, sort
        #include <type_traits>  // std::remove_cv
    
        template <typename C>
        using Iterator_type = typename C::iterator;
    
        template <typename Ite>
        using Iterator_category = typename std::iterator_traits<Ite>::iterator_category;
    
        // vector sort
        template <typename Iter>
        void sort_helper(Iter beg, Iter end, std::random_access_iterator_tag)
        {
            std::sort(beg, end);
        }
    
        // list sort
        template <typename Iter>
        void sort_helper(Iter beg, Iter end, std::bidirectional_iterator_tag)
        {
            //decltype(*beg) is int&: because *beg is not a identifier, but is a lvalue
            using U = typename std::remove_reference< decltype(*beg) >::type;
            // <=>
            //auto identifier = *beg;
            //using U = decltype(identifier);
            
            std::vector< U > v{ beg, end };     // list init(copy to) vector
            std::sort(v.begin(), v.end());      // sort vector
            std::copy(v.begin(), v.end(), beg); // vector copy to list 
        }
    
        template <typename C>
        void sort(C& c)
        {
            using Iter = Iterator_type<C>;
    
            auto beg = c.begin();
            sort_helper(beg, c.end(), Iterator_category<Iter>{});
        }
    
        int main()
        {
            std::vector<int> vec{ 1, 3, 2 };
            std::list<int> lst { 1, 2, 5, 4, 3 };
            sort(vec);
            sort(lst);
        }
                
    (2) 类型谓词 <type_traits>
        
        标准库 类型谓词 是 类型函数
        负责回答 关于类型的问题
        
        bool b1 = is_pod<int>();
        
        has_virtual_destructor
        is_base_of
    

    2 pair 和 tuple <utility>

        equal_range 返回 迭代器 pair
        
        pair 
            提供 = == <
            其 元素得 支持 这些运算
            
            希望从 接口函数 返回2个结果
                (结果本身, 结果好不好的标志位) -> pair
            
        make_pair()/make_tupe() 可创建 pair/tuple
        
            vector<string> vec;
            auto p = make_pair(vec.begin(), 2); // pair<vector<string>::iterator, int>
            auto p_first = p.first;
            auto p_second = p.second;
            
            auto t = make_tuple(string("hello"), 1, 1.2);
            string s = get<0>(t);
    

    相关文章

      网友评论

          本文标题:第5章: 并发与实用功能

          本文链接:https://www.haomeiwen.com/subject/liybmrtx.html