美文网首页
Pistache源码分析 —— PollableQueue类

Pistache源码分析 —— PollableQueue类

作者: 蟹蟹宁 | 来源:发表于2021-07-01 10:16 被阅读0次

    前言

    PollableQueue,在普通Queue的基础之上,使用epolleventfd机制,实现的可监听的Queue,可以用来实现异步操作。

    Pistache 没有使用std::queue,而是自己实现了一个基于 Non-intrusive MPSC node-based queue设计的队列Pistache::Queue,MPSC,即多生产者,单消费者的队列模型,无锁的设计。

    Pistache::Queue 是非阻塞的,如果空队列执行pop()返回nullptr。

    一般使用popSafe(),来返回数据,Safe根据的我的理解并不是线程安全,而是会帮你释放资源,具体的看Queue的实现。

    PollableQueue

    PollableQueue的实现还是非常简单的,主要就是新增了对eventfd的操作。PollableQueue应该不是线程安全的,但是单生产者,单消费者的情况下是可以的。

        template <typename T>
        class PollableQueue : public Queue<T>
        {
        public:
            typedef typename Queue<T>::Entry Entry;
            PollableQueue()
                : event_fd(-1)
            { }
            ~PollableQueue() override
            {
                if (event_fd != -1)
                    close(event_fd);
            }
            bool isBound() const { return event_fd != -1; }
            Polling::Tag bind(Polling::Epoll& poller);
    
            template <class U>
            void push(U&& u);
    
            Entry* pop() override;
            Polling::Tag tag() const;
            void unbind(Polling::Epoll& poller);
        private:
            int event_fd;
        };
    
    • bool isBound() const
      判断PollableQueue是否,设置了eventfd,并绑定到了epoll中。
      如果没有和epoll绑定,那么PollableQueue和普通Queue没有任何区别

    • Polling::Tag bind(Polling::Epoll& poller)
      初始化eventfd,并绑定到epoll上,注意这里使用了LT模式。初始化eventfd时,指定了非阻塞参数,详见eventfd

    Polling::Tag bind(Polling::Epoll& poller)
    {
        using namespace Polling;
    
        if (isBound())
        {
            throw std::runtime_error("The queue has already been bound");
        }
    
        event_fd = TRY_RET(eventfd(0, EFD_NONBLOCK));
        Tag tag_(event_fd);
        poller.addFd(event_fd, Flags<Polling::NotifyOn>(NotifyOn::Read), tag_);
    
        return tag_;
    }
    
    • template <class U> void push(U&& u)
      向队列中写入一条数据,如果绑定了epoll,则向eventfd写入1。
      这样epoll就会收到POLLIN的事件,从而处理队列中的数据。
    template <class U>
    void push(U&& u)
    {
        Queue<T>::push(std::forward<U>(u));
    
        if (isBound())
        {
            uint64_t val = 1;
            TRY(write(event_fd, &val, sizeof val));
        }
    }
    
    • Entry* pop() override
      弹出一条队列头数据,同时read一下eventfd,这回导致eventfd直接清零,这样下一次read的时候就会返回EAGAIN。执行一下read是必须的,LT模式下,只要可读,就会一直触发事件,不像ET仅仅在新数据到达时才会触发。但是for循环的作用其实没有,理论上只需要read一次就可以了。需要注意,在pistache在使用上进行pop操作的时候,是不会进行push操作的,worker线程是单线程的事件循环,pop和push不会在同一个循环上使用。 上面这句话对于异步写是适用的,但是对于peerQueue而言,的确是主线程负责push,而工作线程进行pop,这样就可能引起线程安全。
      此外需要注意,pop()是重载了父类的Queue的函数,其返回值是Entry*,这是因为我们传入的数据类型,被Queue::Entry给封装了,所以pop()之后再执行entry.data()即可。或者执行popSafe()返回一个指向结果的智能指针。
      推荐使用popSafe()返回数据,因为popSafe()会释放资源,如果使用pop()需要手动释放。
    Entry* pop() override
    {
        auto ret = Queue<T>::pop();
        if (isBound())
        {
            uint64_t val;
            for (;;)
            {
                ssize_t bytes = read(event_fd, &val, sizeof val);
                if (bytes == -1)
                {
                    if (errno == EAGAIN || errno == EWOULDBLOCK)
                        break;
                    else
                    {
                        // TODO
                    }
                }
            }
        }
        return ret;
    }
    
    • void unbind(Polling::Epoll& poller)
      取消绑定,需要将eventfd从epoll中移除,然后close(eventfd)
    void unbind(Polling::Epoll& poller)
    {
        if (event_fd == -1)
        {
            throw std::runtime_error("The mailbox is not bound");
        }
    
        poller.removeFd(event_fd);
        close(event_fd), event_fd = -1;
    }
    

    应用:PollableQueue<PeerEntry> peersQueue

    首先看一下PeerEntry,就是存放了peer的只能指针。

    struct PeerEntry
    {
        explicit PeerEntry(std::shared_ptr<Peer> peer_)
            : peer(std::move(peer_))
        { }
    
        std::shared_ptr<Peer> peer;
    };
    

    我们在《Pistache源码分析 —— Server的初始化和请求处理》中,描述了peersQueue的使用:

    • 主线程处理新连接的最后一步就是将新连接的peer添加到peerQueue中:
    PeerEntry entry(peer);
    peersQueue.push(std::move(entry));
    
    • 然后worker线程的epoll就会监听到,并开始执行事件处理函数:
    else if (entry.getTag() == peersQueue.tag())
    {
        handlePeerQueue();
    }
    
    • peersQueue中可能包含了多个数据,因为第一步可能执行了多次,显然概率不大
    void Transport::handlePeerQueue()
    {
        for (;;)
        {
            auto data = peersQueue.popSafe();
            if (!data)
                break;
    
            handlePeer(data->peer);
        }
    }
    

    需要注意这里使用的是peersQueue.popSafe();

    相关文章

      网友评论

          本文标题:Pistache源码分析 —— PollableQueue类

          本文链接:https://www.haomeiwen.com/subject/ubkjultx.html