美文网首页
Pistache源码分析 —— Server的初始化和请求处理

Pistache源码分析 —— Server的初始化和请求处理

作者: 蟹蟹宁 | 来源:发表于2021-06-29 10:02 被阅读0次

前言

Pistache 是面向C++的Web 服务器,阅读源码可以很好的理解如何实现一个高并发的服务器,对TCP、HTTP等的理解也会更加深入。

本系列主要从两个方面全面的解析Pistache的源码

  • Server的启动过程
  • 请求到达时的处理过程

因为C++是面向对象的设计,设计的很多的Class,因此我的方法就是,首先按执行流程,解析上述两个过程都进行了那些操作,然后针对期间使用的类进行详细的解释。

一、Server的初始化

我们以官方的的实例代码为例:

int main(int argc, char* argv[])
{
    Port port(9080);    // 端口
    int thr = 2;            // worker线程数
    Address addr(Ipv4::any(), port);
    auto server = std::make_shared<Http::Endpoint>(addr);
    auto opts = Http::Endpoint::options().threads(thr);
    server->init(opts);
    server->setHandler(Http::make_handler<MyHandler>());
    server->serve();
}

可见启动一个服务器端的代码还是很少的,主要的事情包括:

  • 使用Any地址和指定端口,定义一个Endpoint对象
  • 初始化Endpoint
  • 配置Handler,这是定义用户处理函数的入口
  • 最后启动Endpoint对象

我们一步一步来看:

1.1 定义Endpoint对象

Endpoint 类的详细解析,见 《 Pistache源码分析 —— Endpoint类 》

可以看到,定义Endpoint对象,最主要的工作就是,初始化listener字段,关于Listener类,详见《 Pistache源码分析 —— Listener类 》

初始化listener的结构函数中,主要是初始化addr_和transportFactory_两个字段。transportFactory_是一个函数,看名字就知道,他负责创建transport对象,而Transport类就是用于工作线程的处理函数,他主要负责当监听到用户请求时,调用Request类进行解析,然后交给用户处理程序处理,最后调用Respone类的功能将数据写回,详见《 Pistache源码分析 —— Transport类 》

Endpoint::Endpoint(const Address& addr)
        : listener(addr)
    { }

Listener::Listener(const Address& address)
        : addr_(address)
        , transportFactory_(defaultTransportFactory())
    { }

using TransportFactory = std::function<std::shared_ptr<Transport>()>;

Listener::TransportFactory Listener::defaultTransportFactory() const
{
    return [&] {
        if (!handler_)
            throw std::runtime_error("setHandler() has not been called");

        return std::make_shared<Transport>(handler_);
    };
}

1.2 初始化Endpoint

  • 初始化了Endpoint::option
    各个字段代表的含义可在相关Endpoint类的解析中查询
Endpoint::Options::Options()
    : threads_(1)
    , flags_()
    , backlog_(Const::MaxBacklog)
    , maxRequestSize_(Const::DefaultMaxRequestSize)
    , maxResponseSize_(Const::DefaultMaxResponseSize)
    , headerTimeout_(Const::DefaultHeaderTimeout)
    , bodyTimeout_(Const::DefaultBodyTimeout)
    , logger_(PISTACHE_NULL_STRING_LOGGER)
{ }

初始化的代码,也是很短,主要还是初始化各个字段:

void Endpoint::init(const Endpoint::Options& options)
{
    listener.init(options.threads_, options.flags_, options.threadsName_);
    listener.setTransportFactory([this, options] {
        if (!handler_)
            throw std::runtime_error("Must call setHandler()");
        auto transport = std::make_shared<TransportImpl>(handler_);
        transport->setHeaderTimeout(options.headerTimeout_);
        transport->setBodyTimeout(options.bodyTimeout_);
        return transport;
    });
    options_ = options;
    logger_  = options.logger_;
}
  • 初始化listener
    在前面的构造函数中,仅仅给addr_和transportFactory_赋值,在这里对listener的各个字段进行初始化
