注:本文为阅读《Linux多线程服务端编程:使用muduo C++网络库》的一点笔记
空闲连接指的是一段时间内没有受到任何数据的连接。我们需要做的是每隔一段时间断开这些空闲连接,以免浪费资源。剔除空闲连接这一任务大概有如下两个特点和需求:
- 无需精准定时:只需要一个大致的间隔,比如说10s左右未受到数据,则判定其为空闲连接
- 应尽量简单:剔除空闲连接应该是一个简洁明了的操作,不应占据太多的计算或空间资源
time wheel运用了桶排序的思路,在系统中设置N个桶,共同组成一个队列。第i个桶中存放i秒后将要变为空闲连接的连接。这样一来,我们只需要每秒剔除第0个桶中的连接即可,无需遍历全部连接,剔除之后,将第0个桶移动到尾部。而一个连接如果接收到了新数据,那么该连接就将自己重新放入最后一个桶中。
以循环队列构造的time wheel
接下来,我先用自己的语言描述一下书中作者所使用的数据结构:
- 桶中存放
TcpConnection
?: 我们在断掉连接之前还需要判断其是否还处于连接状态,结合RAII技术,使用using Entry = std::weak_ptr<TcpConnection>
是最有效的。(weak_ptr是棉线,我们在Entry
的析构函数中去判断TcpConnection
出否还处于连接状态) - 每个
Entry
需要在有新数据到达时转移在桶中的位置?:其实不必真的去移动桶中的数据,利用using EntryPtr = std::shared_prt<Entry>
的引用计数,用RAII计数去管理资源。没错,用shared_ptr
去管理weak_ptr
,虽然看起来怪怪的,但此时一个weak_ptr
就是一个Entry
,故将其看作为一份资源。如此一来,我们无需再去频繁移动Entry
,转而在桶中存放同一个Entry
的多个shared_ptr
,并在引用计数变为0的时候去析构对应的资源。 - 如何构造time wheel中的每个桶?:因为需要频繁的删除和插入,所以可以使用哈希表
using Bucket = std::unordered_set<Entry>
去构造桶 - 如何构造time wheel?:使用
using WeakConnectionList = boost::circular_buffer<Bucket>
,该数据结构的元素个数在初始化时被固定,此后在每秒中,我们只需要在尾部重新添加一个空桶,那么头部的桶就会被清除
嗯...上面一段听起来不太像人话。接下来是我对上述数据结构的理解和剖析:
-
明确time wheel服务的目标:
明确这一点是很重要的,time wheel的核心仅仅是"⏲计时"。这意味time wheel只是去判断一个TCP连接是否已经超时成为空闲连接,而不去判断、也不去影响这个TCP连接的其他状态,比如说这个TCP连接是否已经断开。那或许有人会这样问了:“当time wheel得知某一个TCP连接已经超时并变成空闲连接之后,由谁来执行断开操作呢?”答案是交给其他合适的数据结构。明白了这一点,使用using Entry = shd::weak_ptr<TcpConnection>
作为time wheel和TcpConnection之间的桥梁也就是顺其而然的事情了,因为weak_ptr
的特点就是,它可以探测到其指向的资源的状态,但本身并不影响其状态。那为什么非得要这样来设计呢?我觉得这是一个明确分工的问题,一个数据结构应该专注于将自己的任务做好,而不应过分插手其他的任务。 -
直观的解决方案
其实明白了第1点的内涵之后,再结合time wheel的思路,我们就已经可以有一个比较直观的方案了。当服务器发现新的TcpConnection
之后,就生成这个TcpConnection
对应的Entry
,并将其放置到尾部的桶中。而每当任意TcpConnection
接收到新的消息之后,就将对应的Entry
从原来的桶再移动到尾部的桶中。最后,利用定时器设定一个每秒执行一次的回调函数,该函数将头部的桶中的全部Entry
逐个进行delete
,而Entry
中的析构函数会去断开对应的TcpConnection
。上述的思路就是很直观明了的。 -
反思:应该充分利用C++语言特性
当前Entry有4个EntryPtr
在第2点谈到的初步方案中,我们利用Entry
的析构函数去断开对应的TcpConnection
,这已经体现了RAII的思路了,但方案很麻烦的一点在于,我们需要对Entry
进行频繁的移动。试想一下,为了移动Entry
,我们必须要记录每个Entry
位于哪一个桶中,这就又需要一个新的数据结构,变来变去徒增烦恼。还是再一次发挥RAII的优势,我们再使用一层using EntryPtr = shared_ptr<Entry>;
去管理Entry
。具体的思路是这样的,桶中的元素类型从Entry
变为EntryPtr
,这样一来,每当任意TcpConnection
接收到新的消息之后,我们只需要在尾部的桶中新增一个对应Entry
的EntryPtr
。定时器的回调函数还是同样的模式,这样一来,每当一个TcpConnection
有新消息时,其对应的Entry
的引用计数会增加,而每秒执行一次的定时函数又会减少头部桶中的各个Entry
的引用计数。完美,我们不再需要新的数据结构去记录Entry
的位置了。下图是一个简单的示意,当前这个Entry
在桶中共存放了4个EntryPtr
,如果后续的6秒内该TcpConnection
均没有接收到数据,那么EntryPtr
的引用计数就会变为0,那么系统自动执行Entry
的析构函数,也就是去断开对应的TcpConnection
。
- 细节:一个
TcpConnection
仅对应一个Entry
这是一个非常隐秘的细节,但也及其重要。试想一下,如果同一个TcpConnection
生成了两个Entry
,而这两个Entry
各自的多个EntryPtr
又都放入了桶中,那么毫无疑问,该TcpConnection
会被"断开两次",而放入到此处的语义中,则是TcpConnection
会被提前断开连接。下图是一个示例,图中的TcpConnection
生成了两个Entry
,而Entry2
最多在4秒以后就会因为引用计数变为0而进行析构(假设这个连接之后不会再接收数据),即断开了对应的TCP连接,但实际上,桶6中也有对应的代理,这意味着理论上这个TcpConnection
至少还应该存在6秒。这也就是书中两个思考题的答案了,我们需要在TcpConnection
的context
中保存那唯一一个Entry
,而这个Entry
是在服务器探测到新连接的时候为这个新连接创建出来的。
错误的示例,单个TcpConnection生成了两个Entry
接下来就是实际操作了,明白了原理之后,就简单很多了。
/* 省略各个头文件 */
/* ... */
class EchoServer
{
using WeakTcpConnectionPtr = std::weak_ptr<TcpConnection>;
struct Entry {
std::weak_ptr<TcpConnection> ptr_;
Entry(const WeakTcpConnectionPtr &ptr): ptr_(ptr) {} // weak_ptr是可以通过shared_ptr进行构造的
~Entry(){
const TcpConnectionPtr conn = ptr_.lock(); // return the corresponding shared_ptr
if(conn){ // must check the activity before shutdown action
conn->shutdown();
}
}
};
using EntryPtr = std::shared_ptr<Entry>;
using Bucket = std::unordered_set<EntryPtr>;
using BucketList = boost::circular_buffer<Bucket>; // our finale data structure
using WeakEntryPtr = std::weak_ptr<Entry>;
public:
EchoServer(EventLoop *loop, const InetAddress &addr, const int &num_buckets):
loop_(loop), server_(loop, addr, "ECHO SERVER"), buckets_(num_buckets)
{
server_.setConnectionCallback(std::bind(&EchoServer::onConnection, this, _1)); // must have the &
server_.setMessageCallback(std::bind(&EchoServer::onMessage, this, _1, _2, _3));
loop_->runEvery(1.0, std::bind(&EchoServer::onTime, this));
}
// ~EchoServer();
void start_(){
server_.start();
}
private:
EventLoop *loop_;
TcpServer server_;
BucketList buckets_;
void onTime(){
buckets_.push_back(Bucket()); // the head bucket is popped automatically
}
void onConnection(const TcpConnectionPtr &conn){
LOG_INFO << "ECHO_SERVER: " << conn->peerAddress().toIpPort()
<< "-> " << conn->localAddress().toIpPort() << " is "
<< ( conn->connected() ? " ON " : " OFF " );
if(conn->connected()){
EntryPtr newPtr(new Entry(conn)); // shared_ptr can be transformed to weak_ptr
buckets_.back().insert(newPtr);
conn->setContext(WeakEntryPtr(newPtr)); // store weak_ptr of Entry
}
}
void onMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time_){
muduo::string msg(buf->retrieveAllAsString());
LOG_INFO << "ECHO_SERVER: " << conn->peerAddress().toIpPort()
<< " has message coming, size is: " << msg.size();
WeakEntryPtr tmp = boost::any_cast<WeakEntryPtr>(conn->getContext());
buckets_.back().insert(tmp.lock());// add the count
conn->send(msg);
}
};
#endif /* ECHO_SERVER_H */
int main(int argc, char *argv[])
{
EventLoop loop;
InetAddress addr(2333);
EchoServer server(&loop, addr, 10); // 设置超时时间为10s
server.start_();
loop.loop();
return 0;
}
网友评论