美文网首页
使用boost::asio 模拟JMeter做分布式压测网络部分

使用boost::asio 模拟JMeter做分布式压测网络部分

作者: FredricZhu | 来源:发表于2021-09-28 19:30 被阅读0次

    本例使用boost::asio模拟jmeter做分布式压测网络部分的原理,虽然比较简单,但基本可以实现分布式压测的效果。
    原理图如下,


    image.png

    为了实现方便,master只做一个总控用,没有用来执行性能测试了。
    还有一点妥协是:master必须等所有node全部ready之后才能发送起测命令。node就绪的通知机制没有实现[再多发送一对消息就可以实现],这里为了少发几个消息就没有做,所以需要人工保证启动顺序。
    所以启动的顺序是,先启动server,
    接着启动所有node, node1, node2, node3..
    最后启动master。

    另外recent_msgs不需要保存,可以去掉,简化一些代码,也没做。主要是没时间。

    整体代码如下,
    CMakeLists.txt

    # Builds one executable per *.cpp file in this directory (server, master,
    # slave, ...). parse_msg.cpp is not a program of its own: it is compiled
    # into every executable as a shared helper source.
    cmake_minimum_required(VERSION 2.6)
    project(perf_tool)
    
    add_definitions(-std=c++14)
    
    
    find_package(Boost REQUIRED COMPONENTS 
        system
        filesystem
        serialization
        program_options
        )
    
    include_directories(${Boost_INCLUDE_DIRS} ${CMAKE_CURRENT_SOURCE_DIR}/../../include)
    
    file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
    foreach( sourcefile ${APP_SOURCES} )
        file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
        # Exact comparison: MATCHES would treat the name as an unanchored regex
        # (so "." matches any character and e.g. "my_parse_msg.cpp" is skipped too).
        if( NOT "${filename}" STREQUAL "parse_msg.cpp" )
            # Target name is the file name without its extension.
            string(REPLACE ".cpp" "" target_name ${filename})
            add_executable(${target_name} ${sourcefile} "parse_msg.cpp")
            target_link_libraries(${target_name} ${Boost_LIBRARIES})
            target_link_libraries(${target_name} pthread)
        endif()
    endforeach()
    

    server.cpp

    #include "chat_message.h"
    
    #include <boost/asio.hpp>
    
    #include <deque>
    #include <iostream>
    #include <memory>
    #include <set>
    #include <list>
    #include <utility>
    
    #include <cassert>
    #include <cstdlib>
    
    using boost::asio::ip::tcp;
    
    using chat_message_queue = std::deque<chat_message>;
    
    class chat_session;
    using chat_session_ptr = std::shared_ptr<chat_session>;
    
    
    // Name of the client that most recently sent MT_LAUNCH_TASK_MSG. Task-info
    // messages are routed to the session(s) bound to this name. Starts empty.
    std::string master_name = "";
    
    // Declaration of the room: the set of connected sessions plus the
    // operations that relay a message to all of them or to a named one.
    class chat_room {
        public:
            // Adds a session to the participant set.
            void join(chat_session_ptr);
            // Removes a session from the participant set.
            void leave(chat_session_ptr);
            // Relays the message to every participant.
            void deliver(const chat_message&);
            // Relays the message only to participants whose client name equals
            // paticipant_name.
            void deliver_to(const chat_message&, const std::string& paticipant_name);
        private:
            std::set<chat_session_ptr> participants_;
            // Upper bound on the number of entries kept in recent_msgs_.
            enum { max_recent_msgs = 100 };
            // Most recently relayed messages, oldest at the front.
            chat_message_queue recent_msgs_;
    };
    
    // Server-side state for one connected client (master or slave).
    //
    // A session reads framed messages (header, then body) from its socket in a
    // loop, interprets each one in handleMessage(), and sends queued outgoing
    // messages one at a time. Every completion handler captures a shared_ptr
    // to the session so the object stays alive while an operation is pending.
    class chat_session: public std::enable_shared_from_this<chat_session> {
        public:
            // socket: connected client socket; ownership is moved in.
            // room:   registry used for relaying; must outlive the session.
            chat_session(tcp::socket socket, chat_room& room): socket_(std::move(socket)), room_(room) {}
    
            // Registers the session with the room and starts the read loop.
            void start() {
                room_.join(shared_from_this());
                do_read_header();
            }
    
            // Queues msg for sending to this client. A write is started only when
            // none is in flight, so at most one async_write is outstanding and the
            // front of the queue is always the message being written.
            void deliver(const chat_message& msg) {
                const bool write_in_progress = !write_msgs_.empty();
                write_msgs_.push_back(msg);
    
                if(!write_in_progress) {
                    do_write();
                }
            }
        
        // Name the client bound with MT_BIND_NAME (empty until then).
        std::string& get_client_name() {
            return m_name;
        }
    
        private:
            // Reads the fixed-size header; on success continues with the body,
            // otherwise removes the session from the room.
            void do_read_header() {
                auto self(shared_from_this());
    
                boost::asio::async_read(
                            socket_,
                            boost::asio::buffer(read_msg_.data(), chat_message::header_length),
                            [this, self] (boost::system::error_code ec, std::size_t /*length*/) {
                                if(!ec && read_msg_.decode_header()) {
                                    do_read_body();
                                } else {
                                    room_.leave(self);
                                }
                            }
                        );
            }
    
            // Reads the body announced by the header, handles the message and
            // goes back to reading the next header.
            void do_read_body() {
                auto self(shared_from_this());
    
                boost::asio::async_read(
                        socket_,
                        boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
                        [this, self] (boost::system::error_code ec, std::size_t /*length*/) {
                            if(!ec) {
                                handleMessage();
                                
                                // Keep the read loop going.
                                do_read_header();
                            }else {
                                room_.leave(self);
                            }
                        }
                    );
            }
            
            // Parses the body of the current message as JSON. On a parse error
            // the failure is logged and a null JSON value is returned.
            json to_json() {
                const std::string buffer(read_msg_.body(), read_msg_.body() + read_msg_.body_length());
                std::cout << "raw message server: " << buffer << std::endl; 
                json json_obj;
                try {
                   json_obj = json::parse(buffer);
                }catch(std::exception& ex) {
                    std::cerr << "解析 json对象 失败!!" << std::endl;
                    std::cerr << ex.what() << std::endl;
                }
                return json_obj;
            }
        
            // Interprets the message that was just read.
            //
            // The body comes from the network, so any JSON error (unparsable
            // text, missing key, wrong value type) is caught here and the message
            // is dropped; letting the exception leave the completion handler
            // would propagate out of io_service::run() and stop the server.
            void handleMessage() {
                try {
                    const int type = read_msg_.type();
                    if(type == MT_BIND_NAME) {
                        // Sent by master and slaves to register their name.
                        m_name = to_json().at("name").get<std::string>();
                    } else if(type == MT_LAUNCH_TASK_MSG) {
                        // Only the master sends this; its payload is broadcast.
                        // Parse first so an invalid message changes no state.
                        const std::string information = to_json().at("information").get<std::string>();
                        master_name = m_name;
                        std::cerr << "Master name: " << master_name << std::endl;
                        m_chatInformation = information;
                        chat_message msg;
                        if(buildOutgoing(MT_LAUNCH_TASK_MSG, msg)) {
                            room_.deliver(msg);
                        }
                    } else if(type == MT_SEND_TASK_INFO_MSG) {
                        // Sent after a performance test finished; forwarded to
                        // the master only.
                        std::cerr << "send task info" << std::endl;
                        std::cerr << "Master name in task info: " << master_name << std::endl;
                        m_chatInformation = to_json().at("information").get<std::string>();
                        chat_message msg;
                        if(buildOutgoing(MT_SEND_TASK_INFO_MSG, msg)) {
                            room_.deliver_to(msg, master_name);
                        }
                    } else {
                        // Unknown message type: ignore.
                    }
                } catch(const std::exception& ex) {
                    std::cerr << "Ignoring invalid message: " << ex.what() << std::endl;
                }
            }
            
            // Fills `out` with a message of the given type carrying the room
            // info. Returns false (and logs) when the payload does not fit into
            // a chat_message body, instead of tripping setMessage's size check.
            bool buildOutgoing(int type, chat_message& out) const {
                const std::string rinfo = buildRoomInfo();
                if(rinfo.size() >= static_cast<std::size_t>(chat_message::max_body_length)) {
                    std::cerr << "Room info too large to relay: " << rinfo.size() << " bytes" << std::endl;
                    return false;
                }
                out.setMessage(type, rinfo);
                return true;
            }
    
            // Serializes {"name": ..., "information": ...} for this session.
            // The JSON library does the escaping, so quotes or backslashes in
            // either value cannot break or extend the document.
            std::string buildRoomInfo() const {
                json room_info;
                room_info["name"] = m_name;
                room_info["information"] = m_chatInformation;
                std::string serialized = room_info.dump();
    
                std::cout << "Room info: " <<  serialized << std::endl;
                return serialized;
            }
    
            // Writes the message at the front of the queue; when it completes,
            // pops it and continues with the next one, if any.
            void do_write() {
                auto self(shared_from_this());
                boost::asio::async_write(
                            socket_,
                            boost::asio::buffer(write_msgs_.front().data(), write_msgs_.front().length()),
                            [this, self] (boost::system::error_code ec, std::size_t /*length*/) {
                                if(!ec) {
                                    write_msgs_.pop_front();
                                    if(!write_msgs_.empty()) {
                                        do_write();
                                    }
                                }else {
                                    room_.leave(self);
                                }
                            }
                        );
            }
    
            tcp::socket socket_;
            // The room must outlive every session; otherwise this reference dangles.
            chat_room& room_;
            chat_message read_msg_;          // message currently being received
            chat_message_queue write_msgs_;  // outgoing messages, front is in flight
            std::string m_name;              // client name set by MT_BIND_NAME
            std::string m_chatInformation;   // last "information" payload received
    };
    
    
    // Adds the session to the participant set. No history is replayed to a
    // newly joined session.
    void chat_room::join(chat_session_ptr participant) {
        participants_.emplace(std::move(participant));
    }
    
    // Removes the session from the participant set; does nothing when the
    // session is not a member.
    void chat_room::leave(chat_session_ptr participant) {
        const auto member = participants_.find(participant);
        if(member != participants_.end()) {
            participants_.erase(member);
        }
    }
    
    // 消息分发函数
    void chat_room::deliver(const chat_message& msg) {
        recent_msgs_.push_back(msg);
        // recent_msgs_调整到最大值
        while(recent_msgs_.size() > max_recent_msgs) {
            recent_msgs_.pop_front();
        }
    
        // 给每个群聊参与者群发消息
        for(auto & participant: participants_) {
            participant->deliver(msg);
        }
    }
    
    void chat_room::deliver_to(const chat_message& msg, const std::string& paticipant_name) {
        recent_msgs_.push_back(msg);
        // recent_msgs_调整到最大值
        while(recent_msgs_.size() > max_recent_msgs) {
            recent_msgs_.pop_front();
        }
    
        // 给每个群聊参与者群发消息
        for(auto & participant: participants_) {
            if(participant->get_client_name() == paticipant_name) {
                participant->deliver(msg);
            }
        }
    }
    
    
    // Listens on one endpoint and turns every accepted connection into a
    // chat_session that joins this server's room.
    class chat_server {
        public:
            // Binds the acceptor to the endpoint and starts accepting right away.
            chat_server(boost::asio::io_service& io_service, 
                        const tcp::endpoint& endpoint)
                : acceptor_(io_service, endpoint),
                  socket_(io_service) {
                do_accept();
            }
            
            // Waits for the next incoming connection. The handler re-arms the
            // accept whether or not the previous one succeeded.
            void do_accept() {
                auto on_accept = [this] (boost::system::error_code ec) {
                    if(!ec) {
                        // The room keeps the session alive once start() has joined it.
                        std::make_shared<chat_session>(std::move(socket_), room_)->start();
                    }
    
                    do_accept();
                };
                acceptor_.async_accept(socket_, on_accept);
            }
        private:
            tcp::acceptor acceptor_;
            tcp::socket socket_;   // receives the next accepted connection
            chat_room room_;
    };
    
    
    // Entry point of the relay server.
    //
    // Usage: chat_server <port> [<port> ...]
    // Creates one chat_server per port and runs the io_service until it has no
    // more work. Returns 1 on a usage error, an invalid port or an exception.
    int main(int argc, char* argv[]) {
    
        try {
            if(argc < 2) {
                std::cerr << "Usage: chat_server <port> [<port> ...]" << std::endl;
                return 1;
            }
            
            boost::asio::io_service io_service;
    
            // std::list: chat_server objects are referenced by pending handlers
            // and therefore must not be relocated.
            std::list<chat_server> servers;
            
            for(int i=1; i<argc; ++i) {
                // Reject non-numeric or out-of-range arguments instead of
                // silently listening on whatever atoi() would have produced.
                char* end = nullptr;
                const long port = std::strtol(argv[i], &end, 10);
                if(end == argv[i] || *end != '\0' || port < 0 || port > 65535) {
                    std::cerr << "Invalid port: " << argv[i] << std::endl;
                    return 1;
                }
                tcp::endpoint endpoint(tcp::v4(), static_cast<unsigned short>(port));
                servers.emplace_back(io_service, endpoint);
            }
            io_service.run();
    
        }catch(const std::exception& e) {
            std::cerr << "Exception: " << e.what() << std::endl; 
            return 1;
        }
    
        return 0;
    }
    

    master.cpp

    #include "chat_message.h"
    
    #include <boost/asio.hpp>
    #include <deque>
    #include <iostream>
    #include <thread>
    
    #include <cstdlib>
    
    
    using boost::asio::ip::tcp;
    
    using chat_message_queue = std::deque<chat_message>;
    
    // Number of task-info reports the master waits for before it closes its
    // connection; assigned in main() before the client starts.
    int slave_count = 0;
    
    class chat_client {
        public:
            chat_client(boost::asio::io_service& io_service,
                        tcp::resolver::iterator endpoint_iterator
                    ): io_service_(io_service), socket_(io_service) {
                do_connect(endpoint_iterator);
            }
    
            void write(const chat_message& msg) {
                // write是由主线程往子线程写东西
                // 所以需要使用post提交到子线程运行
                // 使得所有io操作都由io_service的子线程掌握
                io_service_.post(
                        [this, msg] () {
    
                            bool write_in_progress = !write_msgs_.empty();
                            write_msgs_.push_back(msg);
                            if(!write_in_progress) {
                                do_write();
                            }
                        }
                        );
            } 
    
            
            void close() {
                io_service_.post(
                        [this] () {
                            socket_.close();
                        }
                        );
            }
        private:
          
            void do_connect(tcp::resolver::iterator endpoint_iterator) {
                boost::asio::async_connect(
                            socket_,
                            endpoint_iterator,
                            [this] (boost::system::error_code ec, tcp::resolver::iterator it) {
                                if(!ec) {
                                    // 如果连接成功,读取消息头
                                    do_read_header();
                                }
                            }
                        );
            }
    
    
            void do_read_header() {
                boost::asio::async_read(
                            socket_,
                            boost::asio::buffer(read_msg_.data(), chat_message::header_length),
                            [this] (boost::system::error_code ec, std::size_t length) {
                                if(!ec && read_msg_.decode_header()) {
                                    // 如果没有错误,并且Decode_header成功,成功读取到body_length
                                    do_read_body();
                                }else {
                                    // 读取失败时关闭与服务端的连接,退出事件循环
                                    socket_.close();
                                }
                            }
                        );
            }
            
            json to_json() {
                std::string buffer(read_msg_.body(), read_msg_.body() + read_msg_.body_length());
                std::stringstream ss(buffer);
                auto json_obj = json::parse(ss.str());
                return json_obj;
            } 
    
            void do_read_body() {
                boost::asio::async_read(
                            socket_,
                            boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
                            [this] (boost::system::error_code ec, std::size_t length) {
                                if(!ec) {
                                    // 校验一下消息长度和消息类型,
                                    // 证明确实发过来的是RomInformation消息
                                    if(read_msg_.type() == MT_SEND_TASK_INFO_MSG) {
                                        auto json_obj = to_json();
                                        std::cout << "client ";
                                        std::cout << json_obj["name"].get<std::string>();
                                        std::cout << " says: ";
                                        std::cout << json_obj["information"].get<std::string>();
                                        std::cout << "\n";
    
                                        ++receive_slave_cout;
                                        if(receive_slave_cout == slave_count) {
                                            //TODO: 汇总计算结果
                                            std::cerr << "开始汇总计算性能测试结果" << std::endl;
    
                                            close();
                                        }
                                    }
                                  
                                    // 调用do_read_header函数串联起事件链,接着读
                                    do_read_header();
                                }else {
                                    socket_.close();
                                }
                                
                            }
                        );
            }
            
            // 向服务端真正发送消息的函数
            void do_write() {
                boost::asio::async_write(
                            socket_,
                            boost::asio::buffer(
                                write_msgs_.front().data(),
                                write_msgs_.front().length()
                                ),
                            [this] (boost::system::error_code ec, std::size_t length) {
                                if(!ec) {
                                    // 一直写直到写完
                                    write_msgs_.pop_front();
                                    if(!write_msgs_.empty()) {
                                        do_write();
                                    }
                                }else {
                                    socket_.close();
                                }
                            }
                        );
            }
    
            // 注意使用了引用类型,
            // io_service对象的生命周期必须要大于chat_client对象的生命周期
            // 否则会出现引用失效,导致异常
            boost::asio::io_service& io_service_;
            tcp::socket socket_;
            chat_message read_msg_;
            chat_message_queue write_msgs_;
            int receive_slave_cout {0};
    };
    
    
    int main(int argc, char* argv[]) {
        try {
            if(argc != 3) {
                std::cerr << "Usage: chat_client <host> <port>" << std::endl;
                return 1;
            }
    
            //TODO: 读配置文件或者命令行参数,获取SLAVE_COUNT 
            slave_count = 2;
    
            boost::asio::io_service io_service;
            tcp::resolver resolver(io_service);
            auto endpoint_iterator = resolver.resolve({argv[1], argv[2]});
            chat_client c(io_service, endpoint_iterator);
    
            std::thread t([&io_service]() {io_service.run(); });
            char line[chat_message::max_body_length + 1];
    
           
            chat_message msg;
            auto type = 0;
    
            std::string input = "BindName master";
            std::string output;
            if(parseMessage(input, &type, output)) {
                msg.setMessage(type, output.data(), output.size());
                c.write(msg);
            }
    
            input = "LaunchTask task1";
    
            if(parseMessage(input, &type, output)) {
                msg.setMessage(type, output.data(), output.size());
                c.write(msg);
            }
    
            
            t.join();
        }catch(std::exception& ex) {
            std::cerr << "Exception: " << ex.what() << std::endl;
        }
    
        return 0;
    }
    

    slave.cpp

    #include "chat_message.h"
    
    #include <boost/asio.hpp>
    #include <deque>
    #include <iostream>
    #include <thread>
    
    #include <cstdlib>
    
    using boost::asio::ip::tcp;
    
    using chat_message_queue = std::deque<chat_message>;
    
    class chat_client {
        public:
            chat_client(boost::asio::io_service& io_service,
                        tcp::resolver::iterator endpoint_iterator
                    ): io_service_(io_service), socket_(io_service) {
                do_connect(endpoint_iterator);
            }
    
            void write(const chat_message& msg) {
                // write是由主线程往子线程写东西
                // 所以需要使用post提交到子线程运行
                // 使得所有io操作都由io_service的子线程掌握
                io_service_.post(
                        [this, msg] () {
    
                            bool write_in_progress = !write_msgs_.empty();
                            write_msgs_.push_back(msg);
                            if(!write_in_progress) {
                                do_write();
                            }
                        }
                        );
            } 
    
            
            void close() {
                io_service_.post(
                        [this] () {
                            socket_.close();
                        }
                        );
            }
        private:
          
            void do_connect(tcp::resolver::iterator endpoint_iterator) {
                boost::asio::async_connect(
                            socket_,
                            endpoint_iterator,
                            [this] (boost::system::error_code ec, tcp::resolver::iterator it) {
                                if(!ec) {
                                    // 如果连接成功,读取消息头
                                    do_read_header();
                                }
                            }
                        );
            }
    
    
            void do_read_header() {
                boost::asio::async_read(
                            socket_,
                            boost::asio::buffer(read_msg_.data(), chat_message::header_length),
                            [this] (boost::system::error_code ec, std::size_t length) {
                                if(!ec && read_msg_.decode_header()) {
                                    // 如果没有错误,并且Decode_header成功,成功读取到body_length
                                    do_read_body();
                                }else {
                                    // 读取失败时关闭与服务端的连接,退出事件循环
                                    socket_.close();
                                }
                            }
                        );
            }
            
            json to_json() {
                std::string buffer(read_msg_.body(), read_msg_.body() + read_msg_.body_length());
                std::stringstream ss(buffer);
                auto json_obj = json::parse(ss.str());
                return json_obj;
            } 
    
            void do_read_body() {
                boost::asio::async_read(
                            socket_,
                            boost::asio::buffer(read_msg_.body(), read_msg_.body_length()),
                            [this] (boost::system::error_code ec, std::size_t length) {
                                if(!ec) {
                                    // 校验一下消息长度和消息类型,
                                    // 证明确实发过来的是RomInformation消息
                                    if(read_msg_.type() == MT_LAUNCH_TASK_MSG) {
                                        // TODO: 启动性能测试,完事以后发送
                                        // send_task_info_msg
                                        auto json_obj = to_json();
                                        std::cout << "client ";
                                        std::cout << json_obj["name"].get<std::string>();
                                        std::cout << " says: ";
                                        std::cout << json_obj["information"].get<std::string>();
                                        std::cout << "\n";
    
                                        std::cerr << "开始做性能测试..." << std::endl;
                                        std::cerr << "结束做性能测试..." << std::endl;
    
                                        chat_message msg;
                                        auto type = 0;
    
                                        std::string input("SendTaskInfo TaskSuccess");
                                        std::string output;
                                        
                                        if(parseMessage(input, &type, output)) {
                                            msg.setMessage(type, output.data(), output.size());
                                            write(msg);
                                        }
    
                                        close();
                                    }
                                    // 调用do_read_header函数串联起事件链,接着读
                                    do_read_header();
                                }else {
                                    socket_.close();
                                }
                                
                            }
                        );
            }
            
            // 向服务端真正发送消息的函数
            void do_write() {
                boost::asio::async_write(
                            socket_,
                            boost::asio::buffer(
                                write_msgs_.front().data(),
                                write_msgs_.front().length()
                                ),
                            [this] (boost::system::error_code ec, std::size_t length) {
                                if(!ec) {
                                    // 一直写直到写完
                                    write_msgs_.pop_front();
                                    if(!write_msgs_.empty()) {
                                        do_write();
                                    }
                                }else {
                                    socket_.close();
                                }
                            }
                        );
            }
    
            // 注意使用了引用类型,
            // io_service对象的生命周期必须要大于chat_client对象的生命周期
            // 否则会出现引用失效,导致异常
            boost::asio::io_service& io_service_;
            tcp::socket socket_;
            chat_message read_msg_;
            chat_message_queue write_msgs_;
    };
    
    
    int main(int argc, char* argv[]) {
        try {
            if(argc != 3) {
                std::cerr << "Usage: chat_client <host> <port>" << std::endl;
                return 1;
            }
    
            boost::asio::io_service io_service;
            tcp::resolver resolver(io_service);
            auto endpoint_iterator = resolver.resolve({argv[1], argv[2]});
            chat_client c(io_service, endpoint_iterator);
    
            chat_message msg;
            auto type = 0;
    
            std::string slave_name {};
           
            std::cout << "Pls input name: " << std::endl;
            std::cin >> slave_name;
            std::string input = "BindName " + slave_name;
            std::string output;
            if(parseMessage(input, &type, output)) {
                msg.setMessage(type, output.data(), output.size());
                c.write(msg);
            }
    
            std::thread t([&io_service]() {io_service.run(); });
    
            t.join();
        }catch(std::exception& ex) {
            std::cerr << "Exception: " << ex.what() << std::endl;
        }
    
        return 0;
    }
    

    chat_message.h

    #ifndef _CHAT_MESSAGE_H_
    #define _CHAT_MESSAGE_H_
    
    #include "parse_msg.h"
    
    #include <cassert>
    #include <cstdio>
    #include <cstdlib>
    #include <cstring>
    #include <iostream>
    #include <stdexcept>
    
    // One framed message: a fixed-size Header followed by up to
    // max_body_length bytes of body, stored contiguously in data_ so the whole
    // message can be sent or received as a single buffer.
    class chat_message {
        public:
            // Size of the wire header in bytes.
            enum { header_length = sizeof(Header) };
            // Largest body, in bytes, that fits into a message.
            enum { max_body_length = 512 };
    
            // Creates an empty message (zeroed buffer, body size 0, type 0).
            chat_message() {}
            
            // Start of the raw buffer (header followed by body).
            const char* data() const { return data_; }
            char* data() { return data_; }
            
            // Total number of valid bytes: header plus the decoded body size.
            std::size_t length() const { return header_length + body_length(); }
            
            // Start of the body, header_length bytes into the buffer.
            const char* body() const { return data_ + header_length; }
            char* body() { return  data_ + header_length; }
            
            // Message type taken from the header.
            int type() const { return m_header.type; }
    
    
            // Body size in bytes taken from the header.
            std::size_t body_length() const { return static_cast<std::size_t>(m_header.bodySize); }
    
            // Fills the message with the given type and body.
            // Throws std::length_error when bufferSize exceeds max_body_length;
            // the check is always active because an oversized body would
            // otherwise be copied past the end of data_.
            void setMessage(int messageType, const void* buffer, size_t bufferSize) {
                if(bufferSize > static_cast<std::size_t>(max_body_length)) {
                    throw std::length_error("chat_message: body exceeds max_body_length");
                }
    
                m_header.bodySize = static_cast<int>(bufferSize);
                m_header.type = messageType;
                if(bufferSize > 0) {
                    std::memcpy(body(), buffer, bufferSize);
                }
                std::memcpy(data(), &m_header, sizeof(m_header));
            }
    
            // Convenience overload taking the body as a string.
            void setMessage(int messageType, const std::string& buffer) {
                setMessage(messageType, buffer.data(), buffer.size());
            }
    
            // Loads the header from the start of the buffer.
            // Returns false when the announced body size is negative or larger
            // than max_body_length. The header comes from the network, and a
            // negative size would turn into a huge std::size_t in body_length().
            bool decode_header() {
                std::memcpy(&m_header, data(), header_length);
                if(m_header.bodySize < 0 || m_header.bodySize > max_body_length) {
                    std::cout <<"body size: " << m_header.bodySize << " header type:" << m_header.type  << std::endl;
                    return false;
                }
    
                return true;
            }
            
        private:
            char data_[header_length+max_body_length] = {};
            Header m_header = {};
    };
    #endif
    

    parse_msg.h

    #ifndef _PARSE_MSG_H_
    #define _PARSE_MSG_H_
    
    #include "json/json.hpp"
    
    #include <sstream>
    #include <cstdlib>
    #include <string>
    #include <iostream>
    #include <cstring>
    
    using json = nlohmann::json;
    
    
    // Fixed-size header placed in front of every message body; chat_message
    // copies it to and from the start of its buffer with memcpy.
    struct Header {
        int bodySize; // size of the body in bytes
        int type; // message type (a MessageType value)
    };
    
    // Values carried in Header::type.
    enum MessageType {
        // Registers the sender's name at the server.
        MT_BIND_NAME = 1,
        // Launch command; the server relays it to every participant.
        MT_LAUNCH_TASK_MSG = 2,
        // Task result; the server relays it to the master.
        MT_SEND_TASK_INFO_MSG = 3, 
    };
    
    // Turns a "<Command> <argument>" line into a message type and a JSON body.
    // input:     command line, e.g. "BindName node1".
    // type:      receives the MessageType on success; may be null.
    // outbuffer: receives the serialized JSON body on success.
    // Returns false when the input is not a supported, well-formed command.
    bool parseMessage(const std::string& input, int* type, std::string& outbuffer);
    #endif
    

    parse_msg.cpp

    #include "parse_msg.h"
    
    #include <sstream>
    
    
    // 消息解析函数
    // input 输入的消息字符串
    // type 传出的消息类型指针
    // outbuffer 输出的用于发送的消息内容字符串
    bool parseMessage(const std::string& input, int* type, std::string& outbuffer) {
        auto pos = input.find_first_of(" ");
        // 消息中没找到空格
        if(pos == std::string::npos) {
            return false;
        }
    
        if(pos == 0) {
            return false; 
        }
    
        auto command = input.substr(0, pos);
        // Bind姓名消息
        if(command == "BindName") {
            std::string name = input.substr(pos+1);
            if(name.size()>32) {
                std::cerr << "姓名的长度大于32个字节!" <<  std::endl;
                return false;
            }
            
            if(type) {
                *type = MT_BIND_NAME;
            }
            
            std::ostringstream oss;
            oss << R"({"name": )";
            oss << R"(")" << name << R"(")" << "}";
            auto json_obj = json::parse(oss.str());
            outbuffer = json_obj.dump();
            return true;
            // 聊天消息
        }else if(command == "LaunchTask") {
            std::string task = input.substr(pos+1);
            if(task.size() > 1000000) {
                std::cerr << "消息的长度大于1000000个字节!" << std::endl;
                return false;
            }
            
           
            std::ostringstream oss;
            oss << R"({"information" :)";
            oss << R"(")" << task  << R"(")" << "}";
    
            auto json_obj = json::parse(oss.str());
            outbuffer = json_obj.dump();
    
            if(type) {
                *type = MT_LAUNCH_TASK_MSG;
            }
    
            return true;
        } else if(command == "SendTaskInfo") {
            std::string task_res = input.substr(pos+1);
            if(task_res.size() > 1000000) {
                std::cerr << "消息的长度大于1000000个字节!" << std::endl;
                return false;
            }
            
           
            std::ostringstream oss;
            oss << R"({"information" :)";
            oss << R"(")" << task_res  << R"(")" << "}";
    
            auto json_obj = json::parse(oss.str());
            outbuffer = json_obj.dump();
    
            if(type) {
                *type = MT_SEND_TASK_INFO_MSG;
            }
    
            return true;
        }
        
        // 不支持的消息类型,返回false
        return false;
    }
    

    程序输出如下,


    image.png image.png image.png image.png

    相关文章

      网友评论

          本文标题:使用boost::asio 模拟JMeter做分布式压测网络部分

          本文链接:https://www.haomeiwen.com/subject/hiprnltx.html