void Listener::init(size_t workers, Flags<Options> options,
                    const std::string& workersName, int backlog,
                    PISTACHE_STRING_LOGGER_T logger)
{
    if (workers > hardware_concurrency())
    {
        // Log::warning() << "More workers than available cores"
    }

    options_     = options;
    backlog_     = backlog;
    useSSL_      = false;
    workers_     = workers;
    workersName_ = workersName;
    logger_      = logger;
}
  • 重置transportFactory_
    原来在构造函数中,我们用Transport类构造了transportFactory_,在这里使用了TransportImpl,TransportImpl是Transport子类,添加了HTTP连接的超时机制,即在setTransportFactory()中的setHeaderTimeout和setBodyTimeout,TransportImpl实现了如果在超时时间内没有新的数据到达,那么将断开连接。详细见对于Transport类的解析。
  • 配置Endpoint::options和logger_
    logger_在options的初始化中指定了是PISTACHE_NULL_STRING_LOGGER,所以是不起作用的。

1.3 配置用户处理类 Handler

用户自定义处理类,继承自Http::Handler,详见《 Pistache源码分析 —— Handler类 》
这个过程,真的好简单的:

void Endpoint::setHandler(const std::shared_ptr<Handler>& handler)
    {
        handler_ = handler;  //设置endpoint的handler_字段
        handler_->setMaxRequestSize(options_.maxRequestSize_);  // 设置handler_的最大请求字节数
        handler_->setMaxResponseSize(options_.maxResponseSize_);// 设置handler_的最大响应字节数
    }

1.4 开始运行

有两个运行接口,分别调用了Tcp::Listener::runTcp::Listener::runThreaded,调用接口使用了serveImpl进行封装。

void Endpoint::serve() { serveImpl(&Tcp::Listener::run); }

void Endpoint::serveThreaded() { serveImpl(&Tcp::Listener::runThreaded); }

先看serveImpl封装函数:

        void serveImpl(Method method)
        {
#define CALL_MEMBER_FN(obj, pmf) ((obj).*(pmf))
            if (!handler_)
                throw std::runtime_error("Must call setHandler() prior to serve()");

            listener.setHandler(handler_);
            listener.bind();

            CALL_MEMBER_FN(listener, method)
            ();
#undef CALL_MEMBER_FN
        }
    };

主要分为三个步骤

  • 检查第三步handler_,并将其赋值给listener
  • 执行bind()操作
  • 调用method方法

在这里核心的就是bind操作,他的主要任务就是创建server-fd,即执行socket、bind、listen等操作,accept是在epoll收到可读事件后执行的操作,这属于请求处理的部分。

1.4.1 bind

void Listener::bind() { bind(addr_); }

