架构
基于boost::asio异步开源组件,实现了一个线程池。
异步服务器代码架构可参考boost源码里的样例async_tcp_echo_server.cpp的实现,如下。
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
class session
: public std::enable_shared_from_this<session>
{
public:
session(tcp::socket socket)
: socket_(std::move(socket))
{
}
void start()
{
do_read();
}
private:
void do_read()
{
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(data_, max_length),
[this, self](boost::system::error_code ec, std::size_t length)
{
if (!ec)
{
do_write(length);
}
});
}
void do_write(std::size_t length)
{
auto self(shared_from_this());
boost::asio::async_write(socket_, boost::asio::buffer(data_, length),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (!ec)
{
do_read();
}
});
}
tcp::socket socket_;
enum { max_length = 1024 };
char data_[max_length];
};
class server
{
public:
server(boost::asio::io_context& io_context, short port)
: acceptor_(io_context, tcp::endpoint(tcp::v4(), port))
{
do_accept();
}
private:
void do_accept()
{
acceptor_.async_accept(
[this](boost::system::error_code ec, tcp::socket socket)
{
if (!ec)
{
std::make_shared<session>(std::move(socket))->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 2)
{
std::cerr << "Usage: async_tcp_echo_server <port>\n";
return 1;
}
boost::asio::io_context io_context;
server s(io_context, std::atoi(argv[1]));
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}
线程池的实现
参考源码样例io_context_pool.cpp
,路径libs\asio\example\cpp03\http\server2
。
io_context_pool构造时传入线程池大小pool_size,创建出pool_size个asio::io_context
对象。在run()
接口中完成pool_size个thread的创建,并完成所有线程的join()
。提供一个get_io_context()
接口,获取一个可以使用的asio::io_context
对象。
整个过程
1、main函数中添加了一个额外的参数指定线程池大小,并在server类中声明了一个线程池类成员io_service_pool io_service_pool_
。线程池类的构造函数接收一个整型参数io_service_pool_size指定线程池大小。
2、在server类中创建connection实例时需要从线程池中获取asio::io_context
对象,这时使用线程池类的io_service_pool_.get_io_context()
获取asio::io_context
对象。
3、根据asio的约定,异步操作由哪个线程执行与其相关io_context对象有关。调用io_service::run()
的线程才能执行相关异步操作。因此要想实现线程池,只需要在线程池对象中创建多个io_service对象,并创建同样多的线程对象,在每个线程中都调用一个io_service对象的run()方法,这样通过在线程池中均匀的获取io_context对象,就可以实现将异步操作均匀的分配给多个线程来执行了。
先看看server类创建connection的代码:
void server::start_accept()
{
new_connection_.reset(new connection(io_service_pool_.get_io_service(), request_handler_));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, this, boost::asio::placeholders::error));
}
connection构造函数需要一个io_service对象,这里是从线程池中获取的,同时也就指定了这个connection类中的异步操作都由线程池中相应的线程来处理.
下面重点分析一下线程池类的定义和实现.io_service_pool从noncopyable继承,不能进行拷贝复制.
io_service_pool定义的数据成员:
std::vector<io_service_ptr> io_services_;//存放io_service对象的容器.
std::vector<work_ptr> work_; //存放工作者对象的容器
std::size_t next_io_service_; //指定下一个将被分配的io_service
io_service_pool定义的函数成员:
1、显式构造函数:
初始化next_io_service_为0,创建指定线程池大小个数的io_service对象和工作者对象,并分别存放在io_service_容器和work_容器中。
for (std::size_t i = 0; i < pool_size; ++i)//创建多个io_service和多个工作者对象
{
io_service_ptr io_service(new boost::asio::io_service);
work_ptr work(new boost::asio::io_service::work(*io_service));
io_services_.push_back(io_service);
work_.push_back(work);
}
2、run函数:
根据指定线程池大小创建多个线程,这些线程的执行函数为相应io_service对象的run方法.并让主线程阻塞等待所有线程执行完毕。
for (std::size_t i = 0; i < io_services_.size(); ++i)
{
boost::shared_ptr<boost::thread> thread(new boost::thread(
boost::bind(&boost::asio::io_service::run, io_services_[i])));
threads.push_back(thread);
}
//主线程一直阻塞等待 直到所有线程结束
for (std::size_t i = 0; i < threads.size(); ++i)
threads[i]->join();
3、stop函数:
调用所有io_service对象的stop方法,停止处理异步通信.
4、get_io_service函数:
这个函数根据next_io_service_数据成员获取在容器中一个io_service对象,并将next_io_service_加1,如果达到容器的最大个数,则置为0.实现循环获取io_service对象的目的。
boost::asio::io_service& io_service_pool::get_io_service()
{
//循环获取io_service对象 异步操作由创建io_service的线程执行,从而实现为线程池中的线程均匀分配任务
boost::asio::io_service& io_service = *io_services_[next_io_service_];
++next_io_service_;
if (next_io_service_ == io_services_.size())
next_io_service_ = 0;
return io_service;
}
最后注意存放在容器中的线程,io_service对象,工作者对象,线程对象都是使用shared_ptr指针保护的,保证了这些对象的自动释放.
这里创建了多个io_service对象,也可以只创建一个io_service对象,多个线程都执行io_service::run()函数,则异步操作可在这里线程中进行随机分配.请看server3范例。
openssl与gmssl共存——隐藏符号表
openssl与gmssl需要共存,但他们两者对外抛出的接口基本上一样,这样会到值接口冲突。
解决方法:
将gmssl编译成静态库,再封装一层gmsslwarp的动态库对外使用,其中gmsslwarp抛出的接口增加gm前缀,以避免和openssl中的接口冲突。那么其中gmssl静态库、封装的gmsslwarp动态库在编译时需要加上一个gcc编译参数-fvisibility=hidden
,以隐藏大部分未使用的接口。而同时在gmsslwarp动态库的实现的函数头文件前需要加上:__attribute__((visibility("default")))
,以保证对外可见。
fvisibility=hidden说明
-fvisibility=[default|internal|hidden|protected]
fvisibility=hidden
将函数名隐藏,需要暴露给用户的函数接口可以单独通过 __attribute__((visibility ("default")))
声明避免被隐藏。
__attribute__((visibility ("default"))) int func1()
{
return 1;
}
断连问题定位
现象
网关出现断连,日志中出现打开文件失败。
分析过程
1、复现
初步分析原因为文件句柄数使用超过了,可能存在某处文件描述符未关闭的资源泄露,使用ulimit –a/c
,查看和修改文件描述符最大限制。测试复现,发现多次连接断开后,总有几个SOCK未关闭。通过ps –eaf | grep -i vncgate
,查的进程。cd /proc/pid/fd
获取进程打开的文件描述符信息。ls –l | wc –l
该命令实时多执行几次,实时查看当前进程正打开的文件描述符个数。
通过lsof命令查询文件描述符命令:
lsof -p pid
其中NODE一列表示了文件描述的inode号
计算个数:
lsof -p pid | wc -l
2、代码跟踪
发现是sock句柄未关,则在代码中将所有相关连的sock句柄创建和销毁的地方增加日志打印,再运行服务,复现问题。打印信息包括:上下行、文件描述符、状态、inode号。最关键是的inode号,识别唯一的资源。使用如下接口从文件描述符获取对于的inode号。Linux的文件描述符FD与Inode
int getinode(int fd)
{
struct stat fileStat;
if (fstat(fd, &fileStat) < 0) {
return -1;
}
return fileStat.st_ino;
}
3、复现问题,分析日志
分析日志发现,确实有几次创建的sock句柄,并未走入关闭流程。
4、解决
分析代码,增加一个超时关闭机制。
5、卡死根本原因
vag关闭链接时只调用了shutdown,该接口是同步阻塞接口,此时需要客户端回馈一个消息(close_notify),而客户端代码里直接调用底层close方法关闭了ssl连接,未在之前调用shutdown方法,故close_notify消息一直不会被vag收到,故那个sock文件描述符一直不会被正常关闭。
解决办法在vag处增加一个超时关闭机制,即直接调用底层close方法关闭。防止因为网络原因,客户端的close_notify消息收不到的情况。
shutdown close区别
shutdown()函数可以选择关闭全双工连接的读通道或者写通道,如果两个通道同时关闭,则这个连接不能再继续通信。
close()函数会同时关闭全双工连接的读写通道,除了关闭连接外,还会释放套接字占用的文件描述符。而shutdown()只会关闭连接,但是不会释放占用的文件描述符。
GET/POST
采用curl实现了一套以REST为标准的GET/POST方法。
post:上报心跳、网关内存、CPU使用情况,连接个数等数据给管理平面。接收对应的应答结果。
网友评论