美文网首页
Muduo库设计(8)——多线程

Muduo库设计(8)——多线程

作者: Magic11 | 来源:发表于2019-12-03 17:14 被阅读0次

一、EventLoopThread类

首先Muduo看一下对于线程的封装,以及如何做到 one loop per thread 的,EventLoopThread类持有一个thread对象,并在它的线程执行的函数的内部创建一个EventLoop对象,EventLoopThread对外会提供一个startLoop()接口,该接口的作用是开始执行thread线程对象,并返回线程内部创建的EventLoop对象。
由于EventLoop是在线程执行的函数中(threadFunc)创建的,所以startLoop和它所启动的线程执行的函数是异步的,startLoop必须等待EventLoop对象在threadFunc中创建完成,才能返回EventLoop对象的地址。
StartLoop函数的逻辑如下:

EventLoop* EventLoopThread::startLoop()
{

  thread_.detach();    //开始在线程中执行threadFunc()

  {
    //等待EventLoop对象创建完成
    std::unique_lock<std::mutex> lock(mutex_);
    while (loop_ == NULL)
    {
      cond_.wait(lock);
    }
  }

  return loop_;    //返回线程独有的EventLoop对象
}

其中threadFunc()函数在EventLoopThread对象刚创建时就与thread对象绑定

EventLoopThread::EventLoopThread()
  : loop_(NULL),
    exiting_(false),
    thread_(std::bind(&EventLoopThread::threadFunc, this))
{
}

当startLoop()函数执行到thread_.detach();时,该函数中的代码就开始执行

void EventLoopThread::threadFunc()
{
  EventLoop loop;   //在线程内部创建一个loop对象

  {
    std::lock_guard<std::mutex> lock(mutex_);
    loop_ = &loop;
    cond_.notify_all(); //通知startLoop函数,可以返回loop对象的地址了
  }

  loop.loop();     //开启loop循环
}

二、线程池EventLoopThreadPool

线程池包含两个vector成员,其中一个vector用来存放EventLoopThread对象,另一个vector用来存放EventLoopThread对象中创建的EventLoop对象的地址。
每当创建一个EventLoopThread对象,并将其插入到vector中时,就将其中的EventLoop对象的地址插入到另一个vector中
EventLoopThreadPool对外提供了一个start接口,start接口用来创建指定数目的EventLoopThread对象,并将EventLoopThread对象和它的EventLoop地址分别插入到vector中,其中线程的数据需要提前调用setThreadNum(int numThreads)接口指定
为了保证start接口是在创建TcpServer的线程中执行的,需要在创建线程池对象时将TcpServer的EventLoop对象传入

EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop)
  : baseLoop_(baseLoop),
    started_(false),
    numThreads_(0),
    next_(0)
{
}

Start的逻辑如下:

void EventLoopThreadPool::start()
{
  //必须在IO线程中执行,创建TcpServer的那个线程中执行
  baseLoop_->assertInLoopThread();

  started_ = true;

  for (int i = 0; i < numThreads_; ++i)
  {
    EventLoopThread* t = new EventLoopThread;
    threads_.push_back(std::shared_ptr<EventLoopThread>(t));
    loops_.push_back(t->startLoop());
  }
}

线程池对外提供的一个比较关键的接口是获取每个线程的eventLoop对象的地址,它会从vector中依次获取下一个loop对象

EventLoop* EventLoopThreadPool::getNextLoop()
{
  baseLoop_->assertInLoopThread();
  EventLoop* loop = baseLoop_;

  if (!loops_.empty())
  {
    loop = loops_[next_];
    ++next_;
    if (static_cast<size_t>(next_) >= loops_.size())
    {
      next_ = 0;
    }
  }
  return loop;
}

从Channel类的定义可知,Channel类每次创建时,需要外边传入一个EventLoop对象, 从而Channel对象就会将它自己插入到该EventLoop对象所持有的Poller类的vector<Channel>中,即表示该Channel对象也就归该EventLoop对象管理

void Channel::update()
{
  loop_->updateChannel(this);
}

三、多线程TcpServer

每个TcpServer会持有一个线程池对象,当启动TcpServer时,会启动线程池对象

void TcpServer::start()
{
  if (!started_)
  {
    started_ = true;
    threadPool_->start();
  }

  if (!acceptor_->listenning())
  {
    loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_));  
  }
}

每当TcpServer收到一个新的连接时,就从线程池中获取到下一个EventLoop对象,并将它传入TcpConnection

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
  loop_->assertInLoopThread();

  EventLoop* ioLoop = threadPool_->getNextLoop();
  TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));

  ioLoop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));
}

由于每个TcpConnection包含一个Channel,所以该Channel就由传入TcpConnection的EventLoop来管理
另外要注意连接的销毁要在TcpServer所在的线程中执行

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
  loop_->runInLoop(std::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  size_t n = connections_.erase(conn->name());
  EventLoop* ioLoop = conn->getLoop();
  ioLoop->queueInLoop(std::bind(&TcpConnection::connectDestroyed, conn));
}

相关文章

网友评论

      本文标题:Muduo库设计(8)——多线程

      本文链接:https://www.haomeiwen.com/subject/hzxhgctx.html