美文网首页
使用tasks_processor管理网络请求

使用tasks_processor管理网络请求

作者: FredricZhu | 来源:发表于2021-04-14 07:32 被阅读0次

    这个框架作者是在boost::asio库的基础上做二次开发,设计得挺骚的。

    服务器和客户端都通过tasks_processor来启动和调度。如果在多个线程中同时调用io_service::run(即代码中的ios_.run()),就可以让服务器和客户端启动的task在多线程环境中执行。

    加上timer_task这种调度任务后,可以使用boost::asio::deadline_timer来设定哪种任务先执行,哪种任务后执行,延迟到什么时候。

    如果是标准的实时通信操作的话,客户端和服务器都应该写成一个
    class tcp_client: public boost::enable_shared_from_this<tcp_client> 类似这样的类。
    这样才便于持续操作。参考authorizer服务器的写法就可以。

    当然认证服务器,写到作者这个样子就可以,毕竟只是一个demo,而且也不用在内存中保存历史认证信息,认证一次就够了。

    没时间画类图了,直接上代码,
    代码结构,


    图片.png

    CMakeLists.txt

    # Minimum CMake version this script declares support for.
    cmake_minimum_required(VERSION 2.6)
    # NOTE(review): the project name looks carried over from another example - confirm.
    project(lexical_cast)
    
    # Adds -std=c++14 to the compile flags of every target.
    add_definitions(-std=c++14)
    
    # Header and library search paths (presumably where Boost is installed - verify).
    include_directories("/usr/local/include")
    link_directories("/usr/local/lib")
    # Collect every .cpp file located directly in this directory ...
    file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
    # ... and build each of them into its own executable.
    foreach( sourcefile ${APP_SOURCES} )
        # Source path relative to this directory.
        file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
        # Remove ".cpp" from the file name to get the executable name.
        string(REPLACE ".cpp" "" file ${filename})
        add_executable(${file} ${sourcefile})
        target_link_libraries(${file} boost_filesystem boost_thread boost_system boost_serialization pthread boost_chrono)
    endforeach( sourcefile ${APP_SOURCES} )
    

    main.cpp

    #include "utils/tasks_processor_network.hpp"
    
    #include <boost/asio/read.hpp>
    #include <boost/asio/write.hpp>
    
    using namespace tp_network;
    // Forward declaration, needed by the authorizer_ptr alias below.
    class authorizer;
    // File-wide alias for a shared pointer to an authorizer.
    using authorizer_ptr = boost::shared_ptr<authorizer>;
    
    // Server-side handler for a single client connection: reads the client's
    // request, answers with "OK" and then closes the connection.
    //
    // The constructor is private; instances are created only by
    // on_connection_accept() and are kept alive by the shared pointer captured
    // in the pending asynchronous handlers.
    class authorizer: public boost::enable_shared_from_this<authorizer> {
        tcp_connection_ptr connection_;
        // Receive buffer; its first two bytes are reused for the "OK" reply.
        boost::array<char, 512> message_;

        explicit authorizer(const tcp_connection_ptr& connection):
            connection_(connection) {

        }

        public:
            // Callback for a newly accepted connection: creates an authorizer and
            // starts an asynchronous read that completes after at least one byte.
            static void on_connection_accept(const tcp_connection_ptr& connection) {
                authorizer_ptr auth(new authorizer(connection));
                auth->connection_.async_read(
                    boost::asio::buffer(auth->message_),
                    [auth](const boost::system::error_code& ec, std::size_t bytes_transferred) {
                        auth->on_data_received(ec, bytes_transferred);
                    },
                    1
                );
            }

            // Read-completion handler. On success writes "OK" back to the client.
            // On failure the error is logged and nothing is sent; the assert keeps
            // the fail-fast behaviour of debug builds, while the early return stops
            // builds with NDEBUG from replying "OK" after a failed read.
            void on_data_received(const boost::system::error_code& ec, std::size_t bytes_transferred) {
                if(ec) {
                    std::cerr << "authorizer.on_data_received: error during receiving response: " << ec << '\n';
                    assert(false);
                    return;
                }

                if(bytes_transferred == 0) {
                    std::cerr << "authorizer.on_data_received: zero bytes received\n";
                    assert(false);
                    return;
                }

                // message_ now holds the client's data; real credential checks
                // would go here.
                // ...

                // Reply "OK" to the client.
                message_[0] = 'O';
                message_[1] = 'K';

                std::size_t bytes_to_send = 2;

                // Keep this object alive until the write handler has run.
                auto self(shared_from_this());
                connection_.async_write(
                    boost::asio::buffer(message_, bytes_to_send),
                    [self](const boost::system::error_code& ec, std::size_t /*bytes_transferred*/) {
                        self->on_data_send(ec);
                    }
                );
            }

            // Write-completion handler: closes the connection once the reply has
            // been sent. After a failed write the connection is not shut down
            // explicitly; the error is only logged.
            void on_data_send(const boost::system::error_code& ec) {
                if(ec) {
                    std::cerr << "authorizer.on_data_send: error during send response: " << ec << '\n';
                    assert(false);
                    return;
                }

                connection_.shutdown();
            }
    };
    
    // Global flag recording whether authentication succeeded.
    bool g_authed = false;
    
    // Client-side read-completion handler: validates the server's reply.
    //
    // ec                - result of the read; end-of-file is tolerated.
    // bytes_transferred - number of bytes read into *data; must be 2.
    // connection        - the client connection, shut down on success.
    // data              - buffer holding the reply; must contain "OK".
    //
    // On success sets g_authed, closes the connection and stops the task
    // processor. On any failure the problem is logged, g_authed is left false
    // and the task processor is still stopped so the caller of start() does not
    // block forever. The assert keeps debug builds failing fast; the early
    // returns keep builds with NDEBUG from reporting a failed exchange as
    // success.
    void finish_socket_auth_task(
        const boost::system::error_code& ec,
        std::size_t bytes_transferred,
        const tcp_connection_ptr& connection,
        const boost::shared_ptr<std::string>& data
    ) {
        // Common failure path. The connection is not shut down explicitly here.
        const auto fail = []() {
            assert(false);
            tasks_processor::get().stop();
        };

        if(ec && ec != boost::asio::error::eof) {
            std::cerr << "finish_socket_auth_task: Client error on receive: " << ec.message() << '\n';
            fail();
            return;
        }

        if(bytes_transferred != 2) {
            std::cerr << "finish_socket_auth_task: wrong bytes count\n";
            fail();
            return;
        }

        // Trim the buffer to the bytes actually received before comparing.
        data->resize(bytes_transferred);
        if(*data != "OK") {
            std::cerr << "finish_socket_auth_task: wrong response " << *data << '\n';
            fail();
            return;
        }

        g_authed = true;
        // Close both directions of the TCP connection.
        connection.shutdown();
        tasks_processor::get().stop();
    }
    
    // Client-side write-completion handler: once the request has been sent,
    // starts an asynchronous read of the server's reply into *data (reusing the
    // request buffer) and hands the result to finish_socket_auth_task().
    //
    // ec         - result of the preceding asynchronous write.
    // connection - the client connection the request was written to.
    // data       - buffer that receives the reply; its current size limits how
    //              many bytes can be read.
    //
    // If the write failed, the error is logged and the task processor is
    // stopped instead of waiting for a reply (the assert still aborts debug
    // builds).
    void receive_auth_task(const boost::system::error_code& ec, const tcp_connection_ptr& connection,
        const boost::shared_ptr<std::string>& data) {
        if(ec) {
            std::cerr << "receive_auth_task: client error on send: " << ec.message() << '\n';
            assert(false);
            tasks_processor::get().stop();
            return;
        }

        // async_read is a non-const member, so work on a copy of the handle.
        tcp_connection_ptr soc(connection);
        soc.async_read(
            boost::asio::buffer(&(*data)[0], data->size()),
            [soc, data](const boost::system::error_code& ec, std::size_t bytes_transferred) {
                    finish_socket_auth_task(ec, bytes_transferred, soc, data);
            },
            1
        );

    }
    
    const unsigned short port_num = 65001;
    // 客户端发送auth的函数
    void send_auth_task() {
        tcp_connection_ptr soc = tasks_processor::get().create_connection("127.0.0.1", port_num);
        boost::shared_ptr<std::string> data = boost::make_shared<std::string>("authname");
    
        soc.async_write(
            boost::asio::buffer(*data),
            [soc, data](const boost::system::error_code& ec, std::size_t bytes_transferred) {
                receive_auth_task(ec, soc, data);
            }
        );
    }
    
    // Runs the demo: schedules the client, registers the server listener and
    // processes tasks until the client stops the processor.
    int main(int argc, char* argv[]) {
        auto& processor = tasks_processor::get();

        // Send the client request one second from now, so the listener
        // registered below is in place first.
        processor.run_after(boost::posix_time::seconds(1), &send_auth_task);
        // Start the auth server on TCP (IPv4) port 65001.
        processor.add_listener(port_num, &authorizer::on_connection_accept);

        // Nothing has run yet.
        assert(!g_authed);

        processor.start();
        assert(g_authed);
        return 0;
    }
    

    utils/tasks_processor_base.hpp

    #ifndef _CHAPTOR06_06_TASKS_PROCESSOR_BASE_HPP_
    #define _CHAPTOR06_06_TASKS_PROCESSOR_BASE_HPP_
    
    #include <boost/noncopyable.hpp>
    #include <boost/thread/thread.hpp>
    #include <boost/asio/io_service.hpp>
    
    #include <iostream>
    
    namespace detail {
        // Wraps a callable so that exceptions thrown while running it are logged
        // to std::cerr instead of propagating to the caller.
        template <class T>
        struct task_wrapped {
            public:
                explicit task_wrapped(const T& task_unwrapped):
                    task_(task_unwrapped) {}

                // Runs the wrapped task; never lets an exception escape.
                void operator()() const {
                    reset_interruption();

                    try {
                        task_();
                    } catch(const std::exception& ex) {
                        std::cerr << "Exception: " << ex.what() << '\n';
                    } catch(const boost::thread_interrupted&) {
                        std::cerr << "Thread interrupted\n";
                    } catch(...) {
                        std::cerr << "Unknown exception\n";
                    }
                }

            private:
                // Passes through an interruption point and discards the resulting
                // thread_interrupted, so an interruption requested earlier does not
                // hit the task itself.
                static void reset_interruption() {
                    try {
                        boost::this_thread::interruption_point();
                    } catch(const boost::thread_interrupted&) {
                        // Intentionally ignored.
                    }
                }

                T task_;
        };
    
        // Helper that deduces T and returns task_unwrapped wrapped in a
        // task_wrapped<T>.
        template <class T>
        inline task_wrapped<T> make_task_wrapped(const T& task_unwrapped) {
            task_wrapped<T> wrapped(task_unwrapped);
            return wrapped;
        }
    };
    
    namespace tp_base {
        // Base task processor: owns an io_service and posts exception-safe tasks
        // to it. Non-copyable; the constructor is protected, and access goes
        // through get(), which is only declared here.
        class tasks_processor: private boost::noncopyable {
            protected:
                boost::asio::io_service ios_;
                // Work object bound to ios_; per Boost.Asio it keeps run() from
                // returning while no handlers are pending.
                boost::asio::io_service::work work_;

                tasks_processor(): work_(ios_) {}

            public:
                static tasks_processor& get();

                // Wraps the task in detail::task_wrapped and posts it to the
                // io_service.
                template <class T>
                void push_task(const T& task_unwrapped) {
                    auto wrapped = detail::make_task_wrapped(task_unwrapped);
                    ios_.post(wrapped);
                }

                // Runs the io_service event loop on the calling thread.
                void start() { ios_.run(); }

                // Asks the io_service to stop.
                void stop() { ios_.stop(); }
        };
    };
    #endif
    

    utils/tasks_processor_timer.hpp

    #ifndef _CHAPTOR06_06_TASKS_PROCESSOR_TIMER_HPP_
    #define _CHAPTOR06_06_TASKS_PROCESSOR_TIMER_HPP_
    // 给基础的tasks_processor_base加上timer功能
    
    #include "tasks_processor_base.hpp"
    
    #include <boost/asio/io_service.hpp>
    #include <boost/asio/deadline_timer.hpp>
    #include <boost/system/error_code.hpp>
    #include <boost/make_shared.hpp>
    
    #include <iostream>
    
    namespace detail {
        using duration_type_t = boost::asio::deadline_timer::duration_type;

        // A task_wrapped that is run as the completion handler of a
        // deadline_timer. The timer is held through a shared_ptr, so copies of
        // the task share the same timer object.
        template <class Functor>
        struct timer_task: public task_wrapped<Functor> {
            private:
                using base_t = task_wrapped<Functor>;
                using timer_t = boost::asio::deadline_timer;

                boost::shared_ptr<timer_t> timer_;

            public:
                // duration_or_time is forwarded to the deadline_timer
                // constructor.
                template <class Time>
                explicit timer_task(boost::asio::io_service& ios,
                    const Time& duration_or_time,
                    const Functor& task_unwrapped):
                        base_t(task_unwrapped),
                        timer_(boost::make_shared<timer_t>(ios, duration_or_time))
                {}

                // Starts the asynchronous wait, passing a copy of this task as
                // the handler.
                void push_task() const {
                    timer_->async_wait(*this);
                }

                // Timer handler: runs the wrapped task unless the wait reported
                // an error, in which case the error is printed.
                void operator()(const boost::system::error_code& error) const {
                    if(error) {
                        std::cerr << error << '\n';
                        return;
                    }
                    base_t::operator()();
                }
        };
    
        // Helper that deduces Functor and builds a timer_task<Functor> for the
        // given io_service and expiry (a duration or an absolute time).
        template <class Time, class Functor>
        inline timer_task<Functor> make_timer_task(
            boost::asio::io_service& ios,
            const Time& duration_or_time,
            const Functor& task_unwrapped
        ) {
            timer_task<Functor> task(ios, duration_or_time, task_unwrapped);
            return task;
        }
    }; // namespace detail
    
    namespace tp_timers {
        class tasks_processor: public tp_base::tasks_processor {
            public:
                static tasks_processor& get();
    
                using duration_type_t = boost::asio::deadline_timer::duration_type;
                template <class Functor>
                void run_after(duration_type_t duration, const Functor& f) {
                    detail::make_timer_task(ios_, duration, f).push_task();
                }
    
                using time_type_t = boost::asio::deadline_timer::time_type;
                template <class Functor>
                void run_at(time_type_t time, const Functor& f) {
                    detail::make_timer_task(ios_, time, f).push_task();
                }
        };
    }; // namespace tp_timers
    #endif
    

    utils/tasks_processor_network.hpp

    #ifndef _CHAPTOR06_06_TASKS_PROCESSOR_NETWORK_HPP_
    #define _CHAPTOR06_06_TASKS_PROCESSOR_NETWORK_HPP_
    // 给task_processor_timer加上网络处理功能
    #include "tasks_processor_timer.hpp"
    
    #include <boost/asio/ip/tcp.hpp>
    #include <boost/asio/placeholders.hpp>
    #include <boost/asio/write.hpp>
    #include <boost/asio/read.hpp>
    #include <boost/shared_ptr.hpp>
    #include <boost/function.hpp>
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/lexical_cast.hpp>
    
    #include <map>
    
    // 将当前TCP连接的套接字
    // 和读取,写入,关闭函数绑定
    class tcp_connection_ptr {
    
        boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;
        public:
    
            explicit tcp_connection_ptr(boost::shared_ptr<boost::asio::ip::tcp::socket> socket):
                socket_(socket)
             {}
    
            explicit tcp_connection_ptr(
                boost::asio::io_service& ios,
                const boost::asio::ip::tcp::endpoint& endpoint
            ):
                  socket_(boost::make_shared<boost::asio::ip::tcp::socket>(ios))
            {    
                    socket_->connect(endpoint);
            }
    
            template <class Functor>
            void async_write(
                const boost::asio::const_buffers_1& buf, const Functor& f
            ) {
                boost::asio::async_write(*socket_, buf, f);
            }
            
            template <class Functor>
            void async_write(
                const boost::asio::mutable_buffers_1& buf, const Functor& f
            ) {
                boost::asio::async_write(*socket_, buf, f);
            }
    
            template <class Functor>
            void async_read(
                const boost::asio::mutable_buffers_1& buf, 
                const Functor& f,
                std::size_t at_least_bytes
            ) {
                boost::asio::async_read(
                    *socket_, buf, boost::asio::transfer_at_least(at_least_bytes), f
                );
            }
    
            void shutdown() const {
                socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
                socket_->close();
            }
    };
    
    namespace detail {
        // TCP listener: accepts connections on one port in a loop and calls the
        // user callback for every accepted connection.
        // Must be owned by a shared_ptr, because push_task() and handle_accept()
        // call shared_from_this().
        class tcp_listener: public boost::enable_shared_from_this<tcp_listener> {
            using acceptor_t = boost::asio::ip::tcp::acceptor;
            acceptor_t acceptor_;
            boost::asio::io_service& io_service_;

            boost::function<void(tcp_connection_ptr)> func_;

            public:
                // Opens an acceptor on the given IPv4 port and stores
                // task_unwrapped as the per-connection callback.
                // The initializers are listed in member declaration order, which
                // is the order in which they actually run.
                template <class Functor>
                tcp_listener(
                    boost::asio::io_service& io_service,
                    unsigned short port,
                    const Functor& task_unwrapped
                ):
                    acceptor_(io_service, boost::asio::ip::tcp::endpoint(
                        boost::asio::ip::tcp::v4(), port
                    )),
                    io_service_(io_service),
                    func_(task_unwrapped)
                 {}


                // Closes the acceptor; no further connections are accepted.
                void stop() {
                    acceptor_.close();
                }

                // Starts one asynchronous accept. handle_accept() calls this again,
                // which keeps the accept chain going.
                void push_task() {
                    // The acceptor has been closed: stop accepting.
                    if(!acceptor_.is_open()) {
                        return;
                    }

                    using socket_t = boost::asio::ip::tcp::socket;
                    boost::shared_ptr<socket_t> socket =
                        boost::make_shared<socket_t>(io_service_);

                    // The handler keeps both the listener and the new socket alive.
                    auto self(shared_from_this());
                    tcp_connection_ptr new_connection(socket);
                    acceptor_.async_accept(*socket, [self, new_connection](const boost::system::error_code& ec) {
                        self->handle_accept(new_connection, ec);
                    });

                }

            private:
                // Accept-completion handler: re-arms the accept chain and, on
                // success, runs the user callback wrapped in task_wrapped so that
                // exceptions it throws are logged rather than propagated.
                void handle_accept(
                    const tcp_connection_ptr& new_connection,
                    const boost::system::error_code& error
                ) {
                    // Keep the chain going: wait for the next connection.
                    push_task();

                    if(error) {
                        // operation_aborted is the expected result of stop()
                        // closing the acceptor; it is not worth reporting.
                        if(error != boost::asio::error::operation_aborted) {
                            std::cerr << error << '\n';
                        }
                        return;
                    }

                    // async_accept succeeded: hand the connection to the callback.
                    auto self(shared_from_this());
                    make_task_wrapped([self, new_connection](){
                        self->func_(new_connection);
                    })
                    ();
                }


        };
    }; // namespace detail
    
    
    namespace tp_network {
        class tasks_processor: public tp_timers::tasks_processor {
            using listeners_map_t = std::map<
                unsigned short, 
                boost::shared_ptr<detail::tcp_listener>
            >;
            listeners_map_t listeners;
    
            public:
                // 获取全局单例的get方法
                static tasks_processor& get();
    
                template <class Functor>
                void add_listener(unsigned short port_num, const Functor& f) {
                    auto it = listeners.find(port_num);
                    // 这个端口在listeners map中已经存在,不需要重复添加
                    if(it != listeners.end()) {
                        throw std::logic_error(
                            "Such listener for port '" +
                            boost::lexical_cast<std::string>(port_num) +
                            "' already created"
                        );
                    }
                    listeners[port_num] = boost::make_shared<detail::tcp_listener>(boost::ref(ios_), port_num, f);
                    listeners[port_num]->push_task(); // 开始接收连接,之后一直接收
                }
    
                void remove_listener(unsigned short port_num) {
                    auto it = listeners.find(port_num);
                    // 没找着,没法移除
                    if(it == listeners.end()) {
                        throw std::logic_error(
                            "No listener for port '" +
                            boost::lexical_cast<std::string>(port_num) +
                            "' created"
                        );
                    } 
                    (*it).second->stop();
                    listeners.erase(it);
                }
                
                tcp_connection_ptr create_connection(const char* addr, unsigned short port_num) {
                    return tcp_connection_ptr(ios_, boost::asio::ip::tcp::endpoint(
                        boost::asio::ip::address_v4::from_string(addr),
                        port_num
                    ));
                }
        };
    
        // Returns the single tp_network::tasks_processor instance, created on
        // first use. Declared inline because this definition lives in a header:
        // without it, including the header from more than one translation unit
        // produces multiple definitions of the function.
        inline tasks_processor& tasks_processor::get() {
            static tasks_processor proc;
            return proc;
        }
    }; // namespace tp_network
    #endif
    

    程序正常运行时不应该有任何输出。

    相关文章

      网友评论

          本文标题:使用tasks_processor管理网络请求

          本文链接:https://www.haomeiwen.com/subject/dftklltx.html