视频教程:https://www.bilibili.com/video/av95994293/
本文实现一个完整功能的消息总线MessageBus,同时介绍下消息的处理方法。
这里定义了消息类型的枚举MesageType,消息优先级的枚举MessagePriority,以及消息的结构类Message,它包含消息类型(type),消息优先级(priority)和消息数据(info)。同时定义了MessagePtr作为Message的指针指针类。
消息总线MessageBus的实现和数据总线一样,需要有存放消息的消息链表,构成锁的互斥量和用于多线程同步的条件变量,同时也需要具有最基本的Push和Pop函数函数。
在Push函数里,会New出一个Message对象,存放需要发送的消息数据,然后将这个对象的指针放入到消息链表中,而在Pop函数里,得到这个对象的指针,并作为初始化参数传递给一个MessagePtr智能指针,外部调用处理完消息后,消息对象会被释放。
消息类型,结构和总线实现
#ifndef MESSAGE_BUS_H
#define MESSAGE_BUS_H
#include<string>
#include<mutex>
#include<list>
#include<condition_variable>
enum MessageType
{
MESSAGE_NONE = 0,
MESSAGE_START,
MESSAGE_PROCESSING,
MESSAGE_DONE,
MESSAGE_EXCEPTION
};
enum MessagePriority
{
MP_LOW = 0,
MP_NORMAL,
EP_HIGH
};
class Message
{
public:
Message(): m_type(MESSAGE_NONE), m_priority(MP_NORMAL)
{
}
Message(MessageType type, std::string info, MessagePriority priority) : m_type(type), m_info(info), m_priority(priority)
{
}
~Message() {}
MessageType Type()
{
return m_type;
}
MessagePriority Priority()
{
return m_priority;
}
std::string Info()
{
return m_info;
}
private:
MessageType m_type;
MessagePriority m_priority;
std::string m_info;
};
using MessagePtr = std::shared_ptr<Message>;
class MessageBus
{
public:
static void Clear();
static void Push(Message* pMessage);
static void Push(MessageType type);
static void Push(MessageType type, std::string info);
static void Push(MessageType type, std::string info, MessagePriority priority);
static MessagePtr Pop();
private:
static std::mutex s_mt;
static std::condition_variable s_cv;
static std::list<Message*> s_queue;
};
#endif
#include "MessageBus.h"
std::mutex MessageBus::s_mt;
std::condition_variable MessageBus::s_cv;
std::list<Message*> MessageBus::s_queue;
void MessageBus::Clear()
{
std::list<Message*>::iterator it = s_queue.begin();
for (; it != s_queue.end(); it++)
{
delete *it;
}
s_queue.clear();
}
void MessageBus::Push(Message* pMessage)
{
std::unique_lock<std::mutex> lock(s_mt);
s_queue.push_back(pMessage);
s_cv.notify_all();
}
void MessageBus::Push(MessageType type)
{
Push(type, std::string(""), MessagePriority::MP_NORMAL);
}
void MessageBus::Push(MessageType type, std::string info)
{
Push(type, info, MessagePriority::MP_NORMAL);
}
void MessageBus::Push(MessageType type, std::string info, MessagePriority priority)
{
Message *pMessage = new Message(type, info, priority);
Push(pMessage);
}
MessagePtr MessageBus::Pop()
{
Message* pMessage = NULL;
{
std::unique_lock<std::mutex> lock(s_mt);
while (s_queue.empty())
{
s_cv.wait(lock);
}
pMessage = s_queue.front();
s_queue.pop_front();
}
return MessagePtr(pMessage);
}
消息的处理
首先先实现一个消息处理基类MessageHandler,这个类的On函数开启消息接收,OFF函数取消消息接收。
这个类是怎么来实现消息处理的呢?主要依赖于这个类的messageMap成员,它是一个unorderedmap,key实现消息类型,value是消息响应函数,这也就将每种消息和相应的消息响应函数关联在一起了。这个关联关系是由RegisterMessageFunc函数完成的,也就是我们通常所说的注册消息响应函数。
在MessageHandler开启消息接收后,首先调用继承类的InitMessageMap函数将消息响应函数注册好,然后一直在MessageBus里取出消息,调用InvokeHanlderFunc函数,这个函数会根据消息的类型查找到相应的消息响应函数,然后调用该函数并把消息作为入参传递进该函数进行处理。
消息处理基类MessageHandler
#ifndef MESSAGE_HANDLER_H
#define MESSAGE_HANDLER_H
#include<atomic>
#include<iostream>
#include<unordered_map>
#include"MessageBus.h"
template<typename T>
class MessageHandler
{
public:
MessageHandler() :m_running(false)
{
}
~MessageHandler()
{
OFF();
}
void OFF()
{
std::cout << "Message Handling OFF!" << std::endl;
m_running = false;
}
void ON()
{
std::cout << "Message Handling ON!" << std::endl;
m_running = true;
static_cast<T*>(this)->InitMessageMap();
while (m_running)
{
MessagePtr pMessage = MessageBus::Pop();
m_running = InvokeHanlderFunc(*pMessage);
}
}
protected:
using MessageHandlerFunc = bool (T::*)(Message&);
using MessageMap = std::unordered_map<MessageType, MessageHandlerFunc>;
MessageMap m_messageMap;
void RegisterMessageFunc(MessageType type, MessageHandlerFunc func)
{
m_messageMap[type] = func;
}
bool InvokeHanlderFunc(Message& message)
{
auto it = m_messageMap.find(message.Type());
if (it != m_messageMap.end())
{
T* pThis = static_cast<T*>(this);
return (pThis->*(it->second))(message);
}
else
{
std::cout << "Message Type: " << message.Type() << " not handler function!" << std::endl;
return false;
}
}
void Clear()
{
m_messageMap.clear();
}
private:
std::atomic_bool m_running;
};
#endif
消息处理业务类MessageProcessor
实际的的消息处理类,实现了一些消息响应函数,然后在InitMessageMap函数里调用了基类的RegisterMessageFunc,将这些消息响应函数注册到messageMap里。
#ifndef MESSAGE_PROCESSOR_H
#define MESSAGE_PROCESSOR_H
#include "MessageHandler.h"
class MessageProcessor : public MessageHandler<MessageProcessor>
{
public:
MessageProcessor();
~MessageProcessor();
void InitMessageMap();
private:
bool OnStart(Message& message);
bool OnProcessing(Message& message);
bool OnDone(Message& message);
bool OnException(Message& message);
};
#endif
#include "MessageProcessor.h"
MessageProcessor::MessageProcessor()
{
}
MessageProcessor::~MessageProcessor()
{
}
void MessageProcessor::InitMessageMap()
{
Clear();
RegisterMessageFunc(MessageType::MESSAGE_START, &MessageProcessor::OnStart);
RegisterMessageFunc(MessageType::MESSAGE_PROCESSING, &MessageProcessor::OnProcessing);
RegisterMessageFunc(MessageType::MESSAGE_DONE, &MessageProcessor::OnDone);
RegisterMessageFunc(MessageType::MESSAGE_EXCEPTION, &MessageProcessor::OnException);
}
bool MessageProcessor::OnStart(Message& message)
{
std::cout << "[Work Start]" << std::endl;
return true;
}
bool MessageProcessor::OnProcessing(Message& message)
{
std::cout << "[Processing]......" << message.Info().c_str() << "%" << std::endl;
return true;
}
bool MessageProcessor::OnDone(Message& message)
{
std::cout << "[Work Done]" << std::endl;
return false;
}
bool MessageProcessor::OnException(Message& message)
{
std::cout << "[Work Exception!!!]" << std::endl;
return true;
}
调用执行
首先在一个线程里开启MessageProcessor对象的接收开关,然后在另一个线程里往MessageBus里发送消息,首先发送START消息,然后发送PROCESSING消息,最后发送DONE消息,我们在MessageProcessor的DONE消息响应函数ONDone中返回false,MessageProcessor停止消息接收,线程退出。
#include "MessageProcessor.h"
#include <functional>
void SendMessage()
{
std::this_thread::sleep_for(std::chrono::seconds(1));
MessageBus::Push(MessageType::MESSAGE_START);
int i = 1;
while (i <= 10)
{
std::this_thread::sleep_for(std::chrono::seconds(1));
MessageBus::Push(MessageType::MESSAGE_PROCESSING, std::to_string(i * 10));
i++;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
MessageBus::Push(MessageType::MESSAGE_DONE);
}
int main()
{
MessageProcessor msgProc;
std::thread thProc([&] { msgProc.ON(); });
std::thread thSend(SendMessage);
thProc.join();
thSend.join();
return 0;
}
//output
Message Handling ON!
[Work Start]
[Processing]......10%
[Processing]......20%
[Processing]......30%
[Processing]......40%
[Processing]......50%
[Processing]......60%
[Processing]......70%
[Processing]......80%
[Processing]......90%
[Processing]......100%
[Work Done]
Message Handling OFF!
程序的执行结果显示,消息发送和接收处理成功。
网友评论