美文网首页
task_queue事件队列的使用,模拟解包,压缩和发送过程

task_queue事件队列的使用,模拟解包,压缩和发送过程

作者: FredricZhu | 来源:发表于2021-04-10 09:47 被阅读0次

    完整的事件队列还应该再添加一个条件变量,用来监控队列的大小(例如在队列过长时让生产者等待)。本例的事件队列比较简单,没有做这一层限制,因此并不完整。
    程序类图如下:


    图片.png

    CMakeLists.txt

    # CMake 4.x no longer accepts policy versions below 3.5, and
    # CMAKE_CXX_STANDARD (used below) needs at least 3.1.
    cmake_minimum_required(VERSION 3.5)
    # NOTE(review): the project name looks carried over from another example; kept as-is.
    project(lexical_cast)
    
    # Request plain C++14 through the dedicated variables instead of passing a
    # compiler flag through add_definitions(), which is meant for preprocessor
    # definitions. Extensions are disabled so the flag stays -std=c++14.
    set(CMAKE_CXX_STANDARD 14)
    set(CMAKE_CXX_STANDARD_REQUIRED ON)
    set(CMAKE_CXX_EXTENSIONS OFF)
    
    include_directories("/usr/local/include")
    link_directories("/usr/local/lib")
    
    # Build one executable per .cpp file in this directory, named after the file.
    file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
    foreach( sourcefile ${APP_SOURCES} )
        file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
        string(REPLACE ".cpp" "" file ${filename})
        add_executable(${file} ${sourcefile})
        target_link_libraries(${file} boost_filesystem boost_thread boost_system boost_serialization pthread boost_chrono)
    endforeach()
    

    main.cpp

    #include <boost/atomic.hpp>
    #include <boost/function.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/locks.hpp>
    #include <boost/thread/condition_variable.hpp>
    #include <boost/thread/thread.hpp>
    
    #include <vector>
    #include <deque>
    #include <iostream>
    
    #include <cassert>
    // Raw (unprocessed) data packet; `value` is the packet's payload.
    struct data_packet {
        unsigned int value;
    }; 
    
    // Data packet after decoding; `value` is the packet's payload.
    struct decoded_data {
        unsigned int value;
    };
    
    // Data packet after compression; `value` is the packet's payload.
    struct compressed_data {
        unsigned int value;
    };
    
    // Atomic unsigned counter type used by subsystem1 and subsystem2.
    using atomic_t = boost::atomic<unsigned int>;
    
    // Simulated data source: hands out sequentially numbered raw packets.
    class subsystem1 {
        // Number of packets handed out so far (equals the last packet's value).
        atomic_t i_;
    
    public:
        subsystem1(): i_(0) {}
    
        // Returns the next raw packet. Values start at 1 and increase by one
        // with every call.
        data_packet get_data() {
            data_packet ret = { ++ i_};
            return ret;
        }
        
        // Number of packets after which the source reports itself as stopped.
        static const unsigned int max_runs = 10000;
        
    
        // Returns true once at least max_runs packets have been handed out.
        // `>=` is used instead of `==` so that the source still reports
        // "stopped" if the counter ever moves past max_runs (for example when
        // get_data() is called again after the limit was reached).
        bool is_stopped() const {
            return i_ >= max_runs;
        } 
    };
    
    // Simulated data sink: counts the packets it receives and checks that
    // they arrive in sequence.
    class subsystem2 {
        // Number of packets received so far.
        atomic_t i_;
    
    public:
        subsystem2(): i_(0) {}
    
        // Accepts one compressed packet. In debug builds, asserts that the
        // packet's value equals its 1-based arrival position.
        void send_data(const compressed_data& data) {
            // Use the value returned by the increment itself instead of reading
            // the atomic a second time, so the check refers to this call's own
            // sequence number.
            const unsigned int sequence_no = ++ i_;
            assert(data.value == sequence_no);
            // Both names are otherwise unused when assert() is compiled out.
            (void)data;
            (void)sequence_no;
        }
    
        // Returns how many packets have been received so far.
        unsigned int send_packets_count() const {
            return i_;
        }
    };
    
    // Decodes a raw packet by copying its value into a decoded_data.
    // A function-local call counter is kept so that debug builds can assert
    // that the n-th call receives the packet whose value is n.
    decoded_data decode_data(const data_packet& packet) {
        static unsigned int calls_seen = 0;
        calls_seen += 1;
        assert(packet.value == calls_seen);
    
        decoded_data decoded;
        decoded.value = packet.value;
        return decoded;
    }
    
    // Compresses a decoded packet by copying its value into a compressed_data.
    // A function-local call counter is kept so that debug builds can assert
    // that the n-th call receives the packet whose value is n.
    compressed_data compress_data(const decoded_data& packet) {
        static unsigned int calls_seen = 0;
        calls_seen += 1;
        assert(packet.value == calls_seen);
    
        compressed_data compressed;
        compressed.value = packet.value;
        return compressed;
    }
    
    class work_queue {
    public:
        using task_type = boost::function<void()>;
    
    private:
        std::deque<task_type> tasks_;
        boost::mutex mutex_;
        boost::condition_variable cond_;
        bool is_stopped_;
    
    public:
        work_queue(): is_stopped_(false){}
        // 停止队列输入和输出
        void stop() {
            boost::lock_guard<boost::mutex> lock(mutex_);
            is_stopped_ = true;
            cond_.notify_all();
        }
    
        // 向任务队列中push任务
        void push_task(const task_type& task) {
            boost::unique_lock<boost::mutex> lock(mutex_);
            if(is_stopped_) {
                return;
            }
    
            tasks_.push_back(task);
            lock.unlock();
            cond_.notify_one();
        } 
    
        task_type pop_task() {
            boost::unique_lock<boost::mutex> lock(mutex_);
            while(tasks_.empty()) {
                if(is_stopped_) {
                    return task_type();
                }
                cond_.wait(lock);
            }
    
            task_type ret = tasks_.front();
            tasks_.pop_front();
            return ret;
        }
    };
    
    
    // One task queue per pipeline stage.
    work_queue decoding_queue;
    work_queue compressing_queue;
    work_queue sending_queue;
    // Simulated packet source and packet sink.
    subsystem1 subs1;
    subsystem2 subs2;
    
    void do_decode(const data_packet& packet);
    void start_data_accepting() {
        while(!subs1.is_stopped()) {
            data_packet packet = subs1.get_data();
            decoding_queue.push_task([packet](){
                do_decode(packet);
            });
        }
    }
    
    void do_compress(const decoded_data& packet);
    void do_decode(const data_packet& packet) {
        decoded_data dec_packet = decode_data(packet);
        compressing_queue.push_task([dec_packet](){
            do_compress(dec_packet);
        });
    }
    
    void do_compress(const decoded_data& packet) {
        compressed_data com_packet = compress_data(packet);
        sending_queue.push_task([com_packet]() {
            subs2.send_data(com_packet);
        });
    }
    
    // Worker loop: pops and runs tasks from w_queue until pop_task() returns
    // an empty task, which it does once the queue is stopped and empty.
    void run_while_not_stopped(work_queue& w_queue) {
        for(;;) {
            work_queue::task_type job = w_queue.pop_task();
            if(!job) {
                return;
            }
            job();
        }
    }
    
    int main(int argc, char* argv[]) {
        boost::thread t_data_accepting(&start_data_accepting);
        boost::thread t_data_decoding([](){
            run_while_not_stopped(decoding_queue);
        });
        boost::thread t_data_compressing([](){
            run_while_not_stopped(compressing_queue);
        });
        boost::thread t_data_sending([](){
            run_while_not_stopped(sending_queue);
        });
    
        t_data_accepting.join();
    
        decoding_queue.stop();
        t_data_decoding.join();
    
        compressing_queue.stop();
        t_data_compressing.join();
    
        sending_queue.stop();
        t_data_sending.join();
    
        std::cerr << subs2.send_packets_count() << std::endl;
        assert(subs2.send_packets_count() == subsystem1::max_runs);
    }
    

    相关文章

      网友评论

          本文标题:task_queue事件队列的使用,模拟解包,压缩和发送过程

          本文链接:https://www.haomeiwen.com/subject/apigkltx.html