美文网首页代码改变世界
C++多线程同步的几种方式

C++多线程同步的几种方式

作者: Dufre | 来源:发表于2020-07-13 22:14 被阅读0次

    Overview

    C++的多线程同步方式有这么几种:

    • mutex
      • lock_guard
      • unique_lock
    • condition_variable
    • future
      • promise
      • packaged_task
      • async

    C++11并没有提供semaphore的API,信号量太容易出错了(too error prone),通过组合互斥锁(mutex)和条件变量(condition variable)可以达到相同的效果,且更加安全。

    mutex

    官网介绍:mutex

    它包含以下三个部分:

    • Mutex type
    • Locks:lock_guardunique_lock
    • Functions:try_locklock

    example:

    mutex的实现也很简单,在进入临界区之前调用该变量(mtx)的lock函数,出临界区之前调用该变量的(mtx)的unlock函数。所以程序会连续输出50个'*'或者连续输出50个'$',而不会'*' '$'交替输出。

    但是考虑这样一个问题,std::cout << '\n',这里发生异常会发生什么?抛出异常后,意味着mtx.unlock()不会被执行,即锁没有被释放,整个程序进入不了临界区,该程序往往会挂死。

    // mutex example
    #include <iostream>       // std::cout
    #include <thread>         // std::thread
    #include <mutex>          // std::mutex
    
    std::mutex mtx;           // mutex for critical section
    
    void print_block (int n, char c) {
      // critical section (exclusive access to std::cout signaled by locking mtx):
      mtx.lock();
      for (int i=0; i<n; ++i) { std::cout << c; }
      std::cout << '\n';
      mtx.unlock();
    }
    
    int main ()
    {
      std::thread th1 (print_block,50,'*');
      std::thread th2 (print_block,50,'$');
    
      th1.join();
      th2.join();
    
      return 0;
    }
    

    Possible output (order of lines may vary, but characters are never mixed):

    **************************************************
    $$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$
    

    lock_guard

    在构造时,互斥对象被调用线程锁定,而在销毁时,互斥对象被解锁。它是最简单的锁,作为具有自动持续时间的对象特别有用,该持续时间一直持续到其上下文结束。这样,可以保证在抛出异常的情况下互斥对象已正确解锁。

    example

    // lock_guard example
    #include <iostream>       // std::cout
    #include <thread>         // std::thread
    #include <mutex>          // std::mutex, std::lock_guard
    #include <stdexcept>      // std::logic_error
    
    std::mutex mtx;
    
    void print_even (int x) {
      if (x%2==0) std::cout << x << " is even\n";
      else throw (std::logic_error("not even"));
    }
    
    void print_thread_id (int id) {
      try {
        // using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
        std::lock_guard<std::mutex> lck (mtx);
        print_even(id);
      }
      catch (std::logic_error&) {
        std::cout << "[exception caught]\n";
      }
    }
    
    int main ()
    {
      std::thread threads[10];
      // spawn 10 threads:
      for (int i=0; i<10; ++i)
        threads[i] = std::thread(print_thread_id,i+1);
    
      for (auto& th : threads) th.join();
    
      return 0;
    }
    

    possible output

    [exception caught]
    2 is even
    [exception caught]
    4 is even
    [exception caught]
    6 is even
    [exception caught]
    8 is even
    [exception caught]
    10 is even
    

    unique_lock

    unique_lock基本用法和lock_guard一致,在构造函数和析构函数中进行锁操作,不同的地方在于它提供了非常多构造函数。

    example

    // unique_lock example
    #include <iostream>       // std::cout
    #include <thread>         // std::thread
    #include <mutex>          // std::mutex, std::unique_lock
    
    std::mutex mtx;           // mutex for critical section
    
    void print_block (int n, char c) {
      // critical section (exclusive access to std::cout signaled by lifetime of lck):
      std::unique_lock<std::mutex> lck (mtx);
      for (int i=0; i<n; ++i) { std::cout << c; }
      std::cout << '\n';
    }
    
    int main ()
    {
      std::thread th1 (print_block,50,'*');
      std::thread th2 (print_block,50,'$');
    
      th1.join();
      th2.join();
    
      return 0;
    }
    

    condition_variable

    条件变量是一个对象,可以阻塞线程,直到被通知恢复。当调用其等待功能之一时,它使用unique_lock(通过互斥锁)来锁定线程。该线程将保持阻塞状态,直到被另一个在同一个condition_variable对象上调用通知功能的线程唤醒为止。

    Wait functions

    • wait
      • Wait until notified (public member function )
    • wait_for
      • Wait for timeout or until notified (public member function )
    • wait_until
      • Wait until notified or time point (public member function )

    Notify functions

    • notify_one
      • Notify one (public member function )
    • notify_all
      • Notify all (public member function )

    example

    // condition_variable::notify_one
    #include <iostream>           // std::cout
    #include <thread>             // std::thread
    #include <mutex>              // std::mutex, std::unique_lock
    #include <condition_variable> // std::condition_variable
    
    std::mutex mtx;
    std::condition_variable produce,consume;
    
    int cargo = 0;     // shared value by producers and consumers
    
    void consumer () {
      std::unique_lock<std::mutex> lck(mtx);
      while (cargo==0) consume.wait(lck);
      std::cout << cargo << '\n';
      cargo=0;
      produce.notify_one();
    }
    
    void producer (int id) {
      std::unique_lock<std::mutex> lck(mtx);
      while (cargo!=0) produce.wait(lck);
      cargo = id;
      consume.notify_one();
    }
    
    int main ()
    {
      std::thread consumers[10],producers[10];
      // spawn 10 consumers and 10 producers:
      for (int i=0; i<10; ++i) {
        consumers[i] = std::thread(consumer);
        producers[i] = std::thread(producer,i+1);
      }
    
      // join them back:
      for (int i=0; i<10; ++i) {
        producers[i].join();
        consumers[i].join();
      }
    
      return 0;
    }
    

    Possible output (order of consumed cargoes may vary):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    

    future

    future的目标是充分利用CPU的并发性,它只能通过asyncpromisepackage_task三种方式构造。future只能移动,不可复制,需要复制时可以使用shared_future,但通常不建议使用。调用future的get()时可能会发生阻塞,直到返回值ready。future有三种姿势的等待:

    • wait():一直等待直到得到返回值
    • wait_for():设定一个超时时间;
    • wait_until():等待到某个时间点。

    future有一特化版本future<void>,返回值为空,即不返回任何值,因此仅能用于线程间通知,但却是最常用的future。

    promise

    promise对象可以通过调用成员get_future将此共享状态与future对象关联。调用之后,两个对象共享相同的共享状态:

    • promise:promise对象是异步提供者,在共享状态的时候设置一个值
    • future负责:future对象是异步返回对象,可以检索共享状态的值,并在必要时等待其准备就绪

    example

    // promise example
    #include <iostream>       // std::cout
    #include <functional>     // std::ref
    #include <thread>         // std::thread
    #include <future>         // std::promise, std::future
    
    void print_int (std::future<int>& fut) {
      int x = fut.get();
      std::cout << "value: " << x << '\n';
    }
    
    int main ()
    {
      std::promise<int> prom;                      // create promise
    
      std::future<int> fut = prom.get_future();    // engagement with future
    
      std::thread th1 (print_int, std::ref(fut));  // send future to new thread
    
      prom.set_value (10);                         // fulfill promise
                                                   // (synchronizes with getting the future)
      th1.join();
      return 0;
    }
    

    Output:

    value: 10
    

    packaged_task

    很多情况下并不希望另起一个线程,因为线程是非常重要的资源。因此希望可以合理的管理线程资源,这就需要使用线程池。如何将future与线程池同时使用呢?这就需要采用package_task。package_task本质是将一个函数包装成一个future。这个task类似于std::function,有输入输出,大家可以将其认为是一个异步函数,但该异步函数并不负责执行,而是将其结果预置于一个future变量中,然后交给一个线程来实际执行,此时主线程便可以得到其返回值。

    example

    // packaged_task example
    #include <iostream>     // std::cout
    #include <future>       // std::packaged_task, std::future
    #include <chrono>       // std::chrono::seconds
    #include <thread>       // std::thread, std::this_thread::sleep_for
    
    // count down taking a second for each value:
    int countdown (int from, int to) {
      for (int i=from; i!=to; --i) {
        std::cout << i << '\n';
        std::this_thread::sleep_for(std::chrono::seconds(1));
      }
      std::cout << "Lift off!\n";
      return from-to;
    }
    
    int main ()
    {
      std::packaged_task<int(int,int)> tsk (countdown);   // set up packaged_task
      std::future<int> ret = tsk.get_future();            // get future
    
      std::thread th (std::move(tsk),10,0);   // spawn thread to count down from 10 to 0
    
      // ...
    
      int value = ret.get();                  // wait for the task to finish and get result
    
      std::cout << "The countdown lasted for " << value << " seconds.\n";
    
      th.join();
    
      return 0;
    }
    

    Possible output:

    10
    9
    8
    7
    6
    5
    4
    3
    2
    1
    Lift off!
    The countdown lasted for 10 seconds.
    

    async

    有时某项工作很早就可以开始做(前置条件都已完备),而等待这件工作结果的任务在非常靠后的位置,这时候就需要async。换言之,如果可以尽早开始做一件事,就让其在后台运行即可,或快或慢都可以,只需在需要结果的时候运行完成就好。

    example

    // async example
    #include <iostream>       // std::cout
    #include <future>         // std::async, std::future
    
    // a non-optimized way of checking for prime numbers:
    bool is_prime (int x) {
      std::cout << "Calculating. Please, wait...\n";
      for (int i=2; i<x; ++i) if (x%i==0) return false;
      return true;
    }
    
    int main ()
    {
      // call is_prime(313222313) asynchronously:
      std::future<bool> fut = std::async (is_prime,313222313);
    
      std::cout << "Checking whether 313222313 is prime.\n";
      // ...
    
      bool ret = fut.get();      // waits for is_prime to return
    
      if (ret) std::cout << "It is prime!\n";
      else std::cout << "It is not prime.\n";
    
      return 0;
    }
    

    Possible output (the first two lines may be in a different order, or scrambled):

    Checking whether 313222313 is prime.
    Calculating. Please, wait...
    It is prime!
    

    同步在我的微信公众号上


    Coder101

    Reference

    相关文章

      网友评论

        本文标题:C++多线程同步的几种方式

        本文链接:https://www.haomeiwen.com/subject/odxthktx.html