美文网首页C++多线程
C++11新特性--并发、原子性、锁、条件变量

C++11新特性--并发、原子性、锁、条件变量

作者: 于天佐 | 来源:发表于2018-02-23 16:47 被阅读68次

    线程与C++

        在C++11之前的标准中,在C++语言层面是没有对线程的支持的,所以在特定平台编写(windows,linux等)跟线程相关的C++程序往往是要结合所在平台的线程相关API来操作线程,如在Windows上创建线程的API是CreateThread,在其他平台(mac、unix、linux等)通常可以用posixthread的API来创建线程即pthread_create。反观java语言,我们会发现Java的线程处理在不同平台是一致的,因为它在语言层面支持了线程也就是并发的一系列操作,开发者不必关心平台相关的细节同样可以编写出行为一致的跨平台程序。在C++11标准中,引入了线程以及并发操作所需的一切,这样使得C++同Java一样可以不关心平台的编写并发相关的代码,这所带来的便利我认为是C++11标准带来的最大好处。
        随着新标准而来的不仅仅是线程的支持,并发编程需要的一系列操作都有了语言层面的支持,如锁、原子性操作、条件变量。这些以往在C++编程中被视作系统能力的特性,在新的标准中统统那入了标准库的实现中。下面我们来分别看一下标准库对他们的支持以及它们的使用。

    std::thread

        std::thread是一个类包装,它包装了所在平台的线程实现,隐藏了所有细节,对外提供了统一的语义。首先看一下它的构造函数和operator=函数:

    default (1) 
    thread() noexcept;
    initialization (2)  
    template <class Fn, class... Args>
    explicit thread (Fn&& fn, Args&&... args);
    copy [deleted] (3)  
    thread (const thread&) = delete;
    move (4)    
    thread (thread&& x) noexcept;
    ///////////////////////////////////////////
    move (1)    
    thread& operator= (thread&& rhs) noexcept;
    copy [deleted] (2)  
    thread& operator= (const thread&) = delete;
    

        我们可以看到拷贝构造和相应的operator=函数被显示删除了,这意味着线程对象不可复制,只能在std::thread对象之间进行move语义的移动。这不难理解,在个平台上thread对象都属于资源型对象,而且具有唯一性,复制它没有意义,所以在语言层面C++11标准库中的thread类直接用语义屏蔽了复制对象的路径。其余常用的三个函数见下面

    void join();
    id get_id() const noexcept;
    void detach();
    

        这里get_id()是统一对线程唯一标识的一个封装,隐藏了不同平台线程标识不同的细节。join和detach语义和posixthread中的语义相同,这里直接摘要一下函数说明。

    Join thread
    The function returns when the thread execution has completed.
    
    This synchronizes the moment this function returns with the completion of all the operations in the thread: This blocks the execution of the thread that calls this function until the function called on construction returns (if it hasn't yet).
    
    After a call to this function, the thread object becomes non-joinable and can be destroyed safely.
    
    Detach thread
    Detaches the thread represented by the object from the calling thread, allowing them to execute independently from each other.
    
    Both threads continue without blocking nor synchronizing in any way. Note that when either one ends execution, its resources are released.
    
    After a call to this function, the thread object becomes non-joinable and can be destroyed safely.
    

        下面直接看一段代码,很直观的可以明白std::thread的用法。

    class STDThreadTest
    {
    public:
        static void test_thread(void *pVoid)
        {
            std::cout << "work thread:" << STDThreadTest::g_thread_unum << std::endl;
            STDThreadTest::g_thread_unum = 2;
            std::cout << "work thread:" << STDThreadTest::g_thread_unum << std::endl;
        }
    
        static void execute()
        {
    //        auto lambdafunc = [](void *param) -> void
    //        {
    //            std::cout << "work thread:" << STDThreadTest::g_thread_unum << std::endl;
    //            STDThreadTest::g_thread_unum = 2;
    //            std::cout << "work thread:" << STDThreadTest::g_thread_unum << std::endl;
    //        };
    
            std::thread thread_test([](void *param) -> void
                                    {
                                        std::cout << "work thread:" << STDThreadTest::g_thread_unum << std::endl;
                                        STDThreadTest::g_thread_unum = 2;
                                        std::cout << "work thread:" << STDThreadTest::g_thread_unum << std::endl;
                                    }, (void *) nullptr);
            thread_test.join();//如果没有join或者detach线程的话,会报异常
            std::cout << "main thread:" << STDThreadTest::g_thread_unum << std::endl;
        }
    
        static thread_local int g_thread_unum;
    };
    
    thread_local int STDThreadTest::g_thread_unum = 1;
    

        上面代码展示了std::thread的基本用法,在它构造的时候传入相应的回调函数(可以是函数、functor、lambda)和回调函数所需要的参数(任意数量)。可以看出来结合lambda函数来使用std::thread异常的方便。这个例子还用到了C++11引入的thread_local关键字,它直接封装了平台相关的TLS实现,简单好用,用法一目了然不再做解释了。

    std::atomic

        这个类封装了原子操作所需要的所有操作,通过模板参数对所有基本类型进行了封装

    typedef atomic<bool>               atomic_bool;
    typedef atomic<char>               atomic_char;
    typedef atomic<signed char>        atomic_schar;
    typedef atomic<unsigned char>      atomic_uchar;
    typedef atomic<short>              atomic_short;
    typedef atomic<unsigned short>     atomic_ushort;
    typedef atomic<int>                atomic_int;
    typedef atomic<unsigned int>       atomic_uint;
    typedef atomic<long>               atomic_long;
    typedef atomic<unsigned long>      atomic_ulong;
    typedef atomic<long long>          atomic_llong;
    typedef atomic<unsigned long long> atomic_ullong;
    typedef atomic<char16_t>           atomic_char16_t;
    typedef atomic<char32_t>           atomic_char32_t;
    typedef atomic<wchar_t>            atomic_wchar_t;
    
    typedef atomic<int_least8_t>   atomic_int_least8_t;
    typedef atomic<uint_least8_t>  atomic_uint_least8_t;
    typedef atomic<int_least16_t>  atomic_int_least16_t;
    typedef atomic<uint_least16_t> atomic_uint_least16_t;
    typedef atomic<int_least32_t>  atomic_int_least32_t;
    typedef atomic<uint_least32_t> atomic_uint_least32_t;
    typedef atomic<int_least64_t>  atomic_int_least64_t;
    typedef atomic<uint_least64_t> atomic_uint_least64_t;
    
    typedef atomic<int_fast8_t>   atomic_int_fast8_t;
    typedef atomic<uint_fast8_t>  atomic_uint_fast8_t;
    typedef atomic<int_fast16_t>  atomic_int_fast16_t;
    typedef atomic<uint_fast16_t> atomic_uint_fast16_t;
    typedef atomic<int_fast32_t>  atomic_int_fast32_t;
    typedef atomic<uint_fast32_t> atomic_uint_fast32_t;
    typedef atomic<int_fast64_t>  atomic_int_fast64_t;
    typedef atomic<uint_fast64_t> atomic_uint_fast64_t;
    
    typedef atomic< int8_t>  atomic_int8_t;
    typedef atomic<uint8_t>  atomic_uint8_t;
    typedef atomic< int16_t> atomic_int16_t;
    typedef atomic<uint16_t> atomic_uint16_t;
    typedef atomic< int32_t> atomic_int32_t;
    typedef atomic<uint32_t> atomic_uint32_t;
    typedef atomic< int64_t> atomic_int64_t;
    typedef atomic<uint64_t> atomic_uint64_t;
    
    typedef atomic<intptr_t>  atomic_intptr_t;
    typedef atomic<uintptr_t> atomic_uintptr_t;
    typedef atomic<size_t>    atomic_size_t;
    typedef atomic<ptrdiff_t> atomic_ptrdiff_t;
    typedef atomic<intmax_t>  atomic_intmax_t;
    typedef atomic<uintmax_t> atomic_uintmax_t;
    

        所以平时我们常用的基本类型的原子操作都可以通过标准库中的定义找到。来看一下它提供的函数:

    Operations supported by certain specializations (integral and/or pointer, see below)
    fetch_add
    Add to contained value (public member function )
    fetch_sub
    Subtract from contained value (public member function )
    fetch_and
    Apply bitwise AND to contained value (public member function )
    fetch_or
    Apply bitwise OR to contained value (public member function )
    fetch_xor
    Apply bitwise XOR to contained value (public member function )
    operator++
    Increment container value (public member function )
    operator--
    Decrement container value (public member function )
    atomic::operator (comp. assign.)
    Compound assignments (public member function )
    

        绝大多数编程场景中我们只需调用operator++和operaotr--完成原子递增递减即可。这里虽然语义上是保证原子性操作的,但是还涉及到不同平台的内存模型,即所谓的“顺序一致性(memory_order)”,std::atomic提供了不同内存模型下memory_order的参数选择,不过默认值即可达到大部分我们预期的效果,有兴趣可以查阅相关的文档。更复杂的多线程变量读写可以应用锁来实现。下面来看一下C++11中的锁。

    std::mutex、std::lock_guard、std::unique_lock

        std::mutex是一个互斥量的包装类,它来封装平台相关的互斥量实现,它还有一些高级实现如timed_mutex等,我们这里只介绍一下mutex。它相当于一个锁头,但是锁上它以及开启它还需要其它类的帮忙,它们组合在一起就能实现对程序某些代码进行锁互斥的作用。锁定以及开锁的工具类就是std::lock_guard和std::unique_lock。我们直接看一段代码看看他们是如何工作的。

    #include <thread>
    #include <mutex>
    #include <iostream>
    
    
    class MutexLock
    {
    private:
        std::mutex m_mutex;
        int m_count;
    public:
        MutexLock() : m_count(0)
        {
    
        }
        ////////
    #define MAX_COUNT 50
    
        static void execute()
        {
            MutexLock lockobj;
            std::thread tobj(
                    [&]() -> void
                    {
                        while (lockobj.m_count < MAX_COUNT)
                        {
                            try
                            {
                                std::lock_guard<std::mutex> lock(lockobj.m_mutex);
                                std::cout << "thread1" << std::endl;
                                if (lockobj.m_count >= MAX_COUNT)
                                {
                                    break;
                                }
                                ++lockobj.m_count;
                            }
                            catch (...)
                            { std::cout << "lock_guard exp" << std::endl; }
                        }
                    }
            );
    
            std::thread tobj2(
                    [&]() -> void
                    {
                        while (lockobj.m_count < MAX_COUNT)
                        {
                            try
                            {
                                std::unique_lock<std::mutex> lock(lockobj.m_mutex);
                                std::cout << "thread2" << std::endl;
                                if (lockobj.m_count >= MAX_COUNT)
                                {
                                    break;
                                }
                                ++lockobj.m_count;
                                lock.unlock();
                                std::cout << "unlock" << std::endl;
                                std::cout << "unlock" << std::endl;
                                std::cout << "unlock" << std::endl;
                                std::cout << "unlock" << std::endl;
                                std::cout << "unlock" << std::endl;
                                std::cout << "unlock" << std::endl;
                                std::cout << "unlock" << std::endl;
                                std::cout << "unlock" << std::endl;
                                std::cout << "unlock" << std::endl;
                                std::cout << "unlock" << std::endl;
                                std::cout << "unlock" << std::endl;
                            }
                            catch (...)
                            { std::cout << "unique_lock exp" << std::endl; }
                        }
                    }
            );
            tobj.join();
            tobj2.join();
        }
    };
    

        上面代码很简单,两个线程同时竞争同一个锁,得到锁的线程会检查退出条件确保退出条件在无锁判断的时候没有被其他线程改变,然后对计数器进行加1。这里两个线程用到了两种锁,首先看一下std::lock_guard。它是一个模板类,接受一个模板参数指明用的是哪一种锁(前面提到的mutex或者timed_mutex等),它不提供加锁解锁函数,加锁解锁完全靠其对象的声明周期,上面使用的例子就是while循环体内。如果想自己控制它的加锁范围可以人为加代码块({})。后面一个线程用到的是std::unique_lock,用法和std::lock_guard一样,只不过再它的基础上加入了lock和unlock函数,更灵活的控制加锁解锁的时机。

    条件变量std::condition_variable

        熟悉posixthread的人应该很清楚条件变量的用法,C++11中的条件变量使用跟posix中的基本一直,不过更简单一些,有了它就可以实现对某一种状态的等待以及对满足某一种状态的唤醒。它需要跟锁一起搭配使用,这点跟posixthread是一致的。我们直接看一段代码来了解它的使用。

    class ConditionalTest
    {
    private:
        std::mutex m_mtx;
        std::condition_variable m_cv;
        int n_counter = 0;
    public:
    #define COUNTER 5
    
       void execute()
        {
            auto lambdafunc1 = [this]() -> void
            {
                try
                {
    
    
                    std::unique_lock<std::mutex> lock(m_mtx);
    //            while (n_counter < 5)
    //            {
    //                m_cv.wait(lock);
    //            }
                    m_cv.wait(lock, [this]
                    {
                        if (n_counter >= COUNTER)
                        {
                            return true;
                        } else
                        {
                            std::cout << "aweek but condition does not satisfied" << std::endl;
                            return false;
                        }
                    });
                    std::cout << "waitthread runs" << std::endl;
                    std::this_thread::sleep_for(std::chrono::seconds(2));
                    std::cout << "waitthread runs end" << std::endl;
                }
                catch (...)
                {}
            };
    
            auto lambdafunc2 = [this]() -> void
            {
                try
                {
                    std::unique_lock<std::mutex> lock(m_mtx, std::defer_lock_t());
                    do
                    {
                        lock.lock();
                        ++n_counter;
                        std::cout << "plusthread plus one " << std::endl;
                        m_cv.notify_all();
                        lock.unlock();
                        std::this_thread::sleep_for(std::chrono::seconds(1));
                    } while (n_counter < COUNTER);
    
                    std::cout << "plusthread notify end" << std::endl;
                    std::cout << "plusthread notify end" << std::endl;
                    std::cout << "plusthread notify end" << std::endl;
                    std::cout << "plusthread notify end" << std::endl;
                    std::cout << "plusthread notify end" << std::endl;
                    std::cout << "plusthread notify end" << std::endl;
                    std::cout << "plusthread notify end" << std::endl;
                    std::cout << "plusthread notify end" << std::endl;
                    std::cout << "plusthread notify end" << std::endl;
                    std::cout << "plusthread notify end" << std::endl;
                } catch (...)
                {}
            };
    
            std::thread t1(lambdafunc1), t2(lambdafunc2);
            t1.join();
            t2.join();
            std::cout << "all done counter is:" << n_counter << std::endl;
        }
    };
    

        上面代码需要注意的地方是每次条件变量notify的时候需要unlock锁,否则锁是不会立即释放的,这样等待的条件变量就不会即时的拿到锁执行下面的语句。另外线程1条件变量等待的地方是有一层额外的判断注释的部分和两个参数版本的wait部分,这里的语义跟posix是完全一致的,就是有可能存在虚假的唤醒,所以要加一层额外的判断,C++11库提供了我们现在使用的双参数版本进行条件比对,如果不满足会继续释放锁等待条件的满足。

    Generally, the function is notified to wake up by a call in another thread either
     to member notify_one or to member notify_all. But certain implementations 
    may produce spurious wake-up calls without any of these functions being 
    called. Therefore, users of this function shall ensure their condition for 
    resumption is met.
    

        上面是wait函数的解释的一部分,这个防御完全同posixthread中的条件变量防御完全一致,我猜应该就是适应posixthread中条件变量的这个问题。总之我推荐总是使用wait函数两个参数的版本,这样就不会出现问题了。

    最后

        通过对前面一些C++11标准库提供的关于并发编程的类的介绍,我们应该可以看出在语言层面上C++已经完全具备了并发编程需要的一切要素,在它们的基础上编写跨平台代码将更加便利,操作线程进行并发编程将变得更加容易。结合前面的介绍,最后看一段典型的生产者消费者的代码作为结束。

    class ConsumerAndProducer
    {
    private:
        std::deque<std::time_t> m_deque; //things to be consumed
        std::condition_variable m_cv;
        std::mutex m_mutex;
        bool m_quit;
    public:
        ConsumerAndProducer() : m_quit(false)
        {
    
        }
    
        static std::time_t getTimeStamp()
        {
            std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds> tp =
                    std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now());
            auto tmp = std::chrono::duration_cast<std::chrono::milliseconds>(tp.time_since_epoch());
            std::time_t timestamp = tmp.count();
            //std::time_t timestamp = std::chrono::system_clock::to_time_t(tp);
            return timestamp;
        }
    
        static void execute()
        {
            ConsumerAndProducer obj;
    
            auto consumer = [&]() -> void
            {
                while (!obj.m_quit)
                {
                    try
                    {
                        std::this_thread::sleep_for(std::chrono::milliseconds(1));
                        std::unique_lock<std::mutex> lock(obj.m_mutex);
                        obj.m_cv.wait(lock, [&]
                        {
                            return obj.m_deque.size() > 0;
                        });
                        while (obj.m_deque.size() > 0)
                        {
                            std::cout << "Consumed :" << obj.m_deque.front() << std::endl;
                            obj.m_deque.pop_front();
                        }
                    }
                    catch (...)
                    {
                        std::cout << "consumer exp" << std::endl;
                    }
                }
            };
    
            auto producer = [&]() -> void
            {
                while (!obj.m_quit)
                {
                    try
                    {
                        std::unique_lock<std::mutex> lock(obj.m_mutex);
                        std::time_t time = getTimeStamp();
                        std::cout << "Produce :" << time << std::endl;
                        obj.m_deque.push_back(time);
                        while (time % 3)
                        {
                            std::this_thread::sleep_for(std::chrono::milliseconds(100));
                            time = getTimeStamp();
                            std::cout << "Produce :" << time << std::endl;
                            obj.m_deque.push_back(time);
                        }
    
                        obj.m_cv.notify_all();
                        lock.unlock();
                    }
                    catch (...)
                    {
                        std::cout << "producer exp" << std::endl;
                    }
                }
            };
    
            auto timer = [&]() -> void
            {
                std::this_thread::sleep_for(std::chrono::seconds(3));
                obj.m_quit = true;
            };
    
            std::thread t1(consumer), t2(producer), t3(timer);
            t1.join();
            t2.join();
            t3.join();
            std::cout << "all done remain:" << obj.m_deque.size() << std::endl;
        }
    };
    
    

    相关文章

      网友评论

        本文标题:C++11新特性--并发、原子性、锁、条件变量

        本文链接:https://www.haomeiwen.com/subject/rjxezxtx.html