一、Buffer读取数据
首先当channel有可读消息时,Channel会通知TcpConnection对象(当有新的连接时,channel会通知Acceptor对象),由此可见当有事件发生时,channel只负责通知上层调用者(TcpConnection, Acceptor、TimerQueue)
void Channel::handleEvent(Timestamp receiveTime)
{
if (revents_ & POLLNVAL) {
LOG_WARN << "Channel::handle_event() POLLNVAL";
}
if ((revents_ & POLLHUP) && !(revents_ & POLLIN)) {
if (closeCallback_) closeCallback_();
}
if (revents_ & (POLLERR | POLLNVAL)) {
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP)) {
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT) {
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}
TcpConnection对象负责将该连接(fd)上的消息读取到一个缓冲区里面,然后将这个缓冲区的地址告诉上层调用者。
void TcpConnection::handleRead(Timestamp receiveTime)
{
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0) {
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
} else if (n == 0) {
handleClose();
} else {
errno = savedErrno;
handleError();
}
}
实际读取消息的逻辑如下:
ssize_t Buffer::readFd(int fd, int* savedErrno)
{
char extrabuf[65536];
struct iovec vec[2];
const size_t writable = writableBytes();
vec[0].iov_base = begin()+writerIndex_;
vec[0].iov_len = writable;
vec[1].iov_base = extrabuf;
vec[1].iov_len = sizeof extrabuf;
const ssize_t n = readv(fd, vec, 2);
if (n < 0) {
*savedErrno = errno;
} else if (implicit_cast<size_t>(n) <= writable) {
writerIndex_ += n;
} else {
writerIndex_ = buffer_.size();
append(extrabuf, n - writable);
}
return n;
}
二、TcpConnection发送数据
TcpConnection提供发送消息的接口send(const std::string& message),该接口可以跨线程调用
void TcpConnection::send(const std::string& message)
{
if (state_ == kConnected) {
if (loop_->isInLoopThread()) {
sendInLoop(message);
} else {
loop_->runInLoop(std::bind(&TcpConnection::sendInLoop, this, message));
}
}
}
sendInLoop函数必须在当前线程(IO线程)中执行,其发送的逻辑如下:
1)若当前channel没有监测写事件,并且写缓冲区中无数据,则可以直接向对应的fd发送数据,若数据一次没有全部发送完,则将剩余的数据写入缓冲区。
2)若当前channel正在监测写事件,或者写缓冲区中存在数据,则将剩余的数据写入缓冲区
代码逻辑如下:
void TcpConnection::sendInLoop(const std::string& message)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
//如果当前channel不关注是否可写事件,并且写缓冲区中没有任何数据,则直接发送消息
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) {
nwrote = ::write(channel_->fd(), message.data(), message.size());
if (nwrote >= 0) {
if (implicit_cast<size_t>(nwrote) < message.size()) {//消息一次没有全部发送
LOG_TRACE << "I am going to write more data";
}
} else {
nwrote = 0;
if (errno != EWOULDBLOCK) {
LOG_SYSERR << "TcpConnection::sendInLoop";
}
}
}
//将没有发送完的消息放入缓冲队列
if (implicit_cast<size_t>(nwrote) < message.size()) {
outputBuffer_.append(message.data()+nwrote, message.size()-nwrote);//将未发送的数据写入缓冲区
if (!channel_->isWriting()) {
channel_->enableWriting();
}
}
}
对于未发送完的数据,channel会开始监测可写事件,一旦有可写事件发生就会调用handleWrite继续发送缓冲区中剩余的数据,一旦数据发送完,立即关闭对可写事件的监测,避免busy loop,代码逻辑如下:
void TcpConnection::handleWrite()
{
loop_->assertInLoopThread();
if (channel_->isWriting()) {
ssize_t n = ::write(channel_->fd(), outputBuffer_.peek(), outputBuffer_.readableBytes());
if (n > 0) {
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0) {
channel_->disableWriting(); //一旦数据发送完,立即关闭对可写事件的监测,避免busy loop
if (state_ == kDisconnecting) {
shutdownInLoop();
}
} else { //没有写完继续监测写事件
LOG_TRACE << "I am going to write more data";
}
} else {
LOG_SYSERR << "TcpConnection::handleWrite";
}
} else {
LOG_TRACE << "Connection is down, no more writing";
}
}
网友评论