美文网首页
一文搞懂C++消息队列

一文搞懂C++消息队列

作者: 晴空一垩 | 来源:发表于2019-08-08 23:22 被阅读0次

    消息队列

    消息队列在工作过程中,是个非常常用的基础知识点。

    用于将一些任务放置在同一个线程中执行。一来不会对主线程产生影响,二来针对有先后关系的任务可以更好的维护。

    下面,我们开始简单写个消息队列。希望大家都能理解。

    单任务处理队列

    这里采用qml进行验证,有一个按钮用于创建消息。对于这里讲述的qml与c++之间的通讯不熟悉的可以看我之前写的文章。

    首先我们创建两个文件 ua4qml2.h,ua4qml2.cpp,内容分别是:

    #ifndef UA4QML2_H
    #define UA4QML2_H
    
    #include <QObject>
    #include <deque>
    #include <string>
    #include <mutex>
    #include <thread>
    #include <semaphore.h>
    
    //消息队列中存放的消息结构体
    struct Message{
        uint32_t type;
        std::string cmd;
    };
    
    class UA4Qml2 : public QObject
    {
        Q_OBJECT
    public:
        UA4Qml2(QObject *parent = 0);
    
        void sendMsg(const Message& msg);
    
    signals:
    
    public slots:
        void msgQueueSend(const QString str);
    private:
        //处理消息队列
        void startProcMsg();
        static void* proc(void*);
    private :
        std::deque<Message*> m_dequeMsgs;
        std::mutex m_mutex;
        sem_t m_sem;
        pthread_t m_thread;
    };
    
    #endif // UA4QML2_H
    
    
    #include "ua4qml2.h"
    #include <iostream>
    
    UA4Qml2::UA4Qml2(QObject *parent) : QObject(parent)
    {
        //创建消息队列
        startProcMsg();
    }
    
    void UA4Qml2::sendMsg(const Message&  msg)
    {
        //往消息队列中,增加事件
        m_mutex.lock();
        m_dequeMsgs.emplace_back(new Message(msg));
        m_mutex.unlock();
        //通知分发事件
        sem_post(&m_sem);
    }
    
    //通过qml的按钮来进行操作
    void UA4Qml2::msgQueueSend(const QString str)
    {
        Message msg;
        msg.cmd = str.toStdString();
        msg.type = 1000;
        sendMsg(msg);
    }
    
    void UA4Qml2::startProcMsg()
    {
        std::cout<<"进入消息队列"<<std::endl;
        std::thread t([this](){
            while(true){
                // 如果没有事件通知过来,则持续等待,这里使用sem_wait,可以使用别的方式来卡住线程,否则对cpu是种很大的消耗
                sem_wait(&m_sem);
                //如果有消息到来就不会开启等待
                std::cout<<"开始"<<std::endl;
                std::deque<Message*> dequeMsgs;
                //对于双端队列的操作需要加锁
                m_mutex.lock();
                // 这里进行内存的交换
                dequeMsgs.swap(m_dequeMsgs);
                m_mutex.unlock();
                for(auto it=dequeMsgs.begin();it!=dequeMsgs.end();it++){
                    //处理任务,可以根据不同的type来选择不同的处理方式
                    std::cout<<(*it)->type<<":::"<<(*it)->cmd<<std::endl;
                    delete *it;
                }
                //这里注意下,我手动停止了2s,来模拟有个任务消耗过多的时间
                sleep(2);
                std::cout<<"结束"<<std::endl;
            }
        });
        t.detach();
    }
    

    接下来是qml的点击,对于怎么和c++通讯还是看 上篇文章

    QQmlApplicationEngine *engine = new QQmlApplicationEngine();
    engine->rootContext()->setContextProperty("$SigDispatcher", new UA4Qml2);
    engine->load(QUrl(QStringLiteral("qrc:/main.qml")));
    
    import QtQuick.Controls 1.4
    
    ApplicationWindow {
        id: applicationWindow
        visible: true
        width: 640
        height: 480
        Button{
            id:msgQueue
            text: "消息队列"
            anchors.top: parent.top
            anchors.topMargin: 10
            onClicked: {
                //调用C++ 的方法
                $SigDispatcher.msgQueueSend("hello world");
            }
        }
    }
    

    接下来看输出

    开始
    1000:::hello world
    结束
    开始
    1000:::hello world
    1000:::hello world
    1000:::hello world
    结束
    开始
    结束
    开始
    结束
    

    解析

    1. 使用linux中的信号量来进行等待操作,每调用sem_post 一次表示信号量+1,没调用 sem_wait 一次表示信号量-1 直到为0 则处于等待。

    2. 使用双端队列进行消息的存储。使用swap进行内存的交换,减少内存的拷贝。当然这里你们也可以使用别的数据结构来进行消息的存储。

    3. 有个死循环,一直持续处理消息,注意这里需要卡住线程 sem_wait

    4. 注意对存储的消息体操作的时候,需要加锁

    看到这里你是否有发现存在的问题呢?

    如果采用单线程的消息队列,则会有个问题,就是处理的消息不能有延时。一旦一个消息处理过慢,则对之后所有的消息都会产生影响。

    多任务处理队列

    #include "ua4qml2.h"
    #include <iostream>
    
    UA4Qml2::UA4Qml2(QObject *parent) : QObject(parent)
    {
        startProcMsg();
    }
    
    void UA4Qml2::sendMsg(const Message&  msg)
    {
        //往消息队列中,增加事件,注意加锁
        m_mutex.lock();
        m_dequeMsgs.emplace_back(new Message(msg));
        m_mutex.unlock();
        sem_post(&m_sem);
    }
    
    void UA4Qml2::msgQueueSend(const QString str)
    {
        Message msg;
        msg.cmd = str.toStdString();
        msg.type = 1000;
        sendMsg(msg);
    }
    
    void UA4Qml2::startProcMsg()
    {
        std::cout<<"进入消息队列"<<std::endl;
        //c++ 中采用#if 的方式来进行代码块的注释,这是java所没有的
        //这里采用了两种不同的线程方式,c开线程和c++开线程的两种方式
    #if 1
        for(size_t i=0;i<4;++i)
        {
            pthread_create(&m_thread,NULL,proc,this);
        }
    #else
        for(size_t i=0;i<4;++i)
        {
            std::thread t([this](){
                UA4Qml2::proc(this);
            });
            t.detach();
        }
    #endif
    }
    
    void* UA4Qml2::proc(void* data)
    {
        UA4Qml2* uA4Qml2 = (UA4Qml2*)data;
        while(true){
            sem_wait(&uA4Qml2->m_sem);
            //如果有消息到来就会执行
            uA4Qml2->m_mutex.lock();
            //这里是一个一个的获取,而非一次性获取所有的消息
            Message* it = *(uA4Qml2->m_dequeMsgs.begin());
            uA4Qml2->m_dequeMsgs.pop_front();
            uA4Qml2->m_mutex.unlock();
            std::cout<<(*it).type<<":::"<<(*it).cmd<<std::this_thread::get_id()<<std::endl;
            std::cout<<"卡住结束"<<std::endl;
            sleep(2);
            delete it;
        }
        return nullptr;
    }
    

    看下结果打印:

    进入消息队列
    1000:::hello world140150117496576 #【】
    卡住结束
    1000:::hello world140150100711168
    卡住结束
    1000:::hello world140150109103872
    卡住结束
    1000:::hello world140150017357568
    卡住结束
    1000:::hello world140150117496576 #【】
    卡住结束
    

    解析

    1. 这里使用了4个线程,来做多线程消息队列。
    2. 这里要注意因为使用了 Message* 所以要注意内存的释放与申请
    3. c++ 可以用宏定义 #if #error #define 来完成很多事情,c++的优点哦

    欢迎关注我的微信公众号:
    程序员学习分享录

    相关文章

      网友评论

          本文标题:一文搞懂C++消息队列

          本文链接:https://www.haomeiwen.com/subject/iejqjctx.html