void Listener::bind(const Address& address)
{
    addr_ = address;
    struct addrinfo hints;
    memset(&hints, 0, sizeof(struct addrinfo));
    hints.ai_family   = address.family();
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags    = AI_PASSIVE;
    hints.ai_protocol = 0;
    const auto& host = addr_.host();
    const auto& port = addr_.port().toString();
    AddrInfo addr_info;
    TRY(addr_info.invoke(host.c_str(), port.c_str(), &hints));

    int fd = -1;

    const addrinfo* addr = nullptr;
    for (addr = addr_info.get_info_ptr(); addr; addr = addr->ai_next)
    {
        auto socktype = addr->ai_socktype;
        if (options_.hasFlag(Options::CloseOnExec))
            socktype |= SOCK_CLOEXEC;

        fd = ::socket(addr->ai_family, socktype, addr->ai_protocol);
        if (fd < 0)
            continue;

        setSocketOptions(fd, options_);

        if (::bind(fd, addr->ai_addr, addr->ai_addrlen) < 0)
        {
            close(fd);
            continue;
        }

        TRY(::listen(fd, backlog_));
        break;
    }

    // At this point, it is still possible that we couldn't bind any socket. If it
    // is the case, the previous loop would have exited naturally and addr will be
    // null.
    if (addr == nullptr)
    {
        throw std::runtime_error(strerror(errno));
    }

    make_non_blocking(fd);
    poller.addFd(fd, Flags<Polling::NotifyOn>(Polling::NotifyOn::Read),
                 Polling::Tag(fd));
    listen_fd = fd;

    auto transport = transportFactory_();

    reactor_.init(Aio::AsyncContext(workers_, workersName_));
    transportKey = reactor_.addHandler(transport);
}
  • 创建 server-fd(或者叫listen-fd)
    代码的1-49行,就是非常标准的基于TCP、IPV4的网络编程的流程,首先使用getaddrinfo(),将Ipv4::Any转化为addrinfo结构,然后基于addrinfo的数据执行socket()、bing()、listen()等一系列操作。对于这一部分,可以参考我翻译的一系列手册结合其他的博客来补充知识:getaddrinfo(3)getnameinfo(3)socket(7)socket(2)bind(2)listen(2)accept(2)等。
  • make_non_blocking(fd);
    这一步骤是将server-fd设置为非阻塞,这样在执行accept时,如果当前没有连接到达,那么不会进行阻塞操作,而返回EAGAIN或EWOULDBLOCK错误,其实在我们使用epoll机制之后,阻塞的情况一般是不会发生的,但是也不一定,可以参见accept(2)的NOTES部分。
  • poller.addFd(fd, Flags<Polling::NotifyOn>(Polling::NotifyOn::Read),Polling::Tag(fd));
    将server-fd放到epoll中,并监听其Read事件,对于被listen(2)过得fd而言,读事件表示收到了连接请求。注意poller是Listener的成员变量
  • listen_fd = fd;
  • auto transport = transportFactory_();
    reactor_.init(Aio::AsyncContext(workers_, workersName_));
    transportKey = reactor_.addHandler(transport);
    这三行是极其重要的代码,涉及的操作非常复杂,我们将在Transport类Reactor类中展开解析.
    transportFactory_()是我们在前面赋值的函数类型的成员变量:
    listener.setTransportFactory([this, options] {
        if (!handler_)
            throw std::runtime_error("Must call setHandler()");
        auto transport = std::make_shared<TransportImpl>(handler_);
        transport->setHeaderTimeout(options.headerTimeout_);
        transport->setBodyTimeout(options.bodyTimeout_);
        return transport;
    });

其核心作用就是构造一个Transport对象。

reactor_是Reactor的对象,负责管理工作线程,因此在初始化reactor_时,以工作线程个数作为参数。而我们前面提到:“Transport类就是用于工作线程的处理函数,他主要负责当监听到用户请求时,调用Request类进行解析,然后交给用户处理程序处理,最后调用Respone类的功能将数据写回”。因此在这里我们将处理函数transport赋值给了工作线程管理类Reactor。

在执行reactor_.addHandler是返回了一个key值,并复制给了Listener的成员变量transportKey,这是因为在设计上,Reactor可以拥有多个处理函数,因此在设计上这个key值就是处理函数的索引,但是在实现上,目前来说,只有一个处理函数。

需要注意,到目前,一共出现了两个处理函数(更精准的叫法应该是处理类),一个是用户自定义的HTTP::Handler,这个主要是用于处理用户请求的;另一个则是Transport类,他本身是作为Reactor类处理函数出现的,因为线程的最终目的还是要处理用户请求,实际上就是通过Transport类来调用HTTP::Handler的接口来实现的,而且Transport类的父类是Pistache::AIO::Handler,HTTP::Handler的父类是Pistache::TCP::Handler,的确就是有两个handler.反正自己好好理解吧,实在不行请参考:Transport类Reactor类

1.4.2 run

之前提到有两个执行接口:
void Endpoint::serve() { serveImpl(&Tcp::Listener::run); }
void Endpoint::serveThreaded() { serveImpl(&Tcp::Listener::runThreaded); }
主要对应了Listener的两个Run方法 run 和 runThreaded:

    // runThreaded区别于run的地方在于,runThreaded会启动一个新的线程来执行run()
    // 这样主线程就会从runThreaded返回
    void Listener::runThreaded()
    {
        shutdownFd.bind(poller);
        acceptThread = std::thread([=]() { this->run(); });
    }

两者的区别就在于是否需要启动一个新的线程在执行epoll.wait

