美文网首页
C++同步并发操作

C++同步并发操作

作者: 龙虾天天 | 来源:发表于2021-12-26 14:43 被阅读0次

    何时需要线程同步

    • 线程完成前,需要等待另一个线程执行
    • 线程需要等待特定事件发生
    • 线程等待某个条件变为true

    线程同步的方式

    1. 持续检查共享标记
    void wait_for_flag() {
        std::unique_lock lock(m);
    
        while (!flag) {
            lock.unlock();
            lock.lock();
        }
      
        do_something();
    }
    
    1. 等待线程在检查间隙
    void wait_for_flag() {
        std::unique_lock lock(m);
    
        while (!flag) {
            lock.unlock();
            std::this_thread::sleep_for(std::chrono::milliseconds(100)); //休眠
            lock.lock();
        }
      
        //going on next
    }
    
    1. 条件变量(condition variable)

    1 条件变量使用方式

    目前有以下两种方式,两者都需要与一个互斥量才能工作。

    • std::condition_variable: 仅限于和 std::mutex一起工作
    • std::condition_variable_any:可以和任何满足最低标准的互斥量一起工作,更加通用,但是体积、性能、系统资源会产生额外的开销
    std::mutex lock;
    std::queue<data_set> data_queue;
    std::condition_variable data_cond;
    
    void data_preparation_thread() {
        while (more_data_to_prep()) {
            data_set  const data = prep_data();
            std::lock_guard l(lock);
            data_queue.push(data);
            /**
            1. notify_one()触发一个正在执行wait()的线程,去检查条件和wait()函数的返回状态
            2. 另一种可能是,很多线程等待同一事件,对于通知他们都需要做出回应,需要使用notify_all()
            **/
            data_cond.notify_one(); 
        }
    }
    
    void data_processing_thread() {
        while (true) {
            std::unique_lock l(lock); //后续需要unlock, 因此不能用lock_guard
            /*1. 在条件满足的情况下,从wait()返回并继续持有锁。当条件不满足时,线程将对互斥量解锁,并且重新开始等待
              2. 当等待线程重新获取互斥量并检查条件时,如果它并非直接响应另一个线程的通知,这就是所谓的“伪唤醒”(spurious wakeup)。*/
            data_cond.wait(l, []{return !data_queue.empty();}); 
            /**
            另一种形式:
            if (data.queue.empty()) {
                data_cond.wait(l);
            }
            **/
            data_set data = data_queue.front();
            data_queue.pop;
            l.unlock();
    
            process(data);
    
            if (is_last_data(data)) {
                break;
            }
        }
    }
    

    2 使用期待处理一次性事件

    当等待线程只等待一次,当条件为true时,它就不会再等待条件变量了,那么这种情况下使用条件变量会存在一定的浪费。

    C++将这种一次性事件称为“期望”(future)。当一个线程需要等待一个特定的一次性事件时,future有以下集中应用方式:

    1. 这个线程可以周期性(较短的周期)的等待或检查,事件是否触发(检查信息板);
    2. 在检查期间也可以执行其他任务;
    3. 在等待任务期间它可以先执行另外一些任务,直到对应的任务触发,而后等待期望的状态会变为“就绪”(ready)

    C++有两种期望类型: std::futurestd::shared_future
    std::future的实例只能与一个指定事件相关联,而 std::shared_future的实例就能关联多个事件。后者的实现中,所有实例会在同时变为就绪状态,并且他们可以访问与事件相关的任何数据。

    2.1 std::future

    比如需要一个长时间的运算,但是现在并不需要关注这个值,在需要时再去获取,我们来看条件变量的方式

    bool result_ok();
    int get_result(); //假设result非0
    int result = 0; 
    std::mutex lock;
    
    void main() {
        std::unique_lock l(lock); //1. 互斥量
        while(!result) {
            wait(l, result_ok);   //2. 条件变量, 阻塞等待
            result = get_result;   
        }
    }
    
    //3. 独立计算result的线程 
    std::unique_lock l(lock);
    calculate_result();
    notify_all();
    

    接下来是future的方式:

    /*当任务的结果你不着急要时,你可以使用std::async启动一个异步任务。与std::thread对象等待运行方式的不同,std::async会返回一个std::future对象,这个对象持有最终计算出来的结果。当你需要这个值时,你只需要调用这个对象的get()成员函数;并且直到“期望”状态为就绪的情况下,线程才会阻塞;之后,返回计算结果*/
    void main() {
        std::future<int> future_result = std::async(calculate_result);
        do_some_other() //如果暂时不需要结果,可以做些其他事情
        result = future_result.get();  //需要的时候就去获取结果,如果future未ready则阻塞
    }
    

    更多请查看std::future

    async简述
    函数模板 async 异步地运行函数 f (潜在地在可能是线程池一部分的分离线程中),并返回最终将保有该函数调用结果的 std::future 。

    async的构造方式:

    1. 传入函数+参数: auto f2=std::async(bar,"goodbye")
    2. 传入成员函数指针+成员类的对象+成员函数参数:auto f1=std::async(&X::foo,&x,42,"hello")
    3. 传入引用:std::async(baz,std::ref(x)); // 调用baz(x)
      更多构造方式见std::async

    在函数调用之前,向std::async传递一个额外参数, 参数的类型是std::launch,有以下几种取值:

    • 若设置 async 标志, 即 std::launch::async, 则 async 在新的执行线程(初始化所有线程局域对象后)执行可调用对象 f ,如同产出 std::thread(std::forward<F>(f), std::forward<Args>(args)...),除了若 f 返回值或抛出异常,则于可通过 async 返回给调用方的 std::future访问的共享状态存储结果。
    • 若设置 deferred 标志,即 std::launch::deferred, 则 async 以同 std::thread 构造函数的方式转换 f 与 args... ,但不产出新的执行线程。而是进行惰性求值:在 async 所返回的 std::future 上首次调用非定时等待函数,将导致在当前线程(不必是最初调用 std::async 的线程)中,以 args... (作为右值传递)的副本调用 f (亦作为右值)的副本。将结果或异常置于关联到该 future 的共享状态,然后才令它就绪。对同一 std::future 的所有后续访问都会立即返回结果。
    • 若 policy 中设置了 std::launch::async 和 std::launch::deferred 两个标志,则进行异步执行还是惰性求值取决于实现
    auto f6=std::async(std::launch::async,Y(),1.2);  // 在新线程上执行
    auto f7=std::async(std::launch::deferred,baz,std::ref(x));  // 在wait()或get()调用时执行
    

    2.2 std::shared_future

    std::future 所引用的共享状态不与另一异步返回对象共享, std::future模型独享同步结果的所有权,并且通过调用 get()函数,一次性的获取数据,这就让并发访问变的毫无意义——只有一个线程可以获取结果值,因为在第一次调用 get()后,就没有值可以再获取了,再次调用 get()会抛出异常
    std::shared_future允许多个线程等候同一共享状态, 可用于同时向多个线程发信,类似 std::condition_variable::notify_all(),多个对象可以引用同一关联“期望”的结果,简而言之,std::shared_future中共享状态可以被 get()多次。
    注意在每一个std::shared_future的独立对象上成员函数调用返回的结果还是不同步的,所以为了在多个线程访问一个独立对象时,避免数据竞争,必须使用锁来对访问进行保护。

    int queryNumber();
    void doSomething(char c, shared_future<int> f);
    void check() {
        try {
            shared_future<int> f = std::async(queryNumber);
    
            auto f1 = std::async(std::launch::async, doSomething, '.', f);
            auto f2 = std::async(std::launch::async, doSomething, '+', f);
    
            f1.get();
            f2.get();
        }
        catch (const std::exception& e) {
            std::cout << "Exception: " << e.what << endl; 
        }
    }
    

    3 更多的使用 future的方式

    除了前面 std::async的使用方式,还可以使用 std::package_taskstd::promisestd::package_task可以封装一个可调用对象,待后续调用,而 std::promise则可以封装一个值,待后续使用。

    3.1 std::package_task

    std::packaged_task<>对一个函数或可调用对象,绑定一个期望。当 std::packaged_task<> 对象被调用,它就会调用相关函数或可调用对象,将期望状态置为就绪,返回值也会被存储为相关数据。
    它包装任何可调用 (Callable) 目标,包括函数、 lambda 表达式、 bind 表达式或其他函数对象,使得能异步调用它。

    std::packaged_task<>的模板参数是一个函数签名,如:

    int f(int x, int y) { return std::pow(x,y); }
    std::packaged_task<int(int,int)> task(f)
    

    使用std::packaged_task关联的std::future对象保存的数据类型是可调对象的返回结果类型,如示例函数的返回结果类型是int,那么声明为 std::future<int>,而不是 std::future<int(int)>

    int Add(int x, int y);
    
    void task_lambda() {
        int ret;
        std::packaged_task<int(int, int)> task([](int a, int b){return a + b;}); //使用lamba表达式包装可调用函数
    
        task(2, 10); //启动任务,非异步
    
        std::future<int> result = task.get_future();
        ret = result.get(); //获取共享状态的值
    
        task.reset(); //重置共享状态
        result = task.get_future();
    
        thread td(std::move(task), 2, 10) //异步启动
        ret = result.get();
    }
    
    

    3.2 std::promise

    类模板 std::promise 提供存储值或异常的设施,之后通过 std::promise 对象所创建的 std::future 对象异步获得结果。注意 std::promise 只应当使用一次。

    promise 是 promise-future 交流通道的“推”端:存储值于共享状态的操作同步于任何在共享状态上等待的函数(如 std::future::get )的成功返回。其他情况下对共享状态的共时访问可能冲突:例如, std::shared_future::get 的多个调用方必须全都是只读,或提供外部同步。

    一对 std::promise/std::future在期望上可以阻塞等待线程,同时,提供数据的线程可以使用组合中的“承诺”来对相关值进行设置,以及将“期望”的状态置为“就绪”。

    可以通过 get_future()成员函数来获取与一个给定的 std::promise相关的 std::future对象,就像是与 std::packaged_task相关。当“承诺”的值已经设置完毕(使用set_value()成员函数),对应“期望”的状态变为“就绪”,并且可用于检索已存储的值。当你在设置值之前销毁std::promise,将会存储一个异常。

    在调用std::future::get()时,如果std::future对象状态不是ready,则调用的地方将一直阻塞等待。

    int thread_task(std::promise<int> & pro, int i) {
        std::this_thread::sleep_for(std::chrono::miliseconds(1000));
        pro.set_value(i); //提醒 future
        return 0;
    }
    
    void main() {
        std::promise<int> pro; //promise<int> 在线程间传递结果
        std::thread mythread(thread_task, std::ref(pro), 5);
        mythread.join();
      
        std::future<int> result = pro.get_future();
        //future::get() 将等待直至该 future 拥有合法结果并取得它 
        std::cout << result.get() << std::endl;
    }
    

    相关文章

      网友评论

          本文标题:C++同步并发操作

          本文链接:https://www.haomeiwen.com/subject/hresqrtx.html