前言
PollableQueue,在普通Queue的基础之上,使用epoll和eventfd机制,实现的可监听的Queue,可以用来实现异步操作。
Pistache 没有使用std::queue
,而是自己实现了一个基于 Non-intrusive MPSC node-based queue设计的队列Pistache::Queue
,MPSC,即多生产者,单消费者的队列模型,无锁的设计。
Pistache::Queue
是非阻塞的,如果空队列执行pop()返回nullptr。
一般使用popSafe(),来返回数据,Safe根据的我的理解并不是线程安全,而是会帮你释放资源,具体的看Queue的实现。
PollableQueue
PollableQueue的实现还是非常简单的,主要就是新增了对eventfd的操作。PollableQueue应该不是线程安全的,但是单生产者,单消费者的情况下是可以的。
template <typename T>
class PollableQueue : public Queue<T>
{
public:
typedef typename Queue<T>::Entry Entry;
PollableQueue()
: event_fd(-1)
{ }
~PollableQueue() override
{
if (event_fd != -1)
close(event_fd);
}
bool isBound() const { return event_fd != -1; }
Polling::Tag bind(Polling::Epoll& poller);
template <class U>
void push(U&& u);
Entry* pop() override;
Polling::Tag tag() const;
void unbind(Polling::Epoll& poller);
private:
int event_fd;
};
-
bool isBound() const
判断PollableQueue是否,设置了eventfd,并绑定到了epoll中。
如果没有和epoll绑定,那么PollableQueue和普通Queue没有任何区别 -
Polling::Tag bind(Polling::Epoll& poller)
初始化eventfd,并绑定到epoll上,注意这里使用了LT模式。初始化eventfd时,指定了非阻塞参数,详见eventfd。
Polling::Tag bind(Polling::Epoll& poller)
{
using namespace Polling;
if (isBound())
{
throw std::runtime_error("The queue has already been bound");
}
event_fd = TRY_RET(eventfd(0, EFD_NONBLOCK));
Tag tag_(event_fd);
poller.addFd(event_fd, Flags<Polling::NotifyOn>(NotifyOn::Read), tag_);
return tag_;
}
-
template <class U> void push(U&& u)
向队列中写入一条数据,如果绑定了epoll,则向eventfd写入1。
这样epoll就会收到POLLIN的事件,从而处理队列中的数据。
template <class U>
void push(U&& u)
{
Queue<T>::push(std::forward<U>(u));
if (isBound())
{
uint64_t val = 1;
TRY(write(event_fd, &val, sizeof val));
}
}
-
Entry* pop() override
弹出一条队列头数据,同时read一下eventfd,这回导致eventfd直接清零,这样下一次read的时候就会返回EAGAIN。执行一下read是必须的,LT模式下,只要可读,就会一直触发事件,不像ET仅仅在新数据到达时才会触发。但是for循环的作用其实没有,理论上只需要read一次就可以了。需要注意,在pistache在使用上进行pop操作的时候,是不会进行push操作的,worker线程是单线程的事件循环,pop和push不会在同一个循环上使用。上面这句话对于异步写是适用的,但是对于peerQueue而言,的确是主线程负责push,而工作线程进行pop,这样就可能引起线程安全。
此外需要注意,pop()是重载了父类的Queue的函数,其返回值是Entry*,这是因为我们传入的数据类型,被Queue::Entry给封装了,所以pop()之后再执行entry.data()即可。或者执行popSafe()返回一个指向结果的智能指针。
推荐使用popSafe()返回数据,因为popSafe()会释放资源,如果使用pop()需要手动释放。
Entry* pop() override
{
auto ret = Queue<T>::pop();
if (isBound())
{
uint64_t val;
for (;;)
{
ssize_t bytes = read(event_fd, &val, sizeof val);
if (bytes == -1)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
else
{
// TODO
}
}
}
}
return ret;
}
-
void unbind(Polling::Epoll& poller)
取消绑定,需要将eventfd从epoll中移除,然后close(eventfd)
void unbind(Polling::Epoll& poller)
{
if (event_fd == -1)
{
throw std::runtime_error("The mailbox is not bound");
}
poller.removeFd(event_fd);
close(event_fd), event_fd = -1;
}
应用:PollableQueue<PeerEntry> peersQueue
首先看一下PeerEntry,就是存放了peer的只能指针。
struct PeerEntry
{
explicit PeerEntry(std::shared_ptr<Peer> peer_)
: peer(std::move(peer_))
{ }
std::shared_ptr<Peer> peer;
};
我们在《Pistache源码分析 —— Server的初始化和请求处理》中,描述了peersQueue的使用:
- 主线程处理新连接的最后一步就是将新连接的peer添加到peerQueue中:
PeerEntry entry(peer);
peersQueue.push(std::move(entry));
- 然后worker线程的epoll就会监听到,并开始执行事件处理函数:
else if (entry.getTag() == peersQueue.tag())
{
handlePeerQueue();
}
- peersQueue中可能包含了多个数据,因为第一步可能执行了多次,显然概率不大
void Transport::handlePeerQueue()
{
for (;;)
{
auto data = peersQueue.popSafe();
if (!data)
break;
handlePeer(data->peer);
}
}
需要注意这里使用的是peersQueue.popSafe();
网友评论