因此我们主要来看run()的执行过程:

    void Listener::run()
    {
        // shutdownFd使用的是Linux的eventfd机制
        // 使用epoll监控shutdownFd,当shutdownFd被写入数据(即执行shutdownFd.notify)后变为可读,
        // 从而被poller捕获,从而终止下面的for循环
        if (!shutdownFd.isBound())
            shutdownFd.bind(poller);
        reactor_.run();

        for (;;)
        {
            std::vector<Polling::Event> events;
            int ready_fds = poller.poll(events);

            if (ready_fds == -1)
            {
                throw Error::system("Polling");
            }
            for (const auto& event : events)
            {
                if (event.tag == shutdownFd.tag())
                    return;

                if (event.flags.hasFlag(Polling::NotifyOn::Read))
                {
                    auto fd = event.tag.value();
                    if (static_cast<ssize_t>(fd) == listen_fd)
                    {
                        try
                        {
                            handleNewConnection();
                        }
                        catch (SocketError& ex)
                        {
                            PISTACHE_LOG_STRING_WARN(logger_, "Socket error: " << ex.what());
                        }
                        catch (ServerError& ex)
                        {
                            PISTACHE_LOG_STRING_FATAL(logger_, "Server error: " << ex.what());
                            throw;
                        }
                    }
                }
            }
        }
    }
  • reactor_.run();
    启动工作线程,细节就放到Reactor类中了。
  • int ready_fds = poller.poll(events);
    开始监听server-fd,poller中,我们仅仅放入了一个fd,那就是在上一步的bing阶段放入的server-fd,用于监听新的连接到达,关于epoll机制,可以参考我翻译的手册:《linux手册翻译——epoll(7)》。
  • handleNewConnection();
    连接到达,开始处理新的连接。

到这里启动过程已经完成,就等新连接到达然后被epoll捕获,进而调用handleNewConnection(),那如何处理新连接和HTTP请求呢:

二、处理请求

这里需要区别,处理连接和处理请求的区别,处理连接指的是和用户建立TCP的连接 ,即执行accept操作,而处理请求是指,建立连接后,收到HTTP请求的处理过程。

void Listener::handleNewConnection()
{
    struct sockaddr_in peer_addr;
    int client_fd = acceptConnection(peer_addr);

    make_non_blocking(client_fd);

    std::shared_ptr<Peer> peer;
    peer = Peer::Create(client_fd, Address::fromUnix(&peer_addr));

    dispatchPeer(peer);
};

2.1 handleNewConnection()

在调用函数之前,epoll已经捕获到了新连接的到达,handleNewConnection()的主要工作包括:

  • 2.1.1 accept() 新连接,返回client-fd和用户的地址信息
    主要是调用了acceptConnection()函数,此函数封装了accept4(),详细可以参考我翻译的accept(2)手册
  • 2.1.2 设置client-fd为非阻塞
    这里必须要设置为非阻塞,因为我们需要使用ET即边缘触发模式来监控client-fd,有关边缘模式以及要设置为非阻塞的讨论见我翻译的epoll(7)手册
  • 2.1.3 创建Peer对象
    见《Pistache源码分析 —— Peer类
    我们为没一个新连接都创建了一个peer对象,用于保存的信息:client-fd及其地址信息
  • 2.1.4 将连接派发给一个woker线程
    dispatchPeer(peer);

2.2 dispatchPeer(peer);

void Listener::dispatchPeer(const std::shared_ptr<Peer>& peer)
{
    auto handlers  = reactor_.handlers(transportKey);
    auto idx       = peer->fd() % handlers.size();
    auto transport = std::static_pointer_cast<Transport>(handlers[idx]);

    transport->handleNewPeer(peer);
}
  • 2.2.1 获取reactor_在指定key下的所有handlers
    key 值是在前面的bind()函数的最后生成的:
void Listener::bind() { bind(addr_); }
void Listener::bind(const Address& address)
{

    ...

    auto transport = transportFactory_();

    reactor_.init(Aio::AsyncContext(workers_, workersName_));
    transportKey = reactor_.addHandler(transport);
}

我之前说过,一个reactor_在设计上可以拥有多个handler,transportKey值就是reactor_的handler索引,但是实现上仅仅添加了一次,因此key的值是0。

但是reactor_.handlers(transportKey)是返回了多个handler,这是因为每当执行reactor_.addHandler()时,会将handler克隆多份,复制给reactor_管理的worker线程,因此reactor_.handlers(transportKey)其实是返回的是每个线程所对应的handler,他们都是我们传入的transport的克隆体,他们拥有相同的key值(即index=0)。

