美文网首页
使用boost::thread_group和同步队列模拟Gola

使用boost::thread_group和同步队列模拟Gola

作者: FredricZhu | 来源:发表于2021-04-08 18:34 被阅读0次

    boost::thread_group前两天讲过了,比较简单,其实就是一个boost::list<boost::shared_ptr<boost::thread>> threads类型对象的一个包装,封装一组线程列表。
    同步队列的实现也是比较简单的使用了C++11提供的std::mutex和std::unique_lock对象,如果要兼容C++11之前的版本,可以将相关对象的名称空间换成 boost就可以。
    下面上代码,
    工程结构如下,


    图片.png

    utils/sync_queue.hpp

    #ifndef _SYNC_QUEUE_HPP_
    #define _SYNC_QUEUE_HPP_
    
    #include <list>
    #include <mutex>
    #include <thread>
    #include <condition_variable>
    #include <iostream>
    
    using namespace std;
    
    template <typename T> 
    class SyncQueue {
        public:
            
            SyncQueue(int maxSize): m_maxSize(maxSize), m_needStop(false) {}
    
            void Put(const T& x) {
                Add(x);
            }
    
            void Put(T&& x) {
                Add(std::forward<T>(x));
            }
            
            void Take(std::list<T>& list) {
                std::unique_lock<std::mutex> locker(m_mutex);
                m_NotEmpty.wait(locker, [this] {
                            return m_needStop || NotEmpty();
                        });
    
                if(m_needStop) {
                    return;
                }
    
                list = std::move(m_queue);
                m_NotFull.notify_one();
            }
    
    
            void Take(T& t) {
                std::unique_lock<std::mutex> locker(m_mutex);
                m_NotEmpty.wait(locker, [this] {
                            return m_needStop || NotEmpty();
                        });
                
                if(m_needStop) {
                    return;
                }
    
                t = m_queue.front();
                m_queue.pop_front();
                m_NotFull.notify_one();
            }
    
            void Stop() {
                {
                    std::lock_guard<std::mutex> locker(m_mutex);
                    m_needStop = true;
                }
    
                m_NotFull.notify_all();
                m_NotEmpty.notify_all();
            }
            
            bool Empty() {
                std::lock_guard<std::mutex> locker;
                return m_queue.empty();
            }
    
            bool Full() {
                std::lock_guard<std::mutex> locker;
                return m_queue.size() == m_maxSize;
            }
        
        private:
            
            // 判断队列未满,内部使用的无锁版,否则会发生死锁
            bool NotFull() const {
                bool full = m_queue.size() >= m_maxSize;
                return !full;
            }
    
            bool NotEmpty() const {
                bool empty = m_queue.empty();
                return !empty;
            }
    
            template <typename F> 
            void Add(F&& x) {
                std::unique_lock<std::mutex> locker(m_mutex);
                m_NotFull.wait(locker, [this]{ return m_needStop || NotFull(); });
                
                if(m_needStop) {
                    return;
                }
    
                m_queue.push_back(std::forward<F>(x));
                m_NotEmpty.notify_one();
            }
    
        private:
            std::list<T> m_queue; // 缓冲区
            std::mutex m_mutex; // 互斥量和条件变量结合起来使用
            std::condition_variable m_NotEmpty; // 不为空的条件变量
            std::condition_variable m_NotFull; // 不为满的条件变量
            int m_maxSize; // 同步队列的最大大小
            bool m_needStop; // 停止标志
    };
    #endif
    

    send_data/consts.h

    #ifndef _SEND_DATA_CONSTS_H_
    #define _SEND_DATA_CONSTS_H_
    
    namespace send_data {
        static const int DATA_SIZE = 3000;
        static const int THREAD_SIZE = 3;
    };
    #endif
    

    send_data/package_data.h

    #ifndef _PACKAGE_DATA_H_
    #define _PACKAGE_DATA_H_
    #include "../utils/sync_queue.hpp"
    #include <boost/atomic.hpp>
    
    #include <vector>
    #include <algorithm>
    namespace send_data
    {
    
        class PackageData
        {
        private:
            std::vector<int> input_;
            SyncQueue<int> &data_queue_;
    
        public:
            PackageData(std::vector<int> input, SyncQueue<int> &data_queue) : input_(input), data_queue_(data_queue)
            {
            }
    
            void package_data() const
            {
                for (int i = 0; i < input_.size(); ++i)
                {
                    data_queue_.Put(input_[i]);
                }
            }
        };
    };
    
    #endif
    

    send_data/send_data.h

    #ifndef _SEND_DATA_H_
    #define _SEND_DATA_H_
    #include "consts.h"
    #include "../utils/sync_queue.hpp"
    namespace send_data
    {
        class SendData
        {
        private:
    
            SyncQueue<int> &data_queue_;
    
        public:
            SendData(SyncQueue<int> &data_queue) : data_queue_(data_queue) {}
    
            void send_data()
            {
                int ele;
                for (int i = 0; i < DATA_SIZE; ++i)
                {
                    data_queue_.Take(ele);
                    std::cerr << "data: " << ele << " is been sent!" << std::endl;
                }
            }
        };
    };
    
    #endif
    

    main.cpp

    #include "send_data/package_data.h"
    #include "send_data/send_data.h"
    #include "utils/sync_queue.hpp"
    
    #include <boost/thread.hpp>
    
    using namespace send_data;
    
    int main(int argc, char* argv[]) {
        boost::thread_group package_data_g;
        boost::thread_group send_data_g;
    
        SyncQueue<int> d_queue(THREAD_SIZE * DATA_SIZE);
    
        // std::vector<int> a{1,2,3};
        //   // 1 + 0 *3
        //   // 1 + 1 *3 
        //   // 1 + 2
        // std::vector<int> b{4,5, 6};
        // std::vector<int> c{7,8,9};
        
        for(int i=0; i<THREAD_SIZE; ++i) {
            std::vector<int> a;
            for(int j=0; j<DATA_SIZE; ++j) {
                a.emplace_back(1 + i*DATA_SIZE +j);
            }
    
            PackageData pkg_data(a, d_queue);
            package_data_g.create_thread([pkg_data]() {
                pkg_data.package_data();
            });
        }
       
        SendData sdata(d_queue);
        for(int i=0; i<THREAD_SIZE; ++i) {    
            send_data_g.create_thread([&sdata]() {
                sdata.send_data();
            });
        }
    
        package_data_g.join_all();
        send_data_g.join_all();
    }
    

    程序会将 1-9000拆成三个批次放入 sync_queue,然后从sync_queue中把1-9000全部读出来,如下图所示,


    图片.png

    这种方式存在一些限制,比如 打包的线程和发送的线程 个数必须一样,不然会造成条件变量的死等。
    所以生产实践中不建议使用,可以使用下一节讲述的boost::asio::io_service对象来处理,效果更好,限制更少。

    相关文章

      网友评论

          本文标题:使用boost::thread_group和同步队列模拟Gola

          本文链接:https://www.haomeiwen.com/subject/ramvkltx.html