使用read读取非阻塞socket
read()
函数可能有三种返回值:
-
-1
:出错或者无数据可读。 -
0
:对端关闭socket。 -
>0
:正常读取到了数据。
#define REDIS_IOBUF_LEN 1024
void readQueryFromClient(int fd)
{
char buf[REDIS_IOBUF_LEN];
int nread;
nread = read(fd, buf, REDIS_IOBUF_LEN);
if (nread == -1)
{
// 处理EAGAIN(因为fd为非阻塞的)
if (errno == EAGAIN)
{
nread = 0;
}
// read出错
else
{
// TODO:断开连接
return;
}
}
// 客户端断开连接
else if (nread == 0)
{
// TODO:断开连接
return;
}
if (nread)
{
// TODO:处理buf
}
// 此处的else不能省略,因为当read()返回-1,且errno为EAGAIN时,表示此时无数据可读,会将nread置0,就会走else分支
else
{
return;
}
}
疑问:需要考虑read()
函数被信号打断,从而返回-1
的情况吗?在Redis源码中,没有考虑这一点。
使用while循环处理系统调用可能被信号中断的情况
// 版本1
int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len)
{
int fd;
// 使用while循环的目的是为了处理EINTR错误
while(1)
{
fd = accept(s, sa, len);
if (fd == -1)
{
if (errno == EINTR)
{
continue;
}
else
{
anetSetError(err, "accept: %s", strerror(errno));
return ANET_ERR;
}
}
break;
}
return fd;
}
// 版本2
int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len)
{
int fd;
do
{
fd = accept(s, sa, len);
} while(-1 == fd && EINTR == errno);
if (-1 == fd)
{
anetSetError(err, "accept: %s", strerror(errno));
return ANET_ERR;
}
return fd;
}
可变参数的使用
anetSetError(err, "accept: %s", strerror(errno));
static void anetSetError(char *err, const char *fmt, ...)
{
va_list ap;
if (!err)
{
return;
}
va_start(ap, fmt);
vsnprintf(err, ANET_ERR_LEN, fmt, ap); // 其中,fmt既可以是格式化字符串(如"%s"),也可以是普通字符串(如"error"),或者两者结合(如"error:%s")
va_end(ap);
}
检查线程是否存在
int ret = pthread_kill(tid, 0);
if (0 == ret)
{
// 线程存在
}
else if (ESRCH == ret)
{
// 线程不存在
}
解析PASV模式下的ftp服务器的IP和端口号
const char *passive_param = "(127,0,0,1,12,34)";
const char *p1 = strchr(passive_param, '(');
if (NULL == p1)
{
// TODO
}
const char *p2 = strchr(passive_param, ')');
if (NULL == p2)
{
// TODO
}
if (p1 >= p2)
{
// TODO
}
int ip1, ip2, ip3, ip4;
int port1, port2;
sscanf(p1 + 1, "%d,%d,%d,%d,%d,%d", &ip1, &ip2, &ip3, &ip4, &port1, &port2);
char ip[16] = {0};
snprintf(ip, sizeof(ip), "%d.%d.%d.%d", ip1, ip2, ip3, ip4);
unsigned short port;
port = (port1 << 8) + port2; // 注意:port1<<8一定要加括号
通过inet_aton函数判断IP地址是域名还是点分十进制格式
if (inet_aton(addr, &sa.sin_addr) == 0) // 如果addr不是有效的IP地址,则inet_aton()会返回0
{
struct hostent *he;
he = gethostbyname(addr);
if (he == NULL)
{
__redisSetError(c,REDIS_ERR_OTHER,
sdscatprintf(sdsempty(),"Can't resolve: %s",addr));
close(s);
return REDIS_ERR;
}
memcpy(&sa.sin_addr, he->h_addr, sizeof(struct in_addr));
}
write函数的返回值不等于待写入的字节数
当write()
函数的返回值不等于待写入的字节数时,并不表示write()
函数出错了。
例如:
- 在Redis中:
int redisBufferWrite(redisContext *c, int *done) {
int nwritten;
if (sdslen(c->obuf) > 0)
{
nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
if (nwritten == -1)
{
if (errno == EAGAIN)
{
/* Try again later */
}
else
{
__redisSetError(c,REDIS_ERR_IO,NULL);
return REDIS_ERR;
}
}
else if (nwritten > 0) // 注意:当nwritten不等于sdslen(c->obuf)时,并不表示write()出错
{
if (nwritten == (signed)sdslen(c->obuf))
{
sdsfree(c->obuf);
c->obuf = sdsempty();
}
else
{
c->obuf = sdsrange(c->obuf,nwritten,-1);
}
}
}
if (done != NULL)
{
*done = (sdslen(c->obuf) == 0);
}
return REDIS_OK;
}
- 在muduo中:
void TcpConnection::sendInLoop(const void* data, size_t len)
{
loop_->assertInLoopThread();
ssize_t nwrote = 0;
size_t remaining = len;
bool faultError = false;
if (state_ == kDisconnected)
{
LOG_WARN << "disconnected, give up writing";
return;
}
// if no thing in output queue, try writing directly
if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0)
{
nwrote = sockets::write(channel_->fd(), data, len);
if (nwrote >= 0)
{
remaining = len - nwrote;
if (remaining == 0 && writeCompleteCallback_)
{
loop_->queueInLoop(std::bind(writeCompleteCallback_, shared_from_this()));
}
}
else // nwrote < 0
{
nwrote = 0;
if (errno != EWOULDBLOCK)
{
LOG_SYSERR << "TcpConnection::sendInLoop";
if (errno == EPIPE || errno == ECONNRESET) // FIXME: any others?
{
faultError = true;
}
}
}
}
assert(remaining <= len);
if (!faultError && remaining > 0)
{
size_t oldLen = outputBuffer_.readableBytes();
if (oldLen + remaining >= highWaterMark_
&& oldLen < highWaterMark_
&& highWaterMarkCallback_)
{
loop_->queueInLoop(std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining));
}
outputBuffer_.append(static_cast<const char*>(data)+nwrote, remaining);
if (!channel_->isWriting())
{
channel_->enableWriting();
}
}
}
对阻塞和非阻塞文件描述符的读或者写封装在一个函数中
即,在函数中要考虑read()
或者write()
返回-1的情况,并不代表函数出错了,也可能是资源暂不可用,需要判断errno
进一步判断。
比如Redis客户端的读与写:
int redisBufferRead(redisContext *c)
{
char buf[2048];
int nread = read(c->fd,buf,sizeof(buf));
if (nread == -1)
{
// 当fd为非阻塞,且fd暂不可读时,read()函数会返回-1,且errno等于EAGAIN
if (errno == EAGAIN)
{
/* Try again later */
}
else
{
__redisSetError(c,REDIS_ERR_IO,NULL);
return REDIS_ERR;
}
}
else if (nread == 0)
{
__redisSetError(c,REDIS_ERR_EOF,
sdsnew("Server closed the connection"));
return REDIS_ERR;
}
else
{
__redisCreateReplyReader(c);
redisReplyReaderFeed(c->reader,buf,nread);
}
return REDIS_OK;
}
int redisBufferWrite(redisContext *c, int *done)
{
int nwritten;
if (sdslen(c->obuf) > 0)
{
nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
if (nwritten == -1)
{
// 当fd为非阻塞时,会出现write返回-1,且errno等于EAGAIN的情况,即fd暂时不可写时。而在阻塞模式下,当fd暂时不可写时,会直接阻塞在write调用处
if (errno == EAGAIN)
{
/* Try again later */
}
else
{
__redisSetError(c,REDIS_ERR_IO,NULL);
return REDIS_ERR;
}
}
else if (nwritten > 0) // 注意:当nwritten不等于sdslen(c->obuf)时,并不表示write()出错
{
// 写缓冲区中的数据全部写完
if (nwritten == (signed)sdslen(c->obuf))
{
sdsfree(c->obuf); // 释放SDS
c->obuf = sdsempty(); // 新建一个空SDS
}
// 写缓冲区中的数据未全部写完
else
{
c->obuf = sdsrange(c->obuf,nwritten,-1); // 保留SDS中从nwritten到-1区间的数据,下次可写时再写
}
}
}
// 如果写缓冲区中的数据全部写完,则将*done置为1,表示完成
if (done != NULL)
{
*done = (sdslen(c->obuf) == 0);
}
return REDIS_OK;
}
网友评论