一、EventLoopThread类
首先Muduo看一下对于线程的封装,以及如何做到 one loop per thread 的,EventLoopThread类持有一个thread对象,并在它的线程执行的函数的内部创建一个EventLoop对象,EventLoopThread对外会提供一个startLoop()接口,该接口的作用是开始执行thread线程对象,并返回线程内部创建的EventLoop对象。
由于EventLoop是在线程执行的函数中(threadFunc)创建的,所以startLoop和它所启动的线程执行的函数是异步的,startLoop必须等待EventLoop对象在threadFunc中创建完成,才能返回EventLoop对象的地址。
StartLoop函数的逻辑如下:
EventLoop* EventLoopThread::startLoop()
{
thread_.detach(); //开始在线程中执行threadFunc()
{
//等待EventLoop对象创建完成
std::unique_lock<std::mutex> lock(mutex_);
while (loop_ == NULL)
{
cond_.wait(lock);
}
}
return loop_; //返回线程独有的EventLoop对象
}
其中threadFunc()函数在EventLoopThread对象刚创建时就与thread对象绑定
EventLoopThread::EventLoopThread()
: loop_(NULL),
exiting_(false),
thread_(std::bind(&EventLoopThread::threadFunc, this))
{
}
当startLoop()函数执行到thread_.detach();时,该函数中的代码就开始执行
void EventLoopThread::threadFunc()
{
EventLoop loop; //在线程内部创建一个loop对象
{
std::lock_guard<std::mutex> lock(mutex_);
loop_ = &loop;
cond_.notify_all(); //通知startLoop函数,可以返回loop对象的地址了
}
loop.loop(); //开启loop循环
}
二、线程池EventLoopThreadPool
线程池包含两个vector成员,其中一个vector用来存放EventLoopThread对象,另一个vector用来存放EventLoopThread对象中创建的EventLoop对象的地址。
每当创建一个EventLoopThread对象,并将其插入到vector中时,就将其中的EventLoop对象的地址插入到另一个vector中
EventLoopThreadPool对外提供了一个start接口,start接口用来创建指定数目的EventLoopThread对象,并将EventLoopThread对象和它的EventLoop地址分别插入到vector中,其中线程的数据需要提前调用setThreadNum(int numThreads)接口指定
为了保证start接口是在创建TcpServer的线程中执行的,需要在创建线程池对象时将TcpServer的EventLoop对象传入
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop)
: baseLoop_(baseLoop),
started_(false),
numThreads_(0),
next_(0)
{
}
Start的逻辑如下:
void EventLoopThreadPool::start()
{
//必须在IO线程中执行,创建TcpServer的那个线程中执行
baseLoop_->assertInLoopThread();
started_ = true;
for (int i = 0; i < numThreads_; ++i)
{
EventLoopThread* t = new EventLoopThread;
threads_.push_back(std::shared_ptr<EventLoopThread>(t));
loops_.push_back(t->startLoop());
}
}
线程池对外提供的一个比较关键的接口是获取每个线程的eventLoop对象的地址,它会从vector中依次获取下一个loop对象
EventLoop* EventLoopThreadPool::getNextLoop()
{
baseLoop_->assertInLoopThread();
EventLoop* loop = baseLoop_;
if (!loops_.empty())
{
loop = loops_[next_];
++next_;
if (static_cast<size_t>(next_) >= loops_.size())
{
next_ = 0;
}
}
return loop;
}
从Channel类的定义可知,Channel类每次创建时,需要外边传入一个EventLoop对象, 从而Channel对象就会将它自己插入到该EventLoop对象所持有的Poller类的vector<Channel>中,即表示该Channel对象也就归该EventLoop对象管理
void Channel::update()
{
loop_->updateChannel(this);
}
三、多线程TcpServer
每个TcpServer会持有一个线程池对象,当启动TcpServer时,会启动线程池对象
void TcpServer::start()
{
if (!started_)
{
started_ = true;
threadPool_->start();
}
if (!acceptor_->listenning())
{
loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_));
}
}
每当TcpServer收到一个新的连接时,就从线程池中获取到下一个EventLoop对象,并将它传入TcpConnection
void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();
TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));
ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}
由于每个TcpConnection包含一个Channel,所以该Channel就由传入TcpConnection的EventLoop来管理
另外要注意连接的销毁要在TcpServer所在的线程中执行
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
size_t n = connections_.erase(conn->name());
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}
网友评论