美文网首页
Pistache源码分析 —— 异步写机制

Pistache源码分析 —— 异步写机制

作者: 蟹蟹宁 | 来源:发表于2021-07-01 10:16 被阅读0次

    一、前言

    Pistache为了进行异步编程实现了异步写网络请求,即respone.send。它是基于Pistache的Promise以及PollableQueue机制共同实现的,流程其实和peerQueue类似,只要详细的看过前面的写的《Pistache —— Promise》以及《Pistache源码分析 —— PollableQueue类》机制后,对其原理已经不看源码,也能明白是如何实现的。

    二、asyncWrite

    template <typename Buf>
    Async::Promise<ssize_t> asyncWrite(Fd fd, const Buf& buffer, int flags = 0)
    {
        // Always enqueue responses for sending. Giving preference to consumer
        // context means chunked responses could be sent out of order.
        return Async::Promise<ssize_t>(
            [=](Async::Deferred<ssize_t> deferred) mutable {
                BufferHolder holder { buffer };
                WriteEntry write(std::move(deferred), std::move(holder), fd, flags);
                writesQueue.push(std::move(write));
            });
    }
    
    2.1 参数
    • Fd fd
      要写入数据的文件描述符,在这是由accept()创建的client-fd,由peer类保存。
    • const Buf& buffer
      要发送的数据,这里的Buf是模板,可以是RawBuffer或者FileBuffer,这两个类比较简单,就不详细展开了
    • int flags = 0
      这是send(2)系统调用中使用的参数,因为最终还是调用send(2)/sendfile(2)来进行网络数据的传送
    2.2执行流程
    • BufferHolder holder { buffer }
      将要写的数据进行包装到BufferHolder中,BufferHolder将RawBuffer和FileBuffer统一封装,并提供了重要的offset字段,以标志数据发送到哪了,BufferHolder也不复杂,就不展开了。
    • 将数据打包到WriteEntry中
      WriteEntry中有一个重要的字段,那就是deferred,这是Promise给我们的进行数据填充的接口,见Promise
    • 放到writesQueue中
      到这里asyncWrite,就可以返回了,worker线程的epoll会监听到writesQueue放入了数据,然后进行处理

    三、epoll捕获写任务

    void Transport::onReady(const Aio::FdSet& fds)
    {
        for (const auto& entry : fds)
        {
            if (entry.getTag() == writesQueue.tag())
            {
                handleWriteQueue();
            }
    

    看一下处理函数的实现:

    void Transport::handleWriteQueue()
    {
        // Let's drain the queue
        for (;;)
        {
            auto write = writesQueue.popSafe();
            if (!write)
                break;
            auto fd = write->peerFd;
            if (!isPeerFd(fd))
                continue;
            {
                Guard guard(toWriteLock);
                toWrite[fd].push_back(std::move(*write));
            }
    
            reactor()->modifyFd(key(), fd, NotifyOn::Read | NotifyOn::Write,
                                Polling::Mode::Edge);
        }
    }
    

    用for循环处理writeQueue:

    • 将 writeEntry 放到toWrite中
      toWrite是Transport的字段,是一个哈希映射,用于记录每个连接(client-fd)的写请求(writeEntry)。每个transport或者说每个worker线程同时处理多个连接(client-fd),而每个连接,有可能存在多个写请求。
      std::unordered_map<Fd, std::deque<WriteEntry>> toWrite;
    • 修改存在写请求的连接的epoll关注事件
      因为只要写缓冲区空闲,就可以报告写事件,这样就会一直受到POLLOUT事件(ET模式可能不会一直报告),因此在存在写请求的时候,开始关注POLLOUT事件是很合理的

    这个时候,如果写缓冲区空闲,那么就会触发关注了POLLOUT事件fd:

    else if (entry.isWritable())
    {
        auto tag = entry.getTag();
        auto fd  = static_cast<Fd>(tag.value());
    
        {
            Guard guard(toWriteLock);
            auto it = toWrite.find(fd);
            if (it == std::end(toWrite))
            {
                throw std::runtime_error(
                    "Assertion Error: could not find write data");
            }
        }
    
        reactor()->modifyFd(key(), fd, NotifyOn::Read, Polling::Mode::Edge);
    
        // Try to drain the queue
        asyncWriteImpl(fd);
    }
    

    处理函数主要做两件事:

    • 就是停止POLLOUT事件
      目的是避免频繁的收到POLLOUT事件,直到又有了新的请求再开启,这与epoll的EPOLLNESHOT标志很像,EPOLLNESHOT标志会在接受到时间后,停止对fd的监控,一般用于多线程中,避免多个线程同时操作同一个fd,但是这里如果用EPOLLNESHOT标志的话,POLLOIN也无法收到了。
      这里需要注意的是,上述整个过程,都是在同一个线程中实现的,这就是所谓的经典的epoll事件循环,send是发生在Readable(即HTTP请求到达)的事件处理,而异步写是实现是在writeQueue和Writable,他们都在一个线程中被处理,是不会并发的。
    • asyncWriteImpl(fd)
      开始执行写数据

    四、asyncWriteImpl(fd)

    下面这段看似复杂,其实就是就是调用send(2),sendfile(2)等系统调用来发送数据,可以先参考我翻译的手册:
    linux手册翻译——send(2)
    linux手册翻译——recv(2)
    linux手册翻译——sendfile(2)
    整个流程就是逻辑比较简单,主要需要理解非阻塞模式下的send过程:

    • 首先我们是在收到了epoll的可写事件后再调用此函数的,说明当前的套接字(fd)是可以写入的
    • send()和sendfile()在成功时,返回的是写入的字节大小,显然成功写入多少是取决于当前TCP套接字的写入缓冲区的剩余空间的
    • 失败时,将返回-1 ,这个时候可能的情况:
      • errno == EAGAIN || errno == EWOULDBLOCK
        这是典型的发生阻塞,说明当前fd不能写入了,返回值是成功写入字节数,接下来我们进行了以下操作:

        1. 扣除了原buf已经成功发生的部分,然后将新任务内容重新放到了队列的头部
        auto bufferHolder = buffer.detach(totalWritten);
        wq.pop_front();
        wq.push_front(WriteEntry(std::move(deferred), bufferHolder, flags));
        
        1. 重新修改epoll,令其监听POLLOUT
        reactor()->modifyFd(key(), fd, NotifyOn::Read | NotifyOn::Write, Polling::Mode::Edge);
        

        这样就保证了任务写入的完整性,即一旦任务1开始写,那么在任务1写完之前,不能写任务2的内容。因为新的任务都是从队尾追加的,我们将优先处理头部的任务。然后这个时候理论上应该是推出整个循环的以等待fd的可写事件,但是代码中并没有退出循环,我认为这是一个BUG,我在Github上提交了这个ISSUE 。我认为这种设计是为了实现flush()函数,为了保证flush()会在将toWrite的所有缓冲区清空之前,不能返回。所以我的ISSUE 也不完善,因为没有考虑flush(),我觉得可以实现一个同步的写实现,专用于flush()。

      • errno == EBADF || errno == EPIPE || errno == ECONNRESET
        这是fd出现了问题,写入已经无法完成,这时候需要将其从toWrite中删除

      • 其他的错误
        作者认为,当前进行的写入任务出了问题,但是fd还有救,因此执行了cleanUp():

        auto cleanUp = [&]() {
            wq.pop_front();
            if (wq.empty())
            {
                toWrite.erase(fd);
                reactor()->modifyFd(key(), fd, NotifyOn::Read, Polling::Mode::Edge);
                stop = true;
            }
            lock.unlock()
        };
        

    此函数首先移除当前的写入任务,然后判断fd中是否还有其他写任务,如果没有就结束执行。

    • 如果执行成功,那么将判断写入数据总数是否已经等于要写入的数据总数,如果是那么执行上面的cleanUp(),如果没有,则继续循环执行send()和sendfile()
    源码实现:
    void Transport::asyncWriteImpl(Fd fd)
    {
        bool stop = false;
        while (!stop)
        {
            std::unique_lock<std::mutex> lock(toWriteLock);
    
            auto it = toWrite.find(fd);
    
            // cleanup will have been handled by handlePeerDisconnection
            if (it == std::end(toWrite))
            {
                return;
            }
            auto& wq = it->second;
            if (wq.empty())
            {
                break;
            }
    
            auto& entry                       = wq.front();
            int flags                         = entry.flags;
            BufferHolder& buffer              = entry.buffer;
            Async::Deferred<ssize_t> deferred = std::move(entry.deferred);
    
            auto cleanUp = [&]() {
                wq.pop_front();
                if (wq.empty())
                {
                    toWrite.erase(fd);
                    reactor()->modifyFd(key(), fd, NotifyOn::Read, Polling::Mode::Edge);
                    stop = true;
                }
                lock.unlock();
            };
    
            size_t totalWritten = buffer.offset();
            for (;;)
            {
                ssize_t bytesWritten = 0;
                auto len             = buffer.size() - totalWritten;
    
                if (buffer.isRaw())
                {
                    auto raw        = buffer.raw();
                    const auto* ptr = raw.data().c_str() + totalWritten;
                    bytesWritten    = sendRawBuffer(fd, ptr, len, flags);
                }
                else
                {
                    auto file    = buffer.fd();
                    off_t offset = totalWritten;
                    bytesWritten = sendFile(fd, file, offset, len);
                }
                if (bytesWritten < 0)
                {
                    if (errno == EAGAIN || errno == EWOULDBLOCK)
                    {
    
                        auto bufferHolder = buffer.detach(totalWritten);
    
                        // pop_front kills buffer - so we cannot continue loop or use buffer
                        // after this point
                        wq.pop_front();
                        wq.push_front(WriteEntry(std::move(deferred), bufferHolder, flags));
                        reactor()->modifyFd(key(), fd, NotifyOn::Read | NotifyOn::Write,
                                            Polling::Mode::Edge);
                    }
                    // EBADF can happen when the HTTP parser, in the case of
                    // an error, closes fd before the entire request is processed.
                    // https://github.com/pistacheio/pistache/issues/501
                    else if (errno == EBADF || errno == EPIPE || errno == ECONNRESET)
                    {
                        wq.pop_front();
                        toWrite.erase(fd);
                        stop = true;
                    }
                    else
                    {
                        cleanUp();
                        deferred.reject(Pistache::Error::system("Could not write data"));
                    }
                    break;
                }
                else
                {
                    totalWritten += bytesWritten;
                    if (totalWritten >= buffer.size())
                    {
                        if (buffer.isFile())
                        {
                            // done with the file buffer, nothing else knows whether to
                            // close it with the way the code is written.
                            ::close(buffer.fd());
                        }
    
                        cleanUp();
    
                        // Cast to match the type of defered template
                        // to avoid a BadType exception
                        deferred.resolve(static_cast<ssize_t>(totalWritten));
                        break;
                    }
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Pistache源码分析 —— 异步写机制

          本文链接:https://www.haomeiwen.com/subject/abkjultx.html