美文网首页
流量控制

流量控制

作者: edolovee | 来源:发表于2018-03-20 19:01 被阅读0次

    netty是如何实现流控的

    netty实现流控的方式可以分两个大类,第一类依赖于tcp的窗口机制,第二类通过使用流量整形,这里只对第一类进行介绍

    滑动窗口和拥塞窗口

    tcp协议使用滑动窗口和拥塞窗口进行流量控制

    1. 滑动窗口是在接收端确认的窗口,其实就是接收端的接收缓冲区,随着数据的不断接收,如果接收端处理的速度没有接收数据的速度快,那么缓冲区的大小在减小,也即是窗口在减少,那么接收端在发给发送端的ack中会包含可以接收的数据大小,那么发送端就会发送对应接收端剩余窗口大小的数据

    2. 另一方面在发送端维护了一个拥塞窗口,在开始发送数据时,会使用一个慢启动算法发送,窗口从1开始以指数级增长当达到临界值时,改为线性增加,如果发现网络中有重传发生,那么就会将拥塞窗口减小到1,从而避免自己发的太快导致的网络堵塞。

    netty依赖窗口实现的流控

    1. 作为发送方:当我们执行channel.write()时,最终的write操作会由unsafe接口处理,实现是AbstractUnsafe,片段代码如下:
    public final void write(Object msg, ChannelPromise promise) {
               //。。。省略
                outboundBuffer.addMessage(msg, size, promise);
     }
    

    紧接着addMessage方法会将消息加入到队列中,进而增加发送缓冲区字节数,代码如下:

    public void addMessage(Object msg, int size, ChannelPromise promise) {
            Entry entry = Entry.newInstance(msg, size, total(msg), promise);
            if (tailEntry == null) {
                flushedEntry = null;
                tailEntry = entry;
            } else {
                Entry tail = tailEntry;
                tail.next = entry;
                tailEntry = entry;
            }
            if (unflushedEntry == null) {
                unflushedEntry = entry;
            }
    
            // increment pending bytes after adding message to the unflushed arrays.
            // See https://github.com/netty/netty/issues/1619
            incrementPendingOutboundBytes(entry.pendingSize, false);
        }
    

    incrementPendingOutboundBytes会将当前字节大小与配置的写的高水位比较,如果超过配置的高水位值则执行setUnwritable方法,就是将writable属性设置为false,若设置成功则触发了pipeline中的fireChannelWritabilityChanged接口,这个方法我们可以自己去实现比如降低我们发送的速率

    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
            if (size == 0) {
                return;
            }
    
            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
            if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
                setUnwritable(invokeLater);
            }
        }
    

    同样的在将数据慢慢发送去之后,如果小于配置的低水位了,那么就再会触发该事件

    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
            if (size == 0) {
                return;
            }
    
            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
            if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
                setWritable(invokeLater);
            }
        }
    
    private void setWritable(boolean invokeLater) {
            for (;;) {
                final int oldValue = unwritable;
                final int newValue = oldValue & ~1;
                if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                    if (oldValue != 0 && newValue == 0) {
                        fireChannelWritabilityChanged(invokeLater);
                    }
                    break;
                }
            }
        }
    

    所以这里要么可以维护一个发送队列暂存消息或者快速失败

    1. 作为接收方:netty提供了一个配置autoread,该参数设置后,那么即使select出的事件为读时,那么也不会从内核中读数据,这样就会导致接收窗口满,那么就会通知发送方接收端窗口为0,使得发送方降低发送速度,具体实现代码如下:
    #NioEventLoop 的processSelectedKey方法在触发read方法时,会调用NioByteUnsafe的read方法
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
    
    #该方法里面有个allocHandle.continueReading()判断,具体实现在MaxMessageHandle
     public final void read() {
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            if (close) {
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
    
    #判断的依据就是autoread
    public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
                return config.isAutoRead() &&
                       maybeMoreDataSupplier.get() &&
                       totalMessages < maxMessagePerRead &&
                       totalBytesRead > 0;
            }
    

    业务控制方式:
    作为发送方,发送前应该检查iswritable 如果返回false,一种方式是延迟发送可以用队列或者timeout去实现

    作为接收方,如果业务处理速度不足以匹配接收速度,结合队列控制队列满时将autoread设置为false,不满时将其打开,但是应该注意不满的条件触发,不应使得autoread频繁开关,毕竟这也是耗费系统资源的

    相关文章

      网友评论

          本文标题:流量控制

          本文链接:https://www.haomeiwen.com/subject/vecpqftx.html