换句话说,每个reactor_可以拥有多种不同handler,不同的handler拥有自己的key值,即索引值。对于同一种handler,reactor_的每个worker线程都拥有一个改handler对象的克隆。而reactor_.handlers(transportKey)返回的就是指定key(索引)的所有worker线程的handler的集合。

这样没一个handler都唯一对应一个worker线程。

  • 2.2.2 选择一个worker线程来处理新连接
    方法很简单,通过模取的方式来循环的挑选
  • 2.2.3 调用选中的handler来处理新连接
    请注意,到目前为止,我们还在主线程上,我们选中了特定线程的handler,也即transport对象,那么怎么让对应线程知道呢?其实方法很多,比如搞一个线程队列,然后把连接丢到指定线程的队列中,线程循环检查队列等,缺点就是需要不断的检查。这时候,我们可以结合eventfd(参考我翻译的linux手册翻译——eventfd(2))和epoll实现高效的事件通知机制。

2.3 transport->handleNewPeer(peer);

    void Transport::handleNewPeer(const std::shared_ptr<Tcp::Peer>& peer)
    {
        auto ctx                   = context();
        const bool isInRightThread = std::this_thread::get_id() == ctx.thread();
        if (!isInRightThread)
        {
            PeerEntry entry(peer);
            peersQueue.push(std::move(entry));
        }
        else
        {
            handlePeer(peer);
        }
        int fd = peer->fd();
        {
            Guard guard(toWriteLock);
            toWrite.emplace(fd, std::deque<WriteEntry> {});
        }
    }
  • 检查当前线程是否匹配transport所绑定的线程
    显然是不会匹配的,因为当前是在主线程中运行的,与transport绑定的是worker线程,之所以这里如此设计,是因为Pistache在设计上还支持单线程模式,类似与Nodejs,可以参考Netty 系列之 Netty 线程模型一文中的Reactor 单线程模型。但是在实现上,是以多线程模式为准的,即Reactor 多线程模型
  • 若不匹配,则将peer,放入peersQueue中
    peersQueue就是我们前面提到的,是一种基于eventfd(2)实现的队列,每当向其push数据,都会向监听poller中发送事件,详细见《Pistache源码分析 ——PollableQueue类》,而这个poller就定义在每个线程的结构中,当收到事件通知后,就会调用处理函数:
    void Transport::onReady(const Aio::FdSet& fds)
    {
        for (const auto& entry : fds)
        {
            if (entry.getTag() == writesQueue.tag())
            {
                handleWriteQueue();
            }
            else if (entry.getTag() == timersQueue.tag())
            {
                handleTimerQueue();
            }
            else if (entry.getTag() == peersQueue.tag())
            {
                handlePeerQueue();
            }
            else if (entry.getTag() == notifier.tag())
            {
                handleNotify();
            }

将会在Transport::onReady处理事件,可以看到当接受到peersQueue的事件时,就会调用handlePeerQueue()。handlePeerQueue()的处理逻辑如下,即对当前Queue中数据(即待处理的peer),执行handlePeer(data->peer);

    void Transport::handlePeerQueue()
    {
        for (;;)
        {
            auto data = peersQueue.popSafe();
            if (!data)
                break;

            handlePeer(data->peer);
        }
    }

2.4 handlePeer(data->peer);

    void Transport::handlePeer(const std::shared_ptr<Peer>& peer)
    {
        int fd = peer->fd();
        peers.insert(std::make_pair(fd, peer));

        peer->associateTransport(this);

        handler_->onConnection(peer);
        reactor()->registerFd(key(), fd, NotifyOn::Read | NotifyOn::Shutdown,
                              Polling::Mode::Edge);
    }

最重要的两步操作就是:

  • handler_->onConnection(peer);
    我们在这里创建了Request对象,并将其保存到peer.data_中:
void Handler::onConnection(const std::shared_ptr<Tcp::Peer>& peer)
    {
        peer->putData(ParserData, std::make_shared<RequestParser>(maxRequestSize_));
    }

Request对象是封装在RequestParser中的,RequestParser提供了解析HTTP请求,以填充request对象的方法,详见Request类ParserImpl类

  • 将client-fd注册到poller中
    这样当接受到http请求时,将会被见监听到,这里同peersQueue的处理:
 void Transport::onReady(const Aio::FdSet& fds){
...
    else if (entry.isReadable())
            {
                auto tag = entry.getTag();
                if (isPeerFd(tag))
                {
                    auto& peer = getPeer(tag);
                    handleIncoming(peer);
                }
...
}

可以看到,最终请求由handleIncoming(peer);处理。

2.5 handleIncoming(peer);

此函数的主要作用就是从client-fd中获取数据,在Request类ParserImpl类中我们提到,数据可能需要多次读取,因为我并不知道TCP传入的请求有多大,因此buffer的大小是保守的,所以可能需要多次的读取,这也是epoll的ET模式的标准方式。这就需要我们的解析HTTP请求的方法支持多次读取获取数据,当获取到数据后,将执行 handler_->onInput(buffer, bytes, peer);

void Transport::handleIncoming(const std::shared_ptr<Peer>& peer)
{
    char buffer[Const::MaxBuffer] = { 0 };
    int fd             = peer->fd();
    for (;;)
    {
        ssize_t bytes;
        bytes = recv(fd, buffer, Const::MaxBuffer, 0);
        if (bytes == -1)
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
            {
                
            }
            else
            {
                handlePeerDisconnection(peer);
            }
            break;
        }
        else if (bytes == 0)
        {
            handlePeerDisconnection(peer);
            break;
        }

        else
        {
            handler_->onInput(buffer, bytes, peer);
        }
    }
}

2.6 handler_->onInput(buffer, bytes, peer);

此函数,将用于解析HTTP请求,以填充request对象。

void Handler::onInput(const char* buffer, size_t len,
                      const std::shared_ptr<Tcp::Peer>& peer)
{
    auto parser   = getParser(peer);
    auto& request = parser->request;
    try
    {
        if (!parser->feed(buffer, len))
        {
            parser->reset();
            throw HttpError(Code::Request_Entity_Too_Large,
                            "Request exceeded maximum buffer size");
        }

        auto state = parser->parse();

        if (state == Private::State::Done)
        {
            ResponseWriter response(request.version(), transport(), this, peer);
            
            request.copyAddress(peer->address());

            auto connection = request.headers().tryGet<Header::Connection>();

            if (connection)
            {
                response.headers().add<Header::Connection>(connection->control());
            }
            else
            {
                response.headers().add<Header::Connection>(ConnectionControl::Close);
            }

            onRequest(request, std::move(response));
            parser->reset();
        }
    }
    catch (const HttpError& err)
    {
        ResponseWriter response(request.version(), transport(), this, peer);
        response.send(static_cast<Code>(err.code()), err.reason());
        parser->reset();
    }

    catch (const std::exception& e)
    {
        ResponseWriter response(request.version(), transport(), this, peer);
        response.send(Code::Internal_Server_Error, e.what());
        parser->reset();
    }
}

主要的流程无非就是:

  • parser->feed(buffer, len)
  • auto state = parser->parse();

相关函数我们在ParserImpl类中有详细的描述。当state == Private::State::Done为真时,表示数据完成。
此时:

  • 初始化response对象,详细见ResponseWriter类
  • request.copyAddress(peer->address());
  • 检查是否Http请求中是否设置了Connection: keep-alive
    也就是是否使用长连接,如果是的话,那么在返回的响应中的头部也要写入此Header
  • 最后,调用onRequest(request, std::move(response));

然后,就进入了用户的自定义的处理函数,传入的参数包括了:

  • http请求信息:request
  • 写http响应的接口:respone

至此,从服务器启动,到接受到用户连接再到接受到Http请求的完整过程就结束了。

以至于如何实现的异步写,已经多线程的Rector模型的实现,都在对应的类中详细展开。

完结,撒花~

2.7 后记

当用户的处理逻辑结束之后,onRequest(request, std::move(response))函数返回。然后将调用parser->reset()重置request对象。之后工作线程将返回到epoll的等待循环中,等待下一个事件。

其实下一个事件一般就是用户的异步写任务,即完成respone.send的后续,这部分我们将在《Pistache源码分析 —— 异步写机制》中展开。

相关文章

网友评论

      本文标题:Pistache源码分析 —— Server的初始化和请求处理

      本文链接:https://www.haomeiwen.com/subject/mhtqultx.html