前言
Pistache 是面向C++的Web 服务器,阅读源码可以很好的理解如何实现一个高并发的服务器,对TCP、HTTP等的理解也会更加深入。
本系列主要从两个方面全面的解析Pistache的源码
- Server的启动过程
- 请求到达时的处理过程
因为C++是面向对象的设计,设计的很多的Class,因此我的方法就是,首先按执行流程,解析上述两个过程都进行了那些操作,然后针对期间使用的类进行详细的解释。
一、Server的初始化
我们以官方的的实例代码为例:
int main(int argc, char* argv[])
{
Port port(9080); // 端口
int thr = 2; // worker线程数
Address addr(Ipv4::any(), port);
auto server = std::make_shared<Http::Endpoint>(addr);
auto opts = Http::Endpoint::options().threads(thr);
server->init(opts);
server->setHandler(Http::make_handler<MyHandler>());
server->serve();
}
可见启动一个服务器端的代码还是很少的,主要的事情包括:
- 使用Any地址和指定端口,定义一个Endpoint对象
- 初始化Endpoint
- 配置Handler,这是定义用户处理函数的入口
- 最后启动Endpoint对象
我们一步一步来看:
1.1 定义Endpoint对象
Endpoint 类的详细解析,见 《 Pistache源码分析 —— Endpoint类 》
可以看到,定义Endpoint对象,最主要的工作就是,初始化listener字段,关于Listener类,详见《 Pistache源码分析 —— Listener类 》。
初始化listener的结构函数中,主要是初始化addr_和transportFactory_两个字段。transportFactory_是一个函数,看名字就知道,他负责创建transport对象,而Transport类就是用于工作线程的处理函数,他主要负责当监听到用户请求时,调用Request类进行解析,然后交给用户处理程序处理,最后调用Respone类的功能将数据写回,详见《 Pistache源码分析 —— Transport类 》。
Endpoint::Endpoint(const Address& addr)
: listener(addr)
{ }
Listener::Listener(const Address& address)
: addr_(address)
, transportFactory_(defaultTransportFactory())
{ }
using TransportFactory = std::function<std::shared_ptr<Transport>()>;
Listener::TransportFactory Listener::defaultTransportFactory() const
{
return [&] {
if (!handler_)
throw std::runtime_error("setHandler() has not been called");
return std::make_shared<Transport>(handler_);
};
}
1.2 初始化Endpoint
-
初始化了Endpoint::option
各个字段代表的含义可在相关Endpoint类的解析中查询
Endpoint::Options::Options()
: threads_(1)
, flags_()
, backlog_(Const::MaxBacklog)
, maxRequestSize_(Const::DefaultMaxRequestSize)
, maxResponseSize_(Const::DefaultMaxResponseSize)
, headerTimeout_(Const::DefaultHeaderTimeout)
, bodyTimeout_(Const::DefaultBodyTimeout)
, logger_(PISTACHE_NULL_STRING_LOGGER)
{ }
初始化的代码,也是很短,主要还是初始化各个字段:
void Endpoint::init(const Endpoint::Options& options)
{
listener.init(options.threads_, options.flags_, options.threadsName_);
listener.setTransportFactory([this, options] {
if (!handler_)
throw std::runtime_error("Must call setHandler()");
auto transport = std::make_shared<TransportImpl>(handler_);
transport->setHeaderTimeout(options.headerTimeout_);
transport->setBodyTimeout(options.bodyTimeout_);
return transport;
});
options_ = options;
logger_ = options.logger_;
}
-
初始化listener
在前面的构造函数中,仅仅给addr_和transportFactory_赋值,在这里对listener的各个字段进行初始化
void Listener::init(size_t workers, Flags<Options> options,
const std::string& workersName, int backlog,
PISTACHE_STRING_LOGGER_T logger)
{
if (workers > hardware_concurrency())
{
// Log::warning() << "More workers than available cores"
}
options_ = options;
backlog_ = backlog;
useSSL_ = false;
workers_ = workers;
workersName_ = workersName;
logger_ = logger;
}
-
重置transportFactory_
原来在构造函数中,我们用Transport类构造了transportFactory_,在这里使用了TransportImpl,TransportImpl是Transport子类,添加了HTTP连接的超时机制,即在setTransportFactory()中的setHeaderTimeout和setBodyTimeout,TransportImpl实现了如果在超时时间内没有新的数据到达,那么将断开连接。详细见对于Transport类的解析。 -
配置Endpoint::options和logger_
logger_在options的初始化中指定了是PISTACHE_NULL_STRING_LOGGER
,所以是不起作用的。
1.3 配置用户处理类 Handler
用户自定义处理类,继承自Http::Handler,详见《 Pistache源码分析 —— Handler类 》。
这个过程,真的好简单的:
void Endpoint::setHandler(const std::shared_ptr<Handler>& handler)
{
handler_ = handler; //设置endpoint的handler_字段
handler_->setMaxRequestSize(options_.maxRequestSize_); // 设置handler_的最大请求字节数
handler_->setMaxResponseSize(options_.maxResponseSize_);// 设置handler_的最大响应字节数
}
1.4 开始运行
有两个运行接口,分别调用了Tcp::Listener::run
和Tcp::Listener::runThreaded
,调用接口使用了serveImpl进行封装。
void Endpoint::serve() { serveImpl(&Tcp::Listener::run); }
void Endpoint::serveThreaded() { serveImpl(&Tcp::Listener::runThreaded); }
先看serveImpl封装函数:
void serveImpl(Method method)
{
#define CALL_MEMBER_FN(obj, pmf) ((obj).*(pmf))
if (!handler_)
throw std::runtime_error("Must call setHandler() prior to serve()");
listener.setHandler(handler_);
listener.bind();
CALL_MEMBER_FN(listener, method)
();
#undef CALL_MEMBER_FN
}
};
主要分为三个步骤
- 检查第三步handler_,并将其赋值给listener
- 执行bind()操作
- 调用method方法
在这里核心的就是bind操作,他的主要任务就是创建server-fd,即执行socket、bind、listen等操作,accept是在epoll收到可读事件后执行的操作,这属于请求处理的部分。
1.4.1 bind
void Listener::bind() { bind(addr_); }
void Listener::bind(const Address& address)
{
addr_ = address;
struct addrinfo hints;
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = address.family();
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
hints.ai_protocol = 0;
const auto& host = addr_.host();
const auto& port = addr_.port().toString();
AddrInfo addr_info;
TRY(addr_info.invoke(host.c_str(), port.c_str(), &hints));
int fd = -1;
const addrinfo* addr = nullptr;
for (addr = addr_info.get_info_ptr(); addr; addr = addr->ai_next)
{
auto socktype = addr->ai_socktype;
if (options_.hasFlag(Options::CloseOnExec))
socktype |= SOCK_CLOEXEC;
fd = ::socket(addr->ai_family, socktype, addr->ai_protocol);
if (fd < 0)
continue;
setSocketOptions(fd, options_);
if (::bind(fd, addr->ai_addr, addr->ai_addrlen) < 0)
{
close(fd);
continue;
}
TRY(::listen(fd, backlog_));
break;
}
// At this point, it is still possible that we couldn't bind any socket. If it
// is the case, the previous loop would have exited naturally and addr will be
// null.
if (addr == nullptr)
{
throw std::runtime_error(strerror(errno));
}
make_non_blocking(fd);
poller.addFd(fd, Flags<Polling::NotifyOn>(Polling::NotifyOn::Read),
Polling::Tag(fd));
listen_fd = fd;
auto transport = transportFactory_();
reactor_.init(Aio::AsyncContext(workers_, workersName_));
transportKey = reactor_.addHandler(transport);
}
- 创建 server-fd(或者叫listen-fd)
代码的1-49行,就是非常标准的基于TCP、IPV4的网络编程的流程,首先使用getaddrinfo(),将Ipv4::Any转化为addrinfo结构,然后基于addrinfo的数据执行socket()、bing()、listen()等一系列操作。对于这一部分,可以参考我翻译的一系列手册结合其他的博客来补充知识:getaddrinfo(3)、getnameinfo(3)、socket(7)、socket(2)、bind(2)、listen(2)、accept(2)等。 -
make_non_blocking(fd);
这一步骤是将server-fd设置为非阻塞,这样在执行accept时,如果当前没有连接到达,那么不会进行阻塞操作,而返回EAGAIN或EWOULDBLOCK错误,其实在我们使用epoll机制之后,阻塞的情况一般是不会发生的,但是也不一定,可以参见accept(2)的NOTES部分。 -
poller.addFd(fd, Flags<Polling::NotifyOn>(Polling::NotifyOn::Read),Polling::Tag(fd));
将server-fd放到epoll中,并监听其Read事件,对于被listen(2)过得fd而言,读事件表示收到了连接请求。注意poller是Listener的成员变量 listen_fd = fd;
-
auto transport = transportFactory_();
reactor_.init(Aio::AsyncContext(workers_, workersName_));
transportKey = reactor_.addHandler(transport);
这三行是极其重要的代码,涉及的操作非常复杂,我们将在Transport类和Reactor类中展开解析.
transportFactory_()是我们在前面赋值的函数类型的成员变量:
listener.setTransportFactory([this, options] {
if (!handler_)
throw std::runtime_error("Must call setHandler()");
auto transport = std::make_shared<TransportImpl>(handler_);
transport->setHeaderTimeout(options.headerTimeout_);
transport->setBodyTimeout(options.bodyTimeout_);
return transport;
});
其核心作用就是构造一个Transport对象。
reactor_是Reactor的对象,负责管理工作线程,因此在初始化reactor_时,以工作线程个数作为参数。而我们前面提到:“Transport类就是用于工作线程的处理函数,他主要负责当监听到用户请求时,调用Request类进行解析,然后交给用户处理程序处理,最后调用Respone类的功能将数据写回”。因此在这里我们将处理函数transport赋值给了工作线程管理类Reactor。
在执行reactor_.addHandler是返回了一个key值,并复制给了Listener的成员变量transportKey,这是因为在设计上,Reactor可以拥有多个处理函数,因此在设计上这个key值就是处理函数的索引,但是在实现上,目前来说,只有一个处理函数。
需要注意,到目前,一共出现了两个处理函数(更精准的叫法应该是处理类),一个是用户自定义的HTTP::Handler,这个主要是用于处理用户请求的;另一个则是Transport类,他本身是作为Reactor类处理函数出现的,因为线程的最终目的还是要处理用户请求,实际上就是通过Transport类来调用HTTP::Handler的接口来实现的,而且Transport类的父类是Pistache::AIO::Handler,HTTP::Handler的父类是Pistache::TCP::Handler,的确就是有两个handler.反正自己好好理解吧,实在不行请参考:Transport类和Reactor类
1.4.2 run
之前提到有两个执行接口:
void Endpoint::serve() { serveImpl(&Tcp::Listener::run); }
void Endpoint::serveThreaded() { serveImpl(&Tcp::Listener::runThreaded); }
主要对应了Listener的两个Run方法 run 和 runThreaded:
// runThreaded区别于run的地方在于,runThreaded会启动一个新的线程来执行run()
// 这样主线程就会从runThreaded返回
void Listener::runThreaded()
{
shutdownFd.bind(poller);
acceptThread = std::thread([=]() { this->run(); });
}
两者的区别就在于是否需要启动一个新的线程在执行epoll.wait
因此我们主要来看run()的执行过程:
void Listener::run()
{
// shutdownFd使用的是Linux的eventfd机制
// 使用epoll监控shutdownFd,当shutdownFd被写入数据(即执行shutdownFd.notify)后变为可读,
// 从而被poller捕获,从而终止下面的for循环
if (!shutdownFd.isBound())
shutdownFd.bind(poller);
reactor_.run();
for (;;)
{
std::vector<Polling::Event> events;
int ready_fds = poller.poll(events);
if (ready_fds == -1)
{
throw Error::system("Polling");
}
for (const auto& event : events)
{
if (event.tag == shutdownFd.tag())
return;
if (event.flags.hasFlag(Polling::NotifyOn::Read))
{
auto fd = event.tag.value();
if (static_cast<ssize_t>(fd) == listen_fd)
{
try
{
handleNewConnection();
}
catch (SocketError& ex)
{
PISTACHE_LOG_STRING_WARN(logger_, "Socket error: " << ex.what());
}
catch (ServerError& ex)
{
PISTACHE_LOG_STRING_FATAL(logger_, "Server error: " << ex.what());
throw;
}
}
}
}
}
}
-
reactor_.run();
启动工作线程,细节就放到Reactor类中了。 -
int ready_fds = poller.poll(events);
开始监听server-fd,poller中,我们仅仅放入了一个fd,那就是在上一步的bing阶段放入的server-fd,用于监听新的连接到达,关于epoll机制,可以参考我翻译的手册:《linux手册翻译——epoll(7)》。 -
handleNewConnection();
连接到达,开始处理新的连接。
到这里启动过程已经完成,就等新连接到达然后被epoll捕获,进而调用handleNewConnection()
,那如何处理新连接和HTTP请求呢:
二、处理请求
这里需要区别,处理连接和处理请求的区别,处理连接指的是和用户建立TCP的连接 ,即执行accept操作,而处理请求是指,建立连接后,收到HTTP请求的处理过程。
void Listener::handleNewConnection()
{
struct sockaddr_in peer_addr;
int client_fd = acceptConnection(peer_addr);
make_non_blocking(client_fd);
std::shared_ptr<Peer> peer;
peer = Peer::Create(client_fd, Address::fromUnix(&peer_addr));
dispatchPeer(peer);
};
2.1 handleNewConnection()
在调用函数之前,epoll已经捕获到了新连接的到达,handleNewConnection()的主要工作包括:
-
2.1.1 accept() 新连接,返回client-fd和用户的地址信息
主要是调用了acceptConnection()函数,此函数封装了accept4(),详细可以参考我翻译的accept(2)手册 -
2.1.2 设置client-fd为非阻塞
这里必须要设置为非阻塞,因为我们需要使用ET即边缘触发模式来监控client-fd,有关边缘模式以及要设置为非阻塞的讨论见我翻译的epoll(7)手册 -
2.1.3 创建Peer对象
见《Pistache源码分析 —— Peer类》
我们为没一个新连接都创建了一个peer对象,用于保存的信息:client-fd及其地址信息 -
2.1.4 将连接派发给一个woker线程
dispatchPeer(peer);
2.2 dispatchPeer(peer);
void Listener::dispatchPeer(const std::shared_ptr<Peer>& peer)
{
auto handlers = reactor_.handlers(transportKey);
auto idx = peer->fd() % handlers.size();
auto transport = std::static_pointer_cast<Transport>(handlers[idx]);
transport->handleNewPeer(peer);
}
-
2.2.1 获取reactor_在指定key下的所有handlers
key 值是在前面的bind()函数的最后生成的:
void Listener::bind() { bind(addr_); }
void Listener::bind(const Address& address)
{
...
auto transport = transportFactory_();
reactor_.init(Aio::AsyncContext(workers_, workersName_));
transportKey = reactor_.addHandler(transport);
}
我之前说过,一个reactor_在设计上可以拥有多个handler,transportKey值就是reactor_的handler索引,但是实现上仅仅添加了一次,因此key的值是0。
但是reactor_.handlers(transportKey)是返回了多个handler,这是因为每当执行reactor_.addHandler()时,会将handler克隆多份,复制给reactor_管理的worker线程,因此reactor_.handlers(transportKey)其实是返回的是每个线程所对应的handler,他们都是我们传入的transport的克隆体,他们拥有相同的key值(即index=0)。
换句话说,每个reactor_可以拥有多种不同handler,不同的handler拥有自己的key值,即索引值。对于同一种handler,reactor_的每个worker线程都拥有一个改handler对象的克隆。而reactor_.handlers(transportKey)返回的就是指定key(索引)的所有worker线程的handler的集合。
这样没一个handler都唯一对应一个worker线程。
-
2.2.2 选择一个worker线程来处理新连接
方法很简单,通过模取的方式来循环的挑选 -
2.2.3 调用选中的handler来处理新连接
请注意,到目前为止,我们还在主线程上,我们选中了特定线程的handler,也即transport对象,那么怎么让对应线程知道呢?其实方法很多,比如搞一个线程队列,然后把连接丢到指定线程的队列中,线程循环检查队列等,缺点就是需要不断的检查。这时候,我们可以结合eventfd(参考我翻译的linux手册翻译——eventfd(2))和epoll实现高效的事件通知机制。
2.3 transport->handleNewPeer(peer);
void Transport::handleNewPeer(const std::shared_ptr<Tcp::Peer>& peer)
{
auto ctx = context();
const bool isInRightThread = std::this_thread::get_id() == ctx.thread();
if (!isInRightThread)
{
PeerEntry entry(peer);
peersQueue.push(std::move(entry));
}
else
{
handlePeer(peer);
}
int fd = peer->fd();
{
Guard guard(toWriteLock);
toWrite.emplace(fd, std::deque<WriteEntry> {});
}
}
- 检查当前线程是否匹配transport所绑定的线程
显然是不会匹配的,因为当前是在主线程中运行的,与transport绑定的是worker线程,之所以这里如此设计,是因为Pistache在设计上还支持单线程模式,类似与Nodejs,可以参考Netty 系列之 Netty 线程模型一文中的Reactor 单线程模型。但是在实现上,是以多线程模式为准的,即Reactor 多线程模型。 - 若不匹配,则将peer,放入peersQueue中
peersQueue就是我们前面提到的,是一种基于eventfd(2)实现的队列,每当向其push数据,都会向监听poller中发送事件,详细见《Pistache源码分析 ——PollableQueue类》,而这个poller就定义在每个线程的结构中,当收到事件通知后,就会调用处理函数:
void Transport::onReady(const Aio::FdSet& fds)
{
for (const auto& entry : fds)
{
if (entry.getTag() == writesQueue.tag())
{
handleWriteQueue();
}
else if (entry.getTag() == timersQueue.tag())
{
handleTimerQueue();
}
else if (entry.getTag() == peersQueue.tag())
{
handlePeerQueue();
}
else if (entry.getTag() == notifier.tag())
{
handleNotify();
}
将会在Transport::onReady处理事件,可以看到当接受到peersQueue的事件时,就会调用handlePeerQueue()。handlePeerQueue()的处理逻辑如下,即对当前Queue中数据(即待处理的peer),执行handlePeer(data->peer);
void Transport::handlePeerQueue()
{
for (;;)
{
auto data = peersQueue.popSafe();
if (!data)
break;
handlePeer(data->peer);
}
}
- 若匹配,则直接调用handlePeer(data->peer);逻辑见下:
- 最后进行执行与写操作相关的内容
见,《Pistache源码分析 —— 异步写机制》
2.4 handlePeer(data->peer);
void Transport::handlePeer(const std::shared_ptr<Peer>& peer)
{
int fd = peer->fd();
peers.insert(std::make_pair(fd, peer));
peer->associateTransport(this);
handler_->onConnection(peer);
reactor()->registerFd(key(), fd, NotifyOn::Read | NotifyOn::Shutdown,
Polling::Mode::Edge);
}
最重要的两步操作就是:
-
handler_->onConnection(peer);
我们在这里创建了Request对象,并将其保存到peer.data_中:
void Handler::onConnection(const std::shared_ptr<Tcp::Peer>& peer)
{
peer->putData(ParserData, std::make_shared<RequestParser>(maxRequestSize_));
}
Request对象是封装在RequestParser中的,RequestParser提供了解析HTTP请求,以填充request对象的方法,详见Request类和ParserImpl类
- 将client-fd注册到poller中
这样当接受到http请求时,将会被见监听到,这里同peersQueue的处理:
void Transport::onReady(const Aio::FdSet& fds){
...
else if (entry.isReadable())
{
auto tag = entry.getTag();
if (isPeerFd(tag))
{
auto& peer = getPeer(tag);
handleIncoming(peer);
}
...
}
可以看到,最终请求由handleIncoming(peer);
处理。
2.5 handleIncoming(peer);
此函数的主要作用就是从client-fd中获取数据,在Request类和ParserImpl类中我们提到,数据可能需要多次读取,因为我并不知道TCP传入的请求有多大,因此buffer
的大小是保守的,所以可能需要多次的读取,这也是epoll的ET模式的标准方式。这就需要我们的解析HTTP请求的方法支持多次读取获取数据,当获取到数据后,将执行 handler_->onInput(buffer, bytes, peer);
void Transport::handleIncoming(const std::shared_ptr<Peer>& peer)
{
char buffer[Const::MaxBuffer] = { 0 };
int fd = peer->fd();
for (;;)
{
ssize_t bytes;
bytes = recv(fd, buffer, Const::MaxBuffer, 0);
if (bytes == -1)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{
}
else
{
handlePeerDisconnection(peer);
}
break;
}
else if (bytes == 0)
{
handlePeerDisconnection(peer);
break;
}
else
{
handler_->onInput(buffer, bytes, peer);
}
}
}
2.6 handler_->onInput(buffer, bytes, peer);
此函数,将用于解析HTTP请求,以填充request对象。
void Handler::onInput(const char* buffer, size_t len,
const std::shared_ptr<Tcp::Peer>& peer)
{
auto parser = getParser(peer);
auto& request = parser->request;
try
{
if (!parser->feed(buffer, len))
{
parser->reset();
throw HttpError(Code::Request_Entity_Too_Large,
"Request exceeded maximum buffer size");
}
auto state = parser->parse();
if (state == Private::State::Done)
{
ResponseWriter response(request.version(), transport(), this, peer);
request.copyAddress(peer->address());
auto connection = request.headers().tryGet<Header::Connection>();
if (connection)
{
response.headers().add<Header::Connection>(connection->control());
}
else
{
response.headers().add<Header::Connection>(ConnectionControl::Close);
}
onRequest(request, std::move(response));
parser->reset();
}
}
catch (const HttpError& err)
{
ResponseWriter response(request.version(), transport(), this, peer);
response.send(static_cast<Code>(err.code()), err.reason());
parser->reset();
}
catch (const std::exception& e)
{
ResponseWriter response(request.version(), transport(), this, peer);
response.send(Code::Internal_Server_Error, e.what());
parser->reset();
}
}
主要的流程无非就是:
parser->feed(buffer, len)
auto state = parser->parse();
相关函数我们在ParserImpl类中有详细的描述。当state == Private::State::Done
为真时,表示数据完成。
此时:
- 初始化response对象,详细见ResponseWriter类
request.copyAddress(peer->address());
- 检查是否Http请求中是否设置了Connection: keep-alive
也就是是否使用长连接,如果是的话,那么在返回的响应中的头部也要写入此Header - 最后,调用
onRequest(request, std::move(response));
然后,就进入了用户的自定义的处理函数,传入的参数包括了:
- http请求信息:request
- 写http响应的接口:respone
至此,从服务器启动,到接受到用户连接再到接受到Http请求的完整过程就结束了。
以至于如何实现的异步写,已经多线程的Rector模型的实现,都在对应的类中详细展开。
完结,撒花~
2.7 后记
当用户的处理逻辑结束之后,onRequest(request, std::move(response))
函数返回。然后将调用parser->reset()
重置request对象。之后工作线程将返回到epoll的等待循环中,等待下一个事件。
其实下一个事件一般就是用户的异步写任务,即完成respone.send的后续,这部分我们将在《Pistache源码分析 —— 异步写机制》中展开。
网友评论