netty是如何实现流控的
netty实现流控的方式可以分两个大类,第一类依赖于tcp的窗口机制,第二类通过使用流量整形,这里只对第一类进行介绍
滑动窗口和拥塞窗口
tcp协议使用滑动窗口和拥塞窗口进行流量控制
-
滑动窗口是在接收端确认的窗口,其实就是接收端的接收缓冲区,随着数据的不断接收,如果接收端处理的速度没有接收数据的速度快,那么缓冲区的大小在减小,也即是窗口在减少,那么接收端在发给发送端的ack中会包含可以接收的数据大小,那么发送端就会发送对应接收端剩余窗口大小的数据
-
另一方面在发送端维护了一个拥塞窗口,在开始发送数据时,会使用一个慢启动算法发送,窗口从1开始以指数级增长当达到临界值时,改为线性增加,如果发现网络中有重传发生,那么就会将拥塞窗口减小到1,从而避免自己发的太快导致的网络堵塞。
netty依赖窗口实现的流控
- 作为发送方:当我们执行channel.write()时,最终的write操作会由unsafe接口处理,实现是AbstractUnsafe,片段代码如下:
public final void write(Object msg, ChannelPromise promise) {
//。。。省略
outboundBuffer.addMessage(msg, size, promise);
}
紧接着addMessage方法会将消息加入到队列中,进而增加发送缓冲区字节数,代码如下:
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
incrementPendingOutboundBytes会将当前字节大小与配置的写的高水位比较,如果超过配置的高水位值则执行setUnwritable方法,就是将writable属性设置为false,若设置成功则触发了pipeline中的fireChannelWritabilityChanged接口,这个方法我们可以自己去实现比如降低我们发送的速率
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
同样的在将数据慢慢发送去之后,如果小于配置的低水位了,那么就再会触发该事件
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
所以这里要么可以维护一个发送队列暂存消息或者快速失败
- 作为接收方:netty提供了一个配置autoread,该参数设置后,那么即使select出的事件为读时,那么也不会从内核中读数据,这样就会导致接收窗口满,那么就会通知发送方接收端窗口为0,使得发送方降低发送速度,具体实现代码如下:
#NioEventLoop 的processSelectedKey方法在触发read方法时,会调用NioByteUnsafe的read方法
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
#该方法里面有个allocHandle.continueReading()判断,具体实现在MaxMessageHandle
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
#判断的依据就是autoread
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
maybeMoreDataSupplier.get() &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
业务控制方式:
作为发送方,发送前应该检查iswritable 如果返回false,一种方式是延迟发送可以用队列或者timeout去实现
作为接收方,如果业务处理速度不足以匹配接收速度,结合队列控制队列满时将autoread设置为false,不满时将其打开,但是应该注意不满的条件触发,不应使得autoread频繁开关,毕竟这也是耗费系统资源的
网友评论