美文网首页
chapter5 C++11 多线程: 深入应用 C++11

chapter5 C++11 多线程: 深入应用 C++11

作者: my_passion | 来源:发表于2022-05-23 23:09 被阅读0次

    chapter5 C++11 多线程

        C++11 中 C++ 对 并发编程 提供了 语言级支持
            
            [1] 增加了 
                线程 
                线程相关的类
            
            [2] 提高了 
                多线程程序 的 可移植性
    
    5.1 线程
                
        1   线程创建 
            
            std::thread t( threadFunc / threadCallableObj, 
                           threadFunc / threadCallableObj 的 参数)
                           
                void func(int a, int b) {}
                std::thread t1(std::bind(func, 1, 2) );
                std::thread t1([] (int a, int b) { }, 1, 2 );
                
        2   线程创建 的 问题 
            
            应该保证 线程对象 lifetime 长于 线程函数 lifetime 
                
                否则, std::thread 对象 出 scope 后 会析构, 可能引入 lifetime 问题
            
            如何保证 ?
            
                [1] join 
                
                [2] detach 
                    
                [3] 将 线程对象 保存到 容器中, 保证其 lifetime 
                    
                    std::vector<std::thread> gVec;
                    std::vector<std::shared_ptr<std::thread> > gVec2;
                    
                    std::thread t(func);
                    gVec.push_back(std::move(t) );
                    gVec2.push_back(std::make_shared<std::thread>(func) );
        
        3   join
            
            [1]  target thread(函数) 将 运行于 线程对象 中
            
            [2]  block calling(current) thread, 
                
                    直到 target(newly created) thread (func) 执行完 
            
            [3] 返回值 将被 忽略, 
                    
                    若 线程函数有返回值
            
        4   detach
        
            [1] 分离 target thread 与 线程对象
                
                1] 让线程 在 后台运行
                2] detach 之后, 无法再与 target thread 联系 => 不能再 join 等 
                
            [2] calling/current thread 不会 block
        
        5   线程 可 move, 不可 copy
        
            std::thread t(func);
            std::thread t2(std::move(t) );
            t.join();
            t2.join();
            
        6   获取 线程ID 
            
            threadObj.get_id()
            
            std::this_thread::get_id()
            
        7   获取 CPU 提示支持的 并发数 
        
            std::thread::hardware_concurrency() 
            
        8   线程休眠 
            
            std::this_thread::sleep_for( std::chrono::seconds(3) );
            
        例 
        
        // 2-3 threadContainer.cpp 
        #include <iostream>
        #include <vector>
        #include <thread>
    
        void func()
        {
            std::cout << "hello " << std::this_thread::get_id() << " end" << std::endl;
        }
    
        std::vector<std::thread> gVec;
        std::vector<std::shared_ptr<std::thread> > gVec2;
    
        void createThread()
        {
            std::thread t(func);
            gVec.push_back(std::move(t) );
            gVec2.push_back(std::make_shared<std::thread>(func) );
        }
    
        int main()
        {
            createThread();
            
            for(auto& thread: gVec)
                thread.join();
                
            for(auto& thread: gVec2)
                thread->join();
        }
        
    5.2 mutex 
        
        1种 同步 原语 
        
        用于 protect 多线程 同时访问 的 shared data
        
        1
            ————————————————————————————————————————
            独占  std::mutex              
            
            递归  std::recursive_mutex 
            
                    std::timed_mutex 
            带超时
                    std::recursive_timed_mutex
            ————————————————————————————————————————        
        
        2   memFunc
        
            lock()
            
            unlock()
            
            try_lock(): non-block, 成功返回 true 
            
        3   应用 
            
            [1] lock() 与 unlock() 要成对 -> 异常下, 难保证 
                
                mutex 多做 global var, 与 protected shared data 放一起 
                
            [2] std::lock_guard
            
                RAII
        
        4   递归 mutex 
        
            [1] 允许 同一线程 多次获得 该 mutex 
                
                可用于 解决 同一线程 需多次获取 mutex 时的 
                    死锁问题 
                
                // eg
                shared data + mutex 
                    放 class 
                        1] memFunc1 / memFunc2 / memFunc3 均想 access shared data => 均要 lock mutex 
                        
                        2] memFunc3 调 memFunc1 和 memFunc2
                        
                        用 std::mutex => 死锁 
                        
                        用 std::recursive_mutex => 死锁解决 
                        
            [2] 尽量不要用 
                
                1] 用 的场合, 问题往往可以简化 
                
                2] 效率低: 比 非递归 mutex
                
                3] 可重复获得的 最大次数 未定义 => 超过 可能出错 
                
    5.3 cv
                    
        用于 wait 的 同步机制 
        
        可 block >=1 个 waiting thread, 直到 notifying thread 发出 notify, 才能唤醒 waiting thread 
        
        cv 要配合 mutex 
        
        2 种 cv 
        
            condition_variable + std::mutex 
        
            condition_variable_any + 任意带 lock/unlock 语义的 mutex
                灵活 但 效率低 
        
        同步队列 
            
            (1) 思路
                
                生产者 put()
                    
                    1) notFull() 时
                        
                        才能 put [2]
                        
                        put 后, 必然 notEmpty() ->         应该 notify[3] notEmptyCv 上 消费者 (去 take) 
                                                                    |                                                           
                    2) else                                         |
                                                                    |
                        notFullCv 上 wait [1]                    |
                            |\                                      |
                            |   由谁 notify ? 答: 消费者          |
                            |                                       |
                            |_ _ _ _ _ _                            |
                                        |                           |
                消费者 take()          |                           |
                                        |                           |
                    1) notEmpty() 时 |                           |
                                        | 应该 notify[3] notFullCv  |
                        才能 take [2] |     上 生产者 (去 put)    |
                                        |                           |
                        take 后, 必然 notFull()                        |
                            |                                       |
                            |                                       |
                            front()                                 |
                            + pop_front()                           |
                                                                    |
                    2) else                                         |
                                                                    |
                        notEmptyCv 上 wait [1] <- - - - - - - - - - - 
                        
            
            (2) 成员 变量 
                
                notFullCv
                notEmptyCv
                mutex 
                que: std::list 
                queMaxSize
                
            (3) std::lock_guard 与 std::unique_lock 比较 
                
                1) lock_guard : 只能在 lifetime 结束 才(必须 + 最终) unlock mutex 
                
                   unique_lock: 可 自由 unlock mutex 
                
                2) lock_guard + cv.wait() 的 微妙之处 
                    
                    语义上看似矛盾:
                        1] lock_guard 出 scope 要 unlock mutex 
                    
                        2] cv.wait() 在 wait 时 会 (提前) unlock mutex 
                        
                    实际没问题 
                        
                        wait 时 unlock mutex + waiting 
                        waiting thread 被 notify_one/notify_all 唤醒后, 又先 relock mutex
                            <=> lock_guard 的 mutex 在 释放(release) 之后 又 获得(acquire) 了 
                            
                3) 对 cv.wait() 中的 locker, 
                    
                    用 unique_lock 从语义上看更准确
                    
            (4) cv.wait() 2 种重载版本 
                
                // 版本 1: 更简洁
                std::unique_lock<std::mutex> lk(mut);   
                conditionSatisfiedCv <=> 
                    wokeUpSemanticCv.wait(lk,       
                                         [this] {return wokeUpSemanticFunc(); } ); // bool 函数
                                                        
                // 版本 2 
                std::unique_lock<std::mutex> lk(mut);   
                while(! wokeUpSemanticFunc() ) 
                    wokeUpSemanticCv.wait(lk);
                
                ——————————————————————————————————————————————          
                    语义          |   wokeUpSemanticCv    
                ——————————————————————————————————————————————
                wait 中          |   wokeUpSemanticFunc()
                ——————————————————————————————————————————————
                wait 外 while 中  |   ! wokeUpSemanticFunc()      
                ——————————————————————————————————————————————
                
                // eg
                std::unique_lock<std::mutex> lk(mut);   
                notFullCv.wait(lk, [this] {return notFull(); } ); 
                
                std::unique_lock<std::mutex> lk(mut);   
                while (! notFull() )
                    notFullCv.wait(lk);  
                    
                // SynQue.cpp
                #include <iostream>
                #include <thread>
                #include <mutex>
                #include <condition_variable>
                #include <list>
    
                template<typename T>
                class SynQue
                {
                private:
                    std::condition_variable notFullCv;
                    std::condition_variable notEmptyCv;
                    std::mutex              mut;
                    std::list<T>            que;
                    int                     queMaxSize;
                public:
                    SynQue(int maxSize): queMaxSize(maxSize) {}
                    ~SynQue() {}
    
                    // (1) const ref: not modify the pointed obj 
                    void put(const T& x)
                    {
                        std::unique_lock<std::mutex> lk(mut);
                        notFullCv.wait(lk,
                            [this] {return notFull(); });
    
                        que.push_back(x);
                        notEmptyCv.notify_one();
                    }
    
                    // (2) ref: modify modify the pointed obj
                    void take(T& x)
                    {
                        std::unique_lock<std::mutex> lk(mut);
                        notEmptyCv.wait(lk,
                            [this] {return notEmpty(); });
    
                        x = que.front();
                        que.pop_front();
                        notFullCv.notify_one();
                    }
    
                    // (3) func of condition sastified (woke up) on cv.wait need not use locker: locker already exist before cv.wait 
                    bool notFull()
                    {
                        bool isFull = (que.size() == queMaxSize);
                        if (isFull)
                            std::cout << "buf full, waiting ...\n";
                        return !isFull;
                    }
    
                    bool notEmpty()
                    {
                        bool isEmpty = que.empty();
                        if(isEmpty)
                            std::cout << "buf empty, waiting ...\n";
                        return !isEmpty;
                        // return ! que.empty();
                    }
                };
    
                SynQue<int> synQue(100);
    
                void f1()
                {
                    std::cout << "thread1 start ...\n";
                    synQue.put(1);
                    std::cout << "thread1 end ...\n";
                }
    
                void f2()
                {
                    int result = 0;
                    std::cout << "thread2 start ...\n";
                    synQue.take(result);
                    std::cout << "thread2 result: " << result << std::endl;
                    std::cout << "thread2 end ...\n";
                }
    
                int main()
                {
                    std::thread t1(f1);
                    std::thread t2(f2);
                    t1.join();
                    t2.join();
                }
                
                Note: 有如下替换 
                    unique_lock        换为 lock_guard
                    condition_variable 换为 condition_variable_any
                    cv.wait 版本1      换位 版本 2
                    
    5.4 原子变量
    
        std::atomic<T> 
        
        用 原子变量 就不需要用 mutex 来 protect 该变量 了 
        
        // (1) 用 mutex 实现的 计时器 
        struct Counter
        {
            int value;
            std::mutex mut;
            
            void increment()
            {
                std::lock_guard<std::mutex> lk(mut);
                ++value;
            }
            
            void decrement()
            {
                std::lock_guard<std::mutex> lk(mut);
                --value;
            }
            
            int get()
            {
                return value;
            }   
        };
        
        // (2) 用 原子变量 实现的 计时器 
        #include <atomic>
        struct Counter
        {
            std::atomic<int> value;
    
            void increment()
            {
                ++value;
            }
    
            void decrement()
            {
                --value;
            }
    
            int get()
            {
                return value.load(); // read
            }
        };
        
    5.5 call_once/once_flag
    
        call_once
            保证 多线程 下, 函数 只被调用 1 次 
            
                如, 初始化函数: 某对象 只能初始化 1 次 
            
            once_flag 作 第1参数 
            
        // callOnce.cpp
        #include <iostream>
        #include <thread>
        #include <mutex>
    
        std::once_flag onceFlag;
    
        void doOnce()
        {
            std::call_once(onceFlag,
                []() { std::cout << "called once\n"; } );
        }
    
        int main()
        {
            std::thread t1(doOnce);
            std::thread t2(doOnce);
    
            t1.join();
            t2.join();
        }
        
    5.6 异步操作 类 
    
        ——————————————————————————————————————————————————————————————————————————————————
        std::future         异步操作结果 的 传输通道
        
                            便于 get 线程函数 return value 
        ——————————————————————————————————————————————————————————————————————————————————                  
        std::promise        包装/保存 ( shared status's ) value
                                                             |
                                                             |
                            绑定 data 与 future: promise.set_value_...(dataValue)
                                                              |
                                                              |
                            便于 线程赋值 / 获取 线程函数中 某个 (在 promise 上 set 的) 值  
        ——————————————————————————————————————————————————————————————————————————————————                  
        std::package_task   包装/保存 callableObj: func / funcObject / lambda / bindExpr
        
                            绑定 callableObj 与 future 
                            
                            便于 异步调用 / 获取 taskFunc(callableObj) return value
        ——————————————————————————————————————————————————————————————————————————————————
        
        1   std::future
            
            引入: 线程函数 return value
            
                threadObj.join() 无法 直接 get  
            
                (1) 间接获得 (繁琐) 
                    
                    [1] 定义 (global) variable
                    
                    [2] 线程函数 中 给该 variable 赋值 -> variable 作 return value
                    
                    [3] join
                    
                    [4] get 线程函数 return value = variableValue
                
                (2) C++ 语言级支持: thread 库 提供
                    
                    future 来 `访问 异步(操作的)结果` 
                                |
                                |
                        不能马上获取, 只能在 未来 某时 从 某地获取 
                            => 是 未来的期待值 => 称 future 
                                    
                    [1] 查 std::future 的 状态 std::future_status 可知 
                        
                        异步任务 的 执行情况 
                            
                            std::future_status::
                                                
                            1]  deferred    还没开始 
                            
                            2]  ready       已完成
                            
                            3]  timeout     超时
                        
                    [2] 异步操作的等待 & 异步结果 的 获取
                    
                        future 的 memFunc
                        
                        ——————————————————————————————————————————————————————
                        同步方式 
                        
                            等待 异步操作 完成          1] wait
                                                        2] wait_for: 超时等待 
    
                                并 返回/获取 异步结果    3] get
                        ——————————————————————————————————————————————————————      
                                    
        2   std::promise
            
            (1) 线程函数(callableObj 总称) `引用接收` 外传来的 promise,
                
                对 该 promise set_value_ - - - - - - - - - -
                                                            |
            (2) 线程函数 完成后, 在 外部                  |
                                                            |
                1] 从 promise 获取 promise 中 future        |
                                                            |/
                2] 从 future 获取 线程函数中 某个 (在 promise 上 set 的) 值 
            
        3   std::package_task
            
            (1) 线程函数 `引用接收` 外传来的 packaged_task
                
                packaged_task 保存 taskFunc
                
                    => 线程 真正运行的函数 是保存于 packaged_task 中的 taskFunc
                                                                        |
            (2) 线程函数 完成后, 在 外部                              |
                                                                        |
                1] 从 promise 获取 promise 中 future                    |
                                                                        |
                2] 从 future 获取 taskFunc return value <- - - - - - - - 
        
        4   std::future / std::promise / std::package_task 3者关系 
                    
            (1) std::future 与 另 两者 关系
                
                `异步操作结果` 的 传输通道
                        |
                        |   是 
                        |
                    1] 线程中某个值               - std::promise 
                                                        
                    2] taskFunc 的 return value  - std::package_task
            
            (2) std::promise 与 std::package_task
                
                可将 std::package_task 所 wrap 的 异步操作 的 结果 保存到 std::promise
        
        5   future 可 move 不可 copy
            
            想将 future 放到 容器, 用 shared_future 
        
        例
            // 1 future
            std::future_status status;
            do
            {
                status = future.wait_for(std::chrono::seconds(1) );
                if(status == std::future_status::deferred)
                    ;
                if(status == std::future_status::ready) 
                    ;
                if(status == std::future_status::timeout)
                    ;
            } while( status != std::future_status::ready);
                        
            // 2 promise.cpp
            #include <iostream>
            #include <thread>
            #include <future>
    
            void threadFunc(std::promise<int>& promise)
            {
                promise.set_value_at_thread_exit(1);
            }
    
            int main()
            {
                std::promise<int> pr;                   // (1)
    
                // std::thread t(threadFunc, std::ref(pr)); // also right
    
                std::thread t([](std::promise<int>& promise)
                                { 
                                    promise.set_value_at_thread_exit(1); // (3)
                                },
                                std::ref(pr));          // (2) Note: pass by ref
    
                std::future<int> fut = pr.get_future(); // (4)
    
                auto result = fut.get();                // (5)
    
                std::cout << "result: " << result << std::endl;
                
                t.join();                               // (6) join() thread is necessary
            }
            
            // 3 packagedTask.cpp
            #include <iostream>
            #include <thread>
            #include <future>
            #include <chrono>
    
            int taskFunc()
            {
                std::cout << "task thread start...\n";
                std::this_thread::sleep_for(std::chrono::seconds(1) );
                return 5; // (3)
            }
    
            int main()
            {
                std::packaged_task<int()> packagedTask(taskFunc); // (1)
                /*
                std::packaged_task<int()> packagedTask( []() 
                    {
                        std::cout << "task thread start...\n";
                        std::this_thread::sleep_for(std::chrono::seconds(1));
                        return 5;
                     } ); 
                */
    
                std::cout << "task saved \n";
    
                std::thread t(std::ref(packagedTask) );             // (2) Note: pass by ref
    
                std::future<int> fut = packagedTask.get_future();   // (4) get_future()
    
                auto result = fut.get();                            // (5) get()
    
                std::cout << "result: " << result << std::endl;
    
                t.join();                                           // (6) join() thread is necessary
            }   
    
            // 4 promisePackagedTask.cpp
            #include <iostream>
            #include <thread>
            #include <future>
            #include <chrono>
    
            int packagedTaskFunc()
            {
                std::cout << "task thread start ...\n";
    
                std::this_thread::sleep_for(std::chrono::seconds(1) );
                return 5; 
            }
            void promiseThreadFunc(std::promise<int>& promise, std::packaged_task<int()>& packagedTask)
            {
                std::cout << "promise thread start ...\n";
    
                std::future<int> fut = packagedTask.get_future();   
                auto result = fut.get();                            
    
                std::cout << "packagedTask result: " << result << std::endl;
    
                promise.set_value_at_thread_exit(result);
            }
            int main()
            {
                std::packaged_task<int()> packagedTask(packagedTaskFunc); 
                std::promise<int> pr;
    
                std::thread t1(std::ref(packagedTask));         
                std::thread t2(promiseThreadFunc, std::ref(pr), std::ref(packagedTask) );
    
                std::future<int> fut = pr.get_future();
                auto result = fut.get();                
                std::cout << "promise result: " << result << std::endl;
    
                t1.join();                  
                t2.join();
            }
    
            // 5 shared_future.cpp
            #include <iostream>
            #include <vector>
            #include <thread>
            #include <future>
            #include <chrono>
            // #include <utility>
    
            int func(int x)
            {
                return x + 2;
            }
    
            int main()
            {
                // (1)
                std::packaged_task<int(int)> packagedTask(func);
                std::future<int> fut = packagedTask.get_future();
    
                std::thread(std::move(packagedTask), 2 ).detach();
    
                int value = fut.get();
                std::cout << "return value: " << value << std::endl;
    
                // (2)
                std::vector<std::shared_future<int> > vec;
                
                // Note: shared_future Ctor's arg must be rvalue
                // shared_future( std::future<T>&& other ) noexcept; 
                // => below is error
                // auto fut = std::async(std::launch::async,[](int a, int b) {return a + b;  }, 2, 3);
                // vec.push_back(fut); // error: there's no conversion from lvalue std::shared_future<int> to std::shared_future<int>
                vec.push_back( std::async(std::launch::async,
                                [](int a, int b) {return a + b;  }, 2, 3) 
                             );
    
                std::cout << vec[0].get() << std::endl;
            }
            
    5.7 异步操作 函数
    
        std::async  
            
            比 std::promise / std::packaged_task / std::thread 更高1层 (应 优先使用), 
                
                [1] 可用于 直接创建 异步 task 
                
                    异步任务的结果 保存在 async 返回的 std::future<T> 中 
                    
                [2] 使 coder 不用关注 线程创建 内部细节, 就能 
                    方便获取 异步 执行状态 和 结果 
                    
        std::async(std::launch::async | std::launch::deferred, 
                   f, args, ...)
                   
            2 种 线程创建策略 
                
                1] 默认策略 std::launch::async
                    
                    调用 async 时, 创建线程 
                
                2] std::launch::deferred
                    直到调用了 future 的 get() / wait() 时, 才 创建线程
                    
        例       
            // promise.cpp
            #include <iostream>
            #include <vector>
            #include <thread>
            #include <future>
            #include <chrono>
    
            int main()
            {
                std::future<int> fut = std::async(std::launch::async,
                    []() 
                    {
                        std::this_thread::sleep_for(std::chrono::seconds(3) );
                        return 5;
                    } );
                    
                std::cout << "calling thread waiting ...\n";
    
                std::future_status futStatus;
                do
                {
                    futStatus = fut.wait_for(std::chrono::seconds(1) );
                    if (futStatus == std::future_status::deferred)
                        std::cout << "deferred \n";
                    if (futStatus == std::future_status::ready)
                        std::cout << "ready \n";
                    if (futStatus == std::future_status::timeout)
                        std::cout << "timeout \n";
                } while (futStatus != std::future_status::ready);
    
                std::cout << "result: " << fut.get() << std::endl;
            }
    
    
            // possible print
            calling thread waiting ...
            timeout
            timeout
            ready
            result: 5
    

    相关文章

      网友评论

          本文标题:chapter5 C++11 多线程: 深入应用 C++11

          本文链接:https://www.haomeiwen.com/subject/rualurtx.html