美文网首页
用C++实现数据总线的方法系列(上):基本概念&同步队列

用C++实现数据总线的方法系列(上):基本概念&同步队列

作者: JasonLiThirty | 来源:发表于2020-03-11 20:57 被阅读0次

    视频教程:https://www.bilibili.com/video/av94487439

    本文主要介绍多线程中数据同步的方法,技术包括:线程锁,同步变量,原子变量,消息处理等;以及三种同步队列的实现方法。

    std::unique_lock

    • 与std:::lock_gurad基本一致,但更加灵活的锁管理类模板,构造时是否加锁是可选的,在对象析构时如果持有锁会自动释放锁,所有权可以转移。对象生命期内允许手动加锁和释放锁。但提供了更好的上锁和解锁控制接口(lock,try_lock,try_lock_for,try_lock_until 和unlock)

    条件变量

    • 条件变量可以阻塞一个或多个线程,直到收到另外一个线程发出的通知,或者超时了才会唤醒当前阻塞的线程。

    类型

    • condition_variable,配合std::unique_lock<std::mutex>进行操作
    • condition_variable_any,配合任意带有lock,unlock语义的mutex进行操作
      • 比较灵活,更通用,对所有的锁都适用
      • 效率比condition_variable

    成员函数

    • notify_one 通知一个等待线程(public)
    • notify_all 通知所有等待线程(public)

    notify_one()和notify_all()都是Object对象用于通知处在等待该对象的线程的方法,但notify_one是通知一个线程获取锁,notify_all是通知所有相关的线程去竞争锁。

    • wait 阻塞当前线程直至条件变量被唤醒(public)
    • wait_for 阻塞当前线程直至条件变量被唤醒或超时(public)
    • wait_until 阻塞当前线程直至条件变量被唤醒或直到指定的时间点(public)

    执行过程

    • 拥有条件变量的线程首先获取互斥量
    • 然后循环检查某个条件,如果条件不满足,释放互斥量,同时阻塞该线程直到条件满足;如果条件满足,则向下执行。
    • 另一个线程获取互斥量,执行完成后调用条件变量的notify_one或notify_all唤醒一个或者所有等待线程。

    简洁写法及wait机制

    std::unique_lock<std::mutex> lck(m_mtRun);
    m_cvRun.wait(lck, [this]{ return m_runDown; });
    
    • 条件变量首先检查判断式是否满足条件,例如上例中的m_runDown是否为true
    • 如果不满足条件,释放mutex,将线程置为wait状态,继续等待唤醒
    • 如果满足条件,重新获取mutex,线程结束wait状态,继续向下执行
    • 这里需要注意的是,wait状态的线程被唤醒,但判断式不满足条件,****即假唤醒****,条件变量将继续释放mutex,将线程置为wait状态,继续等待下一次的唤醒

    基本示例-wait, wait_for和假唤醒

    #include <iostream>                
    #include <thread>               
    #include <mutex>         
    #include <list>
    #include <condition_variable>   
    #include <windows.h>
    
    bool                     completed;
    std::mutex               mtRun;
    std::condition_variable  cvRun;
    
    void Wait()
    {
        std::unique_lock<std::mutex> lck(mtRun);
    
        std::cout <<"Thread_"<<std::this_thread::get_id() << " is waiting..." << std::endl;
        cvRun.wait(lck, []() {
            return completed;
        });
    
        std::cout << "Thread_" << std::this_thread::get_id() << " is completed" << std::endl;
    }
    
    void Wait_For()
    {
        std::unique_lock<std::mutex> lck(mtRun);
    
        std::cout << "Thread_" << std::this_thread::get_id() << " is waiting..." << std::endl;
        if (!cvRun.wait_for(lck, std::chrono::seconds(4), []() {
        //if (!cvRun.wait_for(lck, std::chrono::seconds(2), []() {
            return completed;
        }))
        {
            std::cout << "Thread_" << std::this_thread::get_id() << " time out!" << std::endl;
        }
        else
        {
            std::cout << "Thread_" << std::this_thread::get_id() << " is completed" << std::endl;
        }
    }
    
    void Completed()
    {
        {
            std::cout << "Thread_" << std::this_thread::get_id() << " set completed" << std::endl;
            std::unique_lock<std::mutex> lck(mtRun);
            completed = true;
        }
        cvRun.notify_all();
    }
    
    void FakeCompleted()
    {
        {
            std::cout << "Thread_" << std::this_thread::get_id() << " not set completed" << std::endl;
            std::unique_lock<std::mutex> lck(mtRun);
            completed = false;
        }
        cvRun.notify_all();
    }
    
    int main()
    {
        //Wait
        completed = false;
        std::thread thWait(Wait);
        thWait.detach();
        Sleep(3000);
        std::thread thCompleted(Completed);
        thCompleted.join();
        Sleep(3000);
        //Waitfor
        //completed = false;
        //std::thread thWait(Wait_For);
        //thWait.detach();
        //Sleep(3000);
        //std::thread thCompleted(Completed);
        //thCompleted.join();
        //Sleep(3000);
        //Fake
        /*completed = false;
        std::thread thWait(Wait_For);
        thWait.detach();
        Sleep(3000);
        std::thread thCompleted(FakeCompleted);
        thCompleted.join();
        Sleep(3000);*/
        return 0;
    }
    

    原子变量

    • 使用原子变量不需要使用互斥量来保护这个变量,使用更简洁。
    • C++11提供个原子类型std::atomic<T>, 可以使用任意类型作为参数模板,同时也内置了基础类型的原子变量。
    typedef atomic<bool> atomic_bool;
    typedef atomic<char> atomic_char;
    typedef atomic<signed char> atomic_schar;
    typedef atomic<unsigned char> atomic_uchar;
    typedef atomic<short> atomic_short;
    typedef atomic<unsigned short> atomic_ushort;
    typedef atomic<int> atomic_int;
    typedef atomic<unsigned int> atomic_uint;
    typedef atomic<long> atomic_long;
    typedef atomic<unsigned long> atomic_ulong;
    typedef atomic<long long> atomic_llong;
    typedef atomic<unsigned long long> atomic_ullong;
    typedef atomic<char16_t> atomic_char16_t;
    typedef atomic<char32_t> atomic_char32_t;
    typedef atomic<wchar_t> atomic_wchar_t;
    typedef atomic<int8_t> atomic_int8_t;
    typedef atomic<uint8_t> atomic_uint8_t;
    typedef atomic<int16_t> atomic_int16_t;
    typedef atomic<uint16_t> atomic_uint16_t;
    typedef atomic<int32_t> atomic_int32_t;
    typedef atomic<uint32_t> atomic_uint32_t;
    typedef atomic<int64_t> atomic_int64_t;
    typedef atomic<uint64_t> atomic_uint64_t;
    typedef atomic<int_least8_t> atomic_int_least8_t;
    typedef atomic<uint_least8_t> atomic_uint_least8_t;
    typedef atomic<int_least16_t> atomic_int_least16_t;
    typedef atomic<uint_least16_t> atomic_uint_least16_t;
    typedef atomic<int_least32_t> atomic_int_least32_t;
    typedef atomic<uint_least32_t> atomic_uint_least32_t;
    typedef atomic<int_least64_t> atomic_int_least64_t;
    typedef atomic<uint_least64_t> atomic_uint_least64_t;
    typedef atomic<int_fast8_t> atomic_int_fast8_t;
    typedef atomic<uint_fast8_t> atomic_uint_fast8_t;
    typedef atomic<int_fast16_t> atomic_int_fast16_t;
    typedef atomic<uint_fast16_t> atomic_uint_fast16_t;
    typedef atomic<int_fast32_t> atomic_int_fast32_t;
    typedef atomic<uint_fast32_t> atomic_uint_fast32_t;
    typedef atomic<int_fast64_t> atomic_int_fast64_t;
    typedef atomic<uint_fast64_t> atomic_uint_fast64_t;
    
    typedef atomic<intptr_t> atomic_intptr_t;
    typedef atomic<uintptr_t> atomic_uintptr_t;
    typedef atomic<size_t> atomic_size_t;
    typedef atomic<ptrdiff_t> atomic_ptrdiff_t;
    typedef atomic<intmax_t> atomic_intmax_t;
    typedef atomic<uintmax_t> atomic_uintmax_t;
    //
    typedef signed char        int8_t;
    typedef short              int16_t;
    typedef int                int32_t;
    typedef long long          int64_t;
    typedef unsigned char      uint8_t;
    typedef unsigned short     uint16_t;
    typedef unsigned int       uint32_t;
    typedef unsigned long long uint64_t;
    
    typedef signed char        int_least8_t;
    typedef short              int_least16_t;
    typedef int                int_least32_t;
    typedef long long          int_least64_t;
    typedef unsigned char      uint_least8_t;
    typedef unsigned short     uint_least16_t;
    typedef unsigned int       uint_least32_t;
    typedef unsigned long long uint_least64_t;
    
    typedef signed char        int_fast8_t;
    typedef int                int_fast16_t;
    typedef int                int_fast32_t;
    typedef long long          int_fast64_t;
    typedef unsigned char      uint_fast8_t;
    typedef unsigned int       uint_fast16_t;
    typedef unsigned int       uint_fast32_t;
    typedef unsigned long long uint_fast64_t;
    
    typedef long long          intmax_t;
    typedef unsigned long long uintmax_t;
    
    • 以下写法是一样
    std::atomic_int                             m_standbyIdIndex;
    std::atomic<int>                            m_standbyIdIndex;
    

    call_once&once_flag

    • 如果多个线程需要同时调用某个函数,std::call_once 可以保证多个线程对该函数只调用一次。
    • 需要一个std::once_flag作为std::call_once的入参
    std::once_flag m_flag;
    std::call_once(m_flag, [this](){StopExecute(); });
    

    同步队列

    基本同步队列

    #include <iostream>
    #include <thread>  
    #include <mutex>         
    #include <list>
    #include <condition_variable>  
    #include <windows.h>
    
    class SyncQueue
    {
    public:
        SyncQueue()
        {
    
        }
    
        void Push(const int& x)
        {
            {
                std::unique_lock<std::mutex> lck(m_mutex);
                m_queue.push_back(x);
            }
            m_notEmpty.notify_all();
        }
    
        void Pop(int& x)
        {
            std::unique_lock<std::mutex> lck(m_mutex);
            m_notEmpty.wait(lck, [this]() {
                return !m_queue.empty();
            });
            x = m_queue.front();
            m_queue.pop_front();
        }
    
        bool Empty()
        {
            std::lock_guard<std::mutex> lck(m_mutex);
            return m_queue.empty();
        }
    
        size_t Size()
        {
            std::lock_guard<std::mutex> lck(m_mutex);
            return m_queue.size();
        }
    
    private:
        std::list<int>          m_queue;
        std::mutex              m_mutex;
        std::condition_variable m_notEmpty;
    };
    
    SyncQueue queue;
    
    void GetData()
    {
        int x = 0;
        while (queue.Empty())
        {
            queue.Pop(x);
            std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop " << x << std::endl;
            if (x == 0)
            {
                break;
            }
        }
        std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop End!" << std::endl;
    }
    
    void SetData()
    {
        for (int i = 10; i >= 0; i--)
        {
            Sleep(1000);
            std::cout << "Thread_" << std::this_thread::get_id() << "---- Push " << i << std::endl;
            queue.Push(i);
        }
        Sleep(500);
        std::cout << "Thread_" << std::this_thread::get_id() << "---- Push End!" << std::endl;
    }
    
    int main()
    {
        std::thread thGet(GetData);
        thGet.detach();
        std::thread thSet(SetData);
        thSet.join();
        return 0;
    }
    //output
    Thread_27072---- Push 1
    Thread_26712---- Pop 1
    Thread_27072---- Push 2
    Thread_26712---- Pop 2
    Thread_27072---- Push 3
    Thread_26712---- Pop 3
    Thread_27072---- Push 4
    Thread_26712---- Pop 4
    Thread_27072---- Push 5
    Thread_26712---- Pop 5
    Thread_27072---- Push 6
    Thread_26712---- Pop 6
    Thread_27072---- Push 7
    Thread_26712---- Pop 7
    Thread_27072---- Push 8
    Thread_26712---- Pop 8
    Thread_27072---- Push 9
    Thread_26712---- Pop 9
    Thread_27072---- Push 10
    Thread_26712---- Pop 10
    Thread_26712---- Pop End!
    Thread_27072---- Push End!
    

    带外部控制的同步队列

    #include <iostream>
    #include <thread>  
    #include <mutex>         
    #include <list>
    #include <condition_variable>  
    #include <windows.h>
    #include <atomic>
    
    class SyncQueue
    {
    public:
        SyncQueue()
        {
    
        }
    
        void Push(const int& x)
        {
            {
                std::unique_lock<std::mutex> lck(m_mutex);
                m_queue.push_back(x);
            }
            m_notEmpty.notify_all();
        }
    
        void Pop(int& x)
        {
            std::unique_lock<std::mutex> lck(m_mutex);
            m_notEmpty.wait(lck, [this]() {
                return !m_queue.empty();
            });
            x = m_queue.front();
            m_queue.pop_front();
        }
    
        bool Empty()
        {
            std::lock_guard<std::mutex> lck(m_mutex);
            return m_queue.empty();
        }
    
        size_t Size()
        {
            std::lock_guard<std::mutex> lck(m_mutex);
            return m_queue.size();
        }
    
    private:
        std::list<int>          m_queue;
        std::mutex              m_mutex;
        std::condition_variable m_notEmpty;
    };
    
    SyncQueue queue;
    std::atomic_bool getStop = false;
    
    void GetData()
    {
        int x = 0;
        while (queue.Empty())
        {
            if (getStop)
            {
                break;
            }
            queue.Pop(x);
            std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop " << x << std::endl;
        }
        std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop End!" << std::endl;
    }
    
    void SetData()
    {
        for (int i = 10; i >= 0; i--)
        {
            Sleep(1000);
            std::cout << "Thread_" << std::this_thread::get_id() << "---- Push " << i << std::endl;
            queue.Push(i);
            if (i == 5)
            {
                getStop = true;
            }
        }
    
        std::cout << "Thread_" << std::this_thread::get_id() << "---- Push End!" << std::endl;
    }
    
    int main()
    {
        std::thread thGet(GetData);
        thGet.detach();
        std::thread thSet(SetData);
        thSet.join();
        return 0;
    }
    //output
    Thread_29616---- Push 10
    Thread_30076---- Pop 10
    Thread_29616---- Push 9
    Thread_30076---- Pop 9
    Thread_29616---- Push 8
    Thread_30076---- Pop 8
    Thread_29616---- Push 7
    Thread_30076---- Pop 7
    Thread_29616---- Push 6
    Thread_30076---- Pop 6
    Thread_29616---- Push 5
    Thread_30076---- Pop 5
    Thread_30076---- Pop End!
    Thread_29616---- Push 4
    Thread_29616---- Push 3
    Thread_29616---- Push 2
    Thread_29616---- Push 1
    Thread_29616---- Push 0
    Thread_29616---- Push End!
    

    带超时的同步队列

    #include <iostream>
    #include <thread>  
    #include <mutex>         
    #include <list>
    #include <condition_variable>  
    #include <windows.h>
    #include <atomic>
    
    
    class SyncQueue
    {
    public:
        SyncQueue()
        {
    
    
        }
    
    
        void Push(const int& x)
        {
            {
                std::unique_lock<std::mutex> lck(m_mutex);
                m_queue.push_back(x);
            }
            m_notEmpty.notify_all();
        }
    
    
        bool Pop(int& x)
        {
            std::unique_lock<std::mutex> lck(m_mutex);
    
    
            if (m_notEmpty.wait_for(lck, std::chrono::seconds(1), [this]() {
                return !m_queue.empty();
            }))
            {
                x = m_queue.front();
                m_queue.pop_front();
                return true;
            }
            else
            {
                return false;
            }
            
        }
    
    
        bool Empty()
        {
            std::lock_guard<std::mutex> lck(m_mutex);
            return m_queue.empty();
        }
    
    
        size_t Size()
        {
            std::lock_guard<std::mutex> lck(m_mutex);
            return m_queue.size();
        }
    
    
    private:
        std::list<int>          m_queue;
        std::mutex              m_mutex;
        std::condition_variable m_notEmpty;
    };
    
    
    SyncQueue queue;
    std::atomic_bool getStop = false;
    
    
    void GetData()
    {
        int x = 0;
        while (queue.Empty())
        {
            if (getStop)
            {
                break;
            }
            if (queue.Pop(x))
            {
                std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop " << x << std::endl;
            }
            else
            {
                std::cout << "Thread_" << std::this_thread::get_id() << "---- Get Data Time out" << std::endl;
            }
        }
            
        std::cout << "Thread_" << std::this_thread::get_id() << "---- Pop End!" << std::endl;
    }
    
    
    void SetData()
    {
        for (int i = 10; i >= 0; i--)
        {
            Sleep(100);
            std::cout << "Thread_" << std::this_thread::get_id() << "---- Push " << i << std::endl;
            queue.Push(i);
            if (i <= 5)
            {
                Sleep(2000);
            }
        }
        getStop = true;
        Sleep(500);
        std::cout << "Thread_" << std::this_thread::get_id() << "---- Push End!" << std::endl;
    }
    
    
    int main()
    {
        std::thread thGet(GetData);
        thGet.detach();
    
    
        std::thread thSet(SetData);
        thSet.join();
    
    
        return 0;
    }
    
    //output
    Thread_18908---- Push 10
    Thread_2204---- Pop 10
    Thread_18908---- Push 9
    Thread_2204---- Pop 9
    Thread_18908---- Push 8
    Thread_2204---- Pop 8
    Thread_18908---- Push 7
    Thread_2204---- Pop 7
    Thread_18908---- Push 6
    Thread_2204---- Pop 6
    Thread_18908---- Push 5
    Thread_2204---- Pop 5
    Thread_2204---- Get Data Time out
    Thread_2204---- Get Data Time out
    Thread_18908---- Push 4
    Thread_2204---- Pop 4
    Thread_2204---- Get Data Time out
    Thread_2204---- Get Data Time out
    Thread_18908---- Push 3
    Thread_2204---- Pop 3
    Thread_2204---- Get Data Time out
    Thread_2204---- Get Data Time out
    Thread_18908---- Push 2
    Thread_2204---- Pop 2
    Thread_2204---- Get Data Time out
    Thread_2204---- Get Data Time out
    Thread_18908---- Push 1
    Thread_2204---- Pop 1
    Thread_2204---- Get Data Time out
    Thread_2204---- Get Data Time out
    Thread_18908---- Push 0
    Thread_2204---- Pop 0
    Thread_2204---- Get Data Time out
    Thread_2204---- Get Data Time out
    Thread_2204---- Pop End!
    Thread_18908---- Push End!
    

    相关文章

      网友评论

          本文标题:用C++实现数据总线的方法系列(上):基本概念&同步队列

          本文链接:https://www.haomeiwen.com/subject/mpahjhtx.html