美文网首页
Muduo库设计(7)——采用Buffer接收和发送数据

Muduo库设计(7)——采用Buffer接收和发送数据

作者: Magic11 | 来源:发表于2019-12-02 15:38 被阅读0次

    一、Buffer读取数据

    首先当channel有可读消息时,Channel会通知TcpConnection对象(当有新的连接时,channel会通知Acceptor对象),由此可见当有事件发生时,channel只负责通知上层调用者(TcpConnection, Acceptor、TimerQueue)

    void Channel::handleEvent(Timestamp receiveTime)
    {
      if (revents_ & POLLNVAL) {
        LOG_WARN << "Channel::handle_event() POLLNVAL";
      }
    
      if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) {
        if (closeCallback_) closeCallback_();
      }
      if (revents_ & (POLLERR | POLLNVAL)) {
        if (errorCallback_) errorCallback_();
      }
      if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) {
        if (readCallback_) readCallback_(receiveTime);
      }
      if (revents_ & POLLOUT) {
        if (writeCallback_) writeCallback_();
      }
      eventHandling_ = false;
    }
    

    TcpConnection对象负责将该连接(fd)上的消息读取到一个缓冲区里面,然后将这个缓冲区的地址告诉上层调用者。

    void TcpConnection::handleRead(Timestamp receiveTime)
    {
      int savedErrno = 0;
      ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
      if (n > 0) {
        messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
      } else if (n == 0) {
        handleClose();
      } else {
        errno = savedErrno;
        handleError();
      }
    }
    

    实际读取消息的逻辑如下:

    ssize_t Buffer::readFd(int fd, int* savedErrno)
    {
      char extrabuf[65536];
      struct iovec vec[2];
      const size_t writable = writableBytes();
      vec[0].iov_base = begin()+writerIndex_;
      vec[0].iov_len = writable;
      vec[1].iov_base = extrabuf;
      vec[1].iov_len = sizeof extrabuf;
      const ssize_t n = readv(fd, vec, 2);
      if (n < 0) {
        *savedErrno = errno;
      } else if (implicit_cast<size_t>(n) <= writable) {
        writerIndex_ += n;
      } else {
        writerIndex_ = buffer_.size();
        append(extrabuf, n - writable);
      }
      return n;
    }
    

    二、TcpConnection发送数据

    TcpConnection提供发送消息的接口send(const std::string& message),该接口可以跨线程调用

    void TcpConnection::send(const std::string& message)
    {
      if (state_ == kConnected) {
        if (loop_->isInLoopThread()) {
          sendInLoop(message);
        } else {
          loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, message));
        }
      }
    }
    

    sendInLoop函数必须在当前线程(IO线程)中执行,其发送的逻辑如下:
    1)若当前channel没有监测写事件,并且写缓冲区中无数据,则可以直接向对应的fd发送数据,若数据一次没有全部发送完,则将剩余的数据写入缓冲区。
    2)若当前channel正在监测写事件,或者写缓冲区中存在数据,则将剩余的数据写入缓冲区
    代码逻辑如下:

    void TcpConnection::sendInLoop(const std::string& message)
    {
      loop_->assertInLoopThread();
      ssize_t nwrote = 0;
      //如果当前channel不关注是否可写事件,并且写缓冲区中没有任何数据,则直接发送消息
      if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
        nwrote = ::write(channel_->fd(), message.data(), message.size());
        if (nwrote >= 0) {
          if (implicit_cast<size_t>(nwrote) < message.size()) {//消息一次没有全部发送
            LOG_TRACE << "I am going to write more data";
          }
        } else {
          nwrote = 0;
          if (errno != EWOULDBLOCK) {
            LOG_SYSERR << "TcpConnection::sendInLoop";
          }
        }
      }
    
      //将没有发送完的消息放入缓冲队列
      if (implicit_cast<size_t>(nwrote) < message.size()) {
        outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);//将未发送的数据写入缓冲区
        if (!channel_->isWriting()) {
          channel_->enableWriting();
        }
      }
    }
    
    

    对于未发送完的数据,channel会开始监测可写事件,一旦有可写事件发生就会调用handleWrite继续发送缓冲区中剩余的数据,一旦数据发送完,立即关闭对可写事件的监测,避免busy loop,代码逻辑如下:

    void TcpConnection::handleWrite()
    {
      loop_->assertInLoopThread();
      if (channel_->isWriting()) {
        ssize_t n = ::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes());
        if (n > 0) {
          outputBuffer_.retrieve(n);
          if (outputBuffer_.readableBytes() == 0) {
            channel_->disableWriting();   //一旦数据发送完,立即关闭对可写事件的监测,避免busy loop
            if (state_ == kDisconnecting) {
              shutdownInLoop();
            }
          } else {  //没有写完继续监测写事件
            LOG_TRACE << "I am going to write more data";
          }
        } else {
          LOG_SYSERR << "TcpConnection::handleWrite";
        }
      } else {
        LOG_TRACE << "Connection is down, no more writing";
      }
    }
    

    相关文章

      网友评论

          本文标题:Muduo库设计(7)——采用Buffer接收和发送数据

          本文链接:https://www.haomeiwen.com/subject/kohfgctx.html