美文网首页
用C++实现数据总线的方法系列(下):消息总线和消息处理

用C++实现数据总线的方法系列(下):消息总线和消息处理

作者: JasonLiThirty | 来源:发表于2020-03-14 09:54 被阅读0次

    视频教程:https://www.bilibili.com/video/av95994293/

    本文实现一个完整功能的消息总线MessageBus,同时介绍下消息的处理方法。

    这里定义了消息类型的枚举MesageType,消息优先级的枚举MessagePriority,以及消息的结构类Message,它包含消息类型(type),消息优先级(priority)和消息数据(info)。同时定义了MessagePtr作为Message的指针指针类。

    消息总线MessageBus的实现和数据总线一样,需要有存放消息的消息链表,构成锁的互斥量和用于多线程同步的条件变量,同时也需要具有最基本的Push和Pop函数函数。

    在Push函数里,会New出一个Message对象,存放需要发送的消息数据,然后将这个对象的指针放入到消息链表中,而在Pop函数里,得到这个对象的指针,并作为初始化参数传递给一个MessagePtr智能指针,外部调用处理完消息后,消息对象会被释放。

    消息类型,结构和总线实现

    #ifndef MESSAGE_BUS_H
    #define MESSAGE_BUS_H
    
    
    #include<string>
    #include<mutex>
    #include<list>
    #include<condition_variable>
    
    
    enum MessageType
    {
        MESSAGE_NONE = 0,
        MESSAGE_START,
        MESSAGE_PROCESSING,
        MESSAGE_DONE,
        MESSAGE_EXCEPTION
    };
    enum MessagePriority
    {
        MP_LOW = 0,
        MP_NORMAL,
        EP_HIGH
    };
    
    class Message
    {
    public:
        Message(): m_type(MESSAGE_NONE), m_priority(MP_NORMAL)
        {
    
        }
        Message(MessageType type, std::string info, MessagePriority priority) : m_type(type), m_info(info), m_priority(priority)
        {
    
    
        }
        ~Message() {}
    
        MessageType Type()
        {
            return m_type;
        }
    
        MessagePriority Priority()
        {
            return m_priority;
        }
        std::string Info()
        {
            return m_info;
        }
    
    private:
        MessageType         m_type;
        MessagePriority     m_priority;
        std::string         m_info;
    };
    
    using MessagePtr = std::shared_ptr<Message>;
    
    class MessageBus
    {
    public:
        static void Clear();
    
        static void Push(Message* pMessage);
    
        static void Push(MessageType type);
        static void Push(MessageType type, std::string info);
    
        static void Push(MessageType type, std::string info, MessagePriority priority);
    
        static MessagePtr Pop();
    private:
        static std::mutex                 s_mt;
        static std::condition_variable    s_cv;
        static std::list<Message*>        s_queue;
    };
    
    #endif
    
    #include "MessageBus.h"
    
    std::mutex                  MessageBus::s_mt;
    std::condition_variable     MessageBus::s_cv;
    std::list<Message*>         MessageBus::s_queue;
    
    void MessageBus::Clear()
    {
        std::list<Message*>::iterator it = s_queue.begin();
        for (; it != s_queue.end(); it++)
        {
            delete *it;
        }
        s_queue.clear();
    }
    
    void MessageBus::Push(Message* pMessage)
    {
        std::unique_lock<std::mutex> lock(s_mt);
        s_queue.push_back(pMessage);
        s_cv.notify_all();
    }
    
    void MessageBus::Push(MessageType type)
    {
        Push(type, std::string(""), MessagePriority::MP_NORMAL);
    }
    
    void MessageBus::Push(MessageType type, std::string info)
    {
        Push(type, info, MessagePriority::MP_NORMAL);
    }
    
    void MessageBus::Push(MessageType type, std::string info, MessagePriority priority)
    {
        Message *pMessage = new Message(type, info, priority);
        Push(pMessage);
    }
    
    MessagePtr MessageBus::Pop()
    {
        Message* pMessage = NULL;
        {
            std::unique_lock<std::mutex> lock(s_mt);
            while (s_queue.empty())
            {
                s_cv.wait(lock);
            }
            pMessage = s_queue.front();
            s_queue.pop_front();
        }
        return MessagePtr(pMessage);
    }
    

    消息的处理

    首先先实现一个消息处理基类MessageHandler,这个类的On函数开启消息接收,OFF函数取消消息接收。

    这个类是怎么来实现消息处理的呢?主要依赖于这个类的messageMap成员,它是一个unorderedmap,key实现消息类型,value是消息响应函数,这也就将每种消息和相应的消息响应函数关联在一起了。这个关联关系是由RegisterMessageFunc函数完成的,也就是我们通常所说的注册消息响应函数。

    在MessageHandler开启消息接收后,首先调用继承类的InitMessageMap函数将消息响应函数注册好,然后一直在MessageBus里取出消息,调用InvokeHanlderFunc函数,这个函数会根据消息的类型查找到相应的消息响应函数,然后调用该函数并把消息作为入参传递进该函数进行处理。

    消息处理基类MessageHandler

    #ifndef MESSAGE_HANDLER_H
    #define MESSAGE_HANDLER_H
    
    #include<atomic>
    #include<iostream>
    #include<unordered_map>
    #include"MessageBus.h"
    
    template<typename T>
    class MessageHandler
    {
    public:
        MessageHandler() :m_running(false)
        {
        }
    
        ~MessageHandler()
        {
            OFF();
        }
    
        void OFF()
        {
            std::cout << "Message Handling OFF!" << std::endl;
            m_running = false;
        }
        void ON()
        {
            std::cout << "Message Handling ON!" << std::endl;
            m_running = true;
            static_cast<T*>(this)->InitMessageMap();
            while (m_running)
            {
                MessagePtr pMessage = MessageBus::Pop();
                m_running = InvokeHanlderFunc(*pMessage);
            }
        }
    
    protected:
        using MessageHandlerFunc = bool (T::*)(Message&);
        using MessageMap = std::unordered_map<MessageType, MessageHandlerFunc>;
    
        MessageMap m_messageMap;
    
        void RegisterMessageFunc(MessageType type, MessageHandlerFunc func)
        {
            m_messageMap[type] = func;
        }
    
        bool InvokeHanlderFunc(Message& message)
        {
            auto it = m_messageMap.find(message.Type());
            if (it != m_messageMap.end())
            {
                T* pThis = static_cast<T*>(this);
                return (pThis->*(it->second))(message);
            }
            else
            {
                std::cout << "Message Type: " << message.Type() << " not handler function!" << std::endl;
                return false;
            }
        }
    
        void Clear()
        {
            m_messageMap.clear();
        }
    private:
        std::atomic_bool m_running;
    };
    #endif
    

    消息处理业务类MessageProcessor

    实际的的消息处理类,实现了一些消息响应函数,然后在InitMessageMap函数里调用了基类的RegisterMessageFunc,将这些消息响应函数注册到messageMap里。

    #ifndef MESSAGE_PROCESSOR_H
    #define MESSAGE_PROCESSOR_H
    
    
    #include "MessageHandler.h"
    
    
    class MessageProcessor : public MessageHandler<MessageProcessor>
    {
    public:
        MessageProcessor();
        ~MessageProcessor();
    
        void InitMessageMap();
    
    private:
        bool OnStart(Message& message);
        bool OnProcessing(Message& message);
        bool OnDone(Message& message);
        bool OnException(Message& message);
    };
    
    #endif
    
    #include "MessageProcessor.h"
    MessageProcessor::MessageProcessor()
    {
    }
    
    MessageProcessor::~MessageProcessor()
    {
    }
    
    void MessageProcessor::InitMessageMap()
    {
        Clear();
        RegisterMessageFunc(MessageType::MESSAGE_START, &MessageProcessor::OnStart);
        RegisterMessageFunc(MessageType::MESSAGE_PROCESSING, &MessageProcessor::OnProcessing);
        RegisterMessageFunc(MessageType::MESSAGE_DONE, &MessageProcessor::OnDone);
        RegisterMessageFunc(MessageType::MESSAGE_EXCEPTION, &MessageProcessor::OnException);
    }
    
    bool MessageProcessor::OnStart(Message& message)
    {
        std::cout << "[Work Start]" << std::endl;
        return true;
    }
    
    bool MessageProcessor::OnProcessing(Message& message)
    {
        std::cout << "[Processing]......" << message.Info().c_str() << "%" << std::endl;
        return true;
    }
    
    bool MessageProcessor::OnDone(Message& message)
    {
        std::cout << "[Work Done]" << std::endl;
        return false;
    }
    
    bool MessageProcessor::OnException(Message& message)
    {
        std::cout << "[Work Exception!!!]" << std::endl;
        return true;
    }
    

    调用执行

    首先在一个线程里开启MessageProcessor对象的接收开关,然后在另一个线程里往MessageBus里发送消息,首先发送START消息,然后发送PROCESSING消息,最后发送DONE消息,我们在MessageProcessor的DONE消息响应函数ONDone中返回false,MessageProcessor停止消息接收,线程退出。

    #include "MessageProcessor.h"
    #include <functional>
    
    void SendMessage()
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        MessageBus::Push(MessageType::MESSAGE_START);
        int i = 1;
        while (i <= 10)
        {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            MessageBus::Push(MessageType::MESSAGE_PROCESSING, std::to_string(i * 10));
            i++;
        }
        std::this_thread::sleep_for(std::chrono::seconds(1));
        MessageBus::Push(MessageType::MESSAGE_DONE);
    }
    
    int main()
    {
        MessageProcessor msgProc;
    
        std::thread thProc([&] { msgProc.ON(); });
        std::thread thSend(SendMessage);
    
        thProc.join();
        thSend.join();
        return 0;
    }
    
    //output
    Message Handling ON!
    [Work Start]
    [Processing]......10%
    [Processing]......20%
    [Processing]......30%
    [Processing]......40%
    [Processing]......50%
    [Processing]......60%
    [Processing]......70%
    [Processing]......80%
    [Processing]......90%
    [Processing]......100%
    [Work Done]
    Message Handling OFF!
    

    程序的执行结果显示,消息发送和接收处理成功。

    相关文章

      网友评论

          本文标题:用C++实现数据总线的方法系列(下):消息总线和消息处理

          本文链接:https://www.haomeiwen.com/subject/fdkmshtx.html