这个框架是作者在boost::asio库的基础上做的二次开发,设计得相当巧妙。
服务器和客户端都通过tasks_processor来启动和调度。如果再让多个线程同时调用io_service::run(),服务器和客户端启动的task就可以在多线程环境中执行。
加上timer_task这种调度任务后,可以使用boost::asio::deadline_timer来设定哪种任务先执行,哪种任务后执行,延迟到什么时候。
如果是标准的实时通信场景,客户端和服务器都应该各自写成一个类似下面这样的类:
class tcp_client: public boost::enable_shared_from_this<tcp_client>
这样才便于持续操作。参考authorizer服务器的写法就可以。
当然认证服务器,写到作者这个样子就可以,毕竟只是一个demo,而且也不用在内存中保存历史认证信息,认证一次就够了。
没时间画类图了,直接上代码,
代码结构,
图片.png
CMakeLists.txt
# Oldest CMake version this script declares support for.
cmake_minimum_required(VERSION 2.6)
project(lexical_cast)
# Add the C++14 language-standard flag to the compile commands.
add_definitions(-std=c++14)
# Extra header and library search paths (presumably where Boost is installed -- confirm for your machine).
include_directories("/usr/local/include")
link_directories("/usr/local/lib")
# Collect every .cpp file that sits directly in this directory.
file( GLOB APP_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
# Build one executable per collected source file.
foreach( sourcefile ${APP_SOURCES} )
# Express the source path relative to this directory.
file(RELATIVE_PATH filename ${CMAKE_CURRENT_SOURCE_DIR} ${sourcefile})
# Remove ".cpp" from the relative name; the result is used as the target name.
string(REPLACE ".cpp" "" file ${filename})
add_executable(${file} ${sourcefile})
# Link the Boost components and pthread into the target.
target_link_libraries(${file} boost_filesystem boost_thread boost_system boost_serialization pthread boost_chrono)
endforeach( sourcefile ${APP_SOURCES} )
main.cpp
#include "utils/tasks_processor_network.hpp"
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
using namespace tp_network;
// Forward declaration, so the alias below can name the class before it is defined.
class authorizer;
// File-scope alias for a shared pointer to authorizer.
using authorizer_ptr = boost::shared_ptr<authorizer>;
// Server-side handler for one authorization exchange: reads a request from an
// accepted connection, answers with "OK", then shuts the connection down.
// Every instance is owned by a shared_ptr that is captured by the pending
// asynchronous handler, so the object lives as long as an operation is in flight.
class authorizer: public boost::enable_shared_from_this<authorizer> {
    tcp_connection_ptr connection_;
    // Scratch buffer: receives the request and is then reused for the reply.
    boost::array<char, 512> message_;

    // Private: instances are created only by on_connection_accept(), which
    // immediately places them under shared_ptr ownership.
    explicit authorizer(const tcp_connection_ptr& connection):
        connection_(connection) {
    }

public:
    // Accept callback. Creates an authorizer for `connection` and starts an
    // asynchronous read that completes once at least one byte has arrived.
    static void on_connection_accept(const tcp_connection_ptr& connection) {
        authorizer_ptr auth(new authorizer(connection));
        auth->connection_.async_read(
            boost::asio::buffer(auth->message_),
            [auth](const boost::system::error_code& ec, std::size_t bytes_transferred) {
                auth->on_data_received(ec, bytes_transferred);
            },
            1
        );
    }

    // Read-completion handler.
    //   ec                - result of the read
    //   bytes_transferred - number of bytes placed into message_
    // On success, writes the two-byte reply "OK" back to the client.
    void on_data_received(const boost::system::error_code& ec, std::size_t bytes_transferred) {
        if(ec) {
            std::cerr << "authorizer.on_data_received: error during receiving response: " << ec << '\n';
            assert(false);
            // Reached only when assert is compiled out (NDEBUG): do not reply
            // on a connection whose read failed.
            return;
        }
        if(bytes_transferred == 0) {
            std::cerr << "authorizer.on_data_received: zero bytes received\n";
            assert(false);
            // Same as above: nothing was received, so there is nothing to answer.
            return;
        }

        // message_ now holds the request; real credential checks would go here.
        // ...

        // Reply "OK" to the client, reusing the receive buffer.
        message_[0] = 'O';
        message_[1] = 'K';
        const std::size_t bytes_to_send = 2;

        // Keep this object alive until the write handler has run.
        auto self(shared_from_this());
        connection_.async_write(
            boost::asio::buffer(message_, bytes_to_send),
            [self](const boost::system::error_code& write_ec, std::size_t /*bytes_written*/) {
                self->on_data_send(write_ec);
            }
        );
    }

    // Write-completion handler: called after the reply has been sent.
    void on_data_send(const boost::system::error_code& ec) {
        if(ec) {
            std::cerr << "authorizer.on_data_send: error during send response: " << ec << '\n';
            assert(false);
            // Reached only under NDEBUG: skip the explicit shutdown of a
            // connection whose write already failed.
            return;
        }
        connection_.shutdown();
    }
};
// Global flag recording whether the authorization succeeded.
bool g_authed = false;
// Client-side read-completion handler: validates the server's reply.
//   ec                - result of the read (EOF is tolerated)
//   bytes_transferred - number of reply bytes stored at the start of *data
//   connection        - client connection the reply arrived on
//   data              - buffer holding the reply
// On a valid "OK" reply, sets g_authed, shuts the connection down and stops
// the tasks processor. On any failure, logs and asserts; if asserts are
// compiled out, the processor is still stopped but g_authed stays false.
void finish_socket_auth_task(
    const boost::system::error_code& ec,
    std::size_t bytes_transferred,
    const tcp_connection_ptr& connection,
    const boost::shared_ptr<std::string>& data
) {
    bool reply_ok = false;
    if(ec && ec != boost::asio::error::eof) {
        std::cerr << "finish_socket_auth_task: Client error on receive: " << ec.message() << '\n';
    } else if(bytes_transferred != 2) {
        std::cerr << "finish_socket_auth_task: wrong bytes count\n";
    } else {
        // Trim the buffer to the bytes actually received before comparing.
        data->resize(bytes_transferred);
        if(*data != "OK") {
            std::cerr << "finish_socket_auth_task: wrong response " << *data << '\n';
        } else {
            reply_ok = true;
        }
    }

    if(!reply_ok) {
        assert(false);
        // Reached only under NDEBUG: never report success for a failed
        // exchange, but stop the processor so that start() can return.
        tasks_processor::get().stop();
        return;
    }

    g_authed = true;
    // Close both directions of the TCP connection.
    connection.shutdown();
    tasks_processor::get().stop();
}
// Client-side write-completion handler: once the request has been sent,
// starts an asynchronous read of the server's reply.
//   ec         - result of the preceding write
//   connection - client connection to read from
//   data       - buffer reused for the reply; its current size bounds the read
void receive_auth_task(const boost::system::error_code& ec, const tcp_connection_ptr& connection,
                       const boost::shared_ptr<std::string>& data) {
    if(ec) {
        std::cerr << "receive_auth_task: client error on receive: " << ec.message() << '\n';
        assert(false);
        // Reached only under NDEBUG: do not read from a connection whose
        // write failed; stop the processor so that start() can return.
        tasks_processor::get().stop();
        return;
    }
    // Non-const copy: async_read is a non-const member function.
    tcp_connection_ptr soc(connection);
    soc.async_read(
        boost::asio::buffer(&(*data)[0], data->size()),
        // The handler keeps both the connection and the buffer alive.
        [soc, data](const boost::system::error_code& read_ec, std::size_t bytes_transferred) {
            finish_socket_auth_task(read_ec, bytes_transferred, soc, data);
        },
        1 // complete as soon as at least one byte has arrived
    );
}
const unsigned short port_num = 65001;
// 客户端发送auth的函数
void send_auth_task() {
tcp_connection_ptr soc = tasks_processor::get().create_connection("127.0.0.1", port_num);
boost::shared_ptr<std::string> data = boost::make_shared<std::string>("authname");
soc.async_write(
boost::asio::buffer(*data),
[soc, data](const boost::system::error_code& ec, std::size_t bytes_transferred) {
receive_auth_task(ec, soc, data);
}
);
}
int main(int argc, char* argv[]) {
// 一秒钟之后发起请求,因为要等服务器先启动起来
tasks_processor::get().run_after(boost::posix_time::seconds(1), &send_auth_task);
// 在65001的TCP V4端口,启动auth服务器
tasks_processor::get().add_listener(port_num, &authorizer::on_connection_accept);
// 没有开始跑,
assert(!g_authed);
tasks_processor::get().start();
assert(g_authed);
return 0;
}
utils/tasks_processor_base.hpp
#ifndef _CHAPTOR06_06_TASKS_PROCESSOR_BASE_HPP_
#define _CHAPTOR06_06_TASKS_PROCESSOR_BASE_HPP_
#include <boost/noncopyable.hpp>
#include <boost/thread/thread.hpp>
#include <boost/asio/io_service.hpp>
#include <iostream>
namespace detail {

// Callable adapter that runs a stored task and keeps any exception it throws
// from escaping: the exception is reported on std::cerr instead.
template <class T>
struct task_wrapped {
private:
    T task_;

    // Passes through an interruption point before the task starts and
    // swallows the thread_interrupted exception if one is raised there.
    static void discard_pending_interruption() {
        try {
            boost::this_thread::interruption_point();
        } catch(const boost::thread_interrupted&) {
            // Intentionally ignored.
        }
    }

public:
    explicit task_wrapped(const T& task_unwrapped)
        : task_(task_unwrapped)
    {}

    // Runs the stored task; nothing is propagated to the caller.
    void operator()() const {
        discard_pending_interruption();

        try {
            task_();
        } catch(const std::exception& ex) {
            std::cerr << "Exception: " << ex.what() << '\n';
        } catch(const boost::thread_interrupted&) {
            std::cerr << "Thread interrupted\n";
        } catch(...) {
            std::cerr << "Unknown exception\n";
        }
    }
};

// Deduces T and returns the task wrapped in a task_wrapped<T>.
template <class T>
inline task_wrapped<T> make_task_wrapped(const T& task_unwrapped) {
    using wrapper_t = task_wrapped<T>;
    return wrapper_t(task_unwrapped);
}

} // namespace detail
namespace tp_base {
class tasks_processor: private boost::noncopyable {
protected:
boost::asio::io_service ios_;
boost::asio::io_service::work work_;
tasks_processor():
ios_(),
work_(ios_)
{}
public:
static tasks_processor& get();
template <class T>
inline void push_task(const T& task_unwrapped) {
ios_.post(detail::make_task_wrapped(task_unwrapped));
}
void start() {
ios_.run();
}
void stop() {
ios_.stop();
}
};
};
#endif
utils/tasks_processor_timer.hpp
#ifndef _CHAPTOR06_06_TASKS_PROCESSOR_TIMER_HPP_
#define _CHAPTOR06_06_TASKS_PROCESSOR_TIMER_HPP_
// 给基础的tasks_processor_base加上timer功能
#include "tasks_processor_base.hpp"
#include <boost/asio/io_service.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/system/error_code.hpp>
#include <boost/make_shared.hpp>
#include <iostream>
namespace detail {

using duration_type_t = boost::asio::deadline_timer::duration_type;

// A wrapped task paired with a deadline_timer. The timer is held through a
// shared_ptr, so copies of the task (such as the one handed to async_wait)
// share the same timer.
template <class Functor>
struct timer_task: public task_wrapped<Functor> {
private:
    using wrapped_base_t = task_wrapped<Functor>;
    boost::shared_ptr<boost::asio::deadline_timer> deadline_;

public:
    // duration_or_time is passed straight to the deadline_timer constructor.
    template <class Time>
    explicit timer_task(boost::asio::io_service& ios,
                        const Time& duration_or_time,
                        const Functor& task_unwrapped)
        : wrapped_base_t(task_unwrapped)
        , deadline_(boost::make_shared<boost::asio::deadline_timer>(ios, duration_or_time))
    {}

    // Registers a copy of this task as the timer's wait handler.
    void push_task() const {
        deadline_->async_wait(*this);
    }

    // Timer completion handler: runs the task, or reports the error code.
    void operator()(const boost::system::error_code& error) const {
        if(error) {
            std::cerr << error << '\n';
            return;
        }
        wrapped_base_t::operator()();
    }
};

// Deduces Functor and builds the matching timer_task.
template <class Time, class Functor>
inline timer_task<Functor> make_timer_task(
    boost::asio::io_service& ios,
    const Time& duration_or_time,
    const Functor& task_unwrapped
) {
    using task_t = timer_task<Functor>;
    return task_t(ios, duration_or_time, task_unwrapped);
}

} // namespace detail
namespace tp_timers {

// Extends the base tasks processor with timer-driven task scheduling.
class tasks_processor: public tp_base::tasks_processor {
public:
    // Accessor for the processor instance (declared here only).
    static tasks_processor& get();

    using duration_type_t = boost::asio::deadline_timer::duration_type;

    // Schedules f to run once `duration` has elapsed.
    template <class Functor>
    void run_after(duration_type_t duration, const Functor& f) {
        const detail::timer_task<Functor> delayed(ios_, duration, f);
        delayed.push_task();
    }

    using time_type_t = boost::asio::deadline_timer::time_type;

    // Schedules f to run at the absolute time `time`.
    template <class Functor>
    void run_at(time_type_t time, const Functor& f) {
        const detail::timer_task<Functor> scheduled(ios_, time, f);
        scheduled.push_task();
    }
};

} // namespace tp_timers
#endif
utils/tasks_processor_network.hpp
#ifndef _CHAPTOR06_06_TASKS_PROCESSOR_NETWORK_HPP_
#define _CHAPTOR06_06_TASKS_PROCESSOR_NETWORK_HPP_
// 给task_processor_timer加上网络处理功能
#include "tasks_processor_timer.hpp"
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/placeholders.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/read.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/lexical_cast.hpp>
#include <map>
// 将当前TCP连接的套接字
// 和读取,写入,关闭函数绑定
class tcp_connection_ptr {
boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;
public:
explicit tcp_connection_ptr(boost::shared_ptr<boost::asio::ip::tcp::socket> socket):
socket_(socket)
{}
explicit tcp_connection_ptr(
boost::asio::io_service& ios,
const boost::asio::ip::tcp::endpoint& endpoint
):
socket_(boost::make_shared<boost::asio::ip::tcp::socket>(ios))
{
socket_->connect(endpoint);
}
template <class Functor>
void async_write(
const boost::asio::const_buffers_1& buf, const Functor& f
) {
boost::asio::async_write(*socket_, buf, f);
}
template <class Functor>
void async_write(
const boost::asio::mutable_buffers_1& buf, const Functor& f
) {
boost::asio::async_write(*socket_, buf, f);
}
template <class Functor>
void async_read(
const boost::asio::mutable_buffers_1& buf,
const Functor& f,
std::size_t at_least_bytes
) {
boost::asio::async_read(
*socket_, buf, boost::asio::transfer_at_least(at_least_bytes), f
);
}
void shutdown() const {
socket_->shutdown(boost::asio::ip::tcp::socket::shutdown_both);
socket_->close();
}
};
namespace detail {

// TCP listener: accepts connections on one port in a loop and passes each
// accepted connection to a user-supplied callback.
class tcp_listener: public boost::enable_shared_from_this<tcp_listener> {
    using acceptor_t = boost::asio::ip::tcp::acceptor;

    acceptor_t acceptor_;
    boost::asio::io_service& io_service_;
    boost::function<void(tcp_connection_ptr)> func_;

public:
    // Opens an IPv4 acceptor on `port`. task_unwrapped is called with every
    // accepted connection.
    template <class Functor>
    tcp_listener(
        boost::asio::io_service& io_service,
        unsigned short port,
        const Functor& task_unwrapped
    ):
        // Listed in declaration order, which is the order in which members
        // are actually initialized.
        acceptor_(io_service, boost::asio::ip::tcp::endpoint(
            boost::asio::ip::tcp::v4(), port
        )),
        io_service_(io_service),
        func_(task_unwrapped)
    {}

    // Closes the acceptor; push_task() becomes a no-op afterwards.
    void stop() {
        acceptor_.close();
    }

    // Starts one asynchronous accept. handle_accept() calls this again, which
    // keeps the accept chain going.
    void push_task() {
        // The acceptor has been closed: no further connections can be accepted.
        if(!acceptor_.is_open()) {
            return;
        }

        using socket_t = boost::asio::ip::tcp::socket;
        boost::shared_ptr<socket_t> socket =
            boost::make_shared<socket_t>(io_service_);

        // The handler holds the listener and the new connection, keeping both
        // alive while the accept is pending.
        auto self(shared_from_this());
        tcp_connection_ptr new_connection(socket);
        acceptor_.async_accept(*socket, [self, new_connection](const boost::system::error_code& ec) {
            self->handle_accept(new_connection, ec);
        });
    }

private:
    // Accept-completion handler.
    void handle_accept(
        const tcp_connection_ptr& new_connection,
        const boost::system::error_code& error
    ) {
        // Continue the accept chain first, so the next connection can be accepted.
        push_task();

        if(!error) {
            // The accept succeeded: run the callback through task_wrapped so
            // that an exception thrown by it is reported, not propagated.
            auto self(shared_from_this());
            make_task_wrapped([self, new_connection](){
                self->func_(new_connection);
            })
            ();
        } else {
            std::cerr << error << '\n';
        }
    }
};

} // namespace detail
namespace tp_network {
class tasks_processor: public tp_timers::tasks_processor {
using listeners_map_t = std::map<
unsigned short,
boost::shared_ptr<detail::tcp_listener>
>;
listeners_map_t listeners;
public:
// 获取全局单例的get方法
static tasks_processor& get();
template <class Functor>
void add_listener(unsigned short port_num, const Functor& f) {
auto it = listeners.find(port_num);
// 这个端口在listeners map中已经存在,不需要重复添加
if(it != listeners.end()) {
throw std::logic_error(
"Such listener for port '" +
boost::lexical_cast<std::string>(port_num) +
"' already created"
);
}
listeners[port_num] = boost::make_shared<detail::tcp_listener>(boost::ref(ios_), port_num, f);
listeners[port_num]->push_task(); // 开始接收连接,之后一直接收
}
void remove_listener(unsigned short port_num) {
auto it = listeners.find(port_num);
// 没找着,没法移除
if(it == listeners.end()) {
throw std::logic_error(
"No listener for port '" +
boost::lexical_cast<std::string>(port_num) +
"' created"
);
}
(*it).second->stop();
listeners.erase(it);
}
tcp_connection_ptr create_connection(const char* addr, unsigned short port_num) {
return tcp_connection_ptr(ios_, boost::asio::ip::tcp::endpoint(
boost::asio::ip::address_v4::from_string(addr),
port_num
));
}
};
tasks_processor& tasks_processor::get() {
static tasks_processor proc;
return proc;
}
}; // namespace tp_network
#endif
程序运行后如果没有任何输出,就说明一切正常。
网友评论