美文网首页
netty之write和flush

netty之write和flush

作者: hello_kd | 来源:发表于2022-01-26 16:44 被阅读0次

    在netty开发中,当调用pipeline的write方法时,并不会将数据直接写入到底层channel通道发送出去,而是先添加到缓冲区中;只有当调用flush方法,才会真正将数据从缓冲区写入到channel并发送出去。netty还提供了一个简便的方法,结合两者的功能writeAndFlush。

    在之前的文章说过,应用程序开发中,主动调用IO操作,比如write、bind等等,触发的IO操作在pipeline上会从尾部的出站handler传播到头部出站handler,中间可能会经过各种编码器等等,但最终都会经过netty内置在pipeline上的HeadContext,而HeadContext的IO操作方法又是委托给内部的unsafe。

    因此本文就来讲述下unsafe的write和flush方法的处理逻辑。

    write源码
    public final void write(Object msg, ChannelPromise promise) {
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        int size;
        try {
            msg = filterOutboundMessage(msg);
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            try {
                ReferenceCountUtil.release(msg);
            } finally {
                safeSetFailure(promise, t);
            }
            return;
        }
          
        outboundBuffer.addMessage(msg, size, promise);
    }
    

    write方法主要有两个步骤

    1. 会先对msg进行校验,校验msg是不是ByteBuf或者FileRegion类型的,非这两种类型直接抛出异常
    2. 将消息添加到缓冲区outboundBuffer中
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
    
        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
    

    其实,netty底层维护的是一个链表,链表的每个元素是一个Entry对象,而待发送的消息msg和promise就是维护在entry对象中的。

    将消息添加到链表后,还会将此条消息占用的字节数(实际发送的+entry本身的开销)维护到channel的一个全局变量totalPendingSize中,表示待发送的总字节数。并且与高水位值比较,若比较大,那么会触发handler的unwritable方法。这个后续再用专门的章节来说明。

    至此,write方法主要是将消息封装成entry对象,然后添加到channel对应的outboundBuffer维护的链表当中,并且会判断待发送的总字节数是否超过高水位值,若超出了,则触发handler的unwritable方法

    接下来,来看下flush方法处理逻辑,调用这方法会真正的将数据从buffer写入到channel通道去

    flush源码
    public final void flush() {
        assertEventLoop();
    
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }
    
        outboundBuffer.addFlush();
        flush0();
    }
    

    addFlushed()方法主要是将outboundBuffer底层的链表的首指针flushedEntry指向链表第一个元素,然后unflushedEntry置为null,再算下待发送的entry数量。

    flush0()会先判断channel注册到的selector有没正在监控write事件,没有的话才会去处理数据,有的话,在NioEventLoop的事件循环中会处理,这里就无需处理了

    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
        ch.unsafe().forceFlush();
    }
    

    下面为真正的处理数据发送逻辑

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
    //此次写操作,调用channel的write方法次数上限,默认值为16
        int writeSpinCount = config().getWriteSpinCount();
        do {
    //若输出缓冲区是空的,也就是底层entry链表没有元素了,一般是已经写完数据了,那么取消selector的OP_WRITE事件,然后直接return
            if (in.isEmpty()) {
                clearOpWrite();
                return;
            }
            // 每次写请求,gathering的最大字节数
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
    //将outboundBuffer的entry链表的消息转成nio的ByteBuffer数组
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
    //ByteBuffer的数量
            int nioBufferCnt = in.nioBufferCount();
    
            //将ByteBuffer写入到channel,分为三种情况处理
            switch (nioBufferCnt) {
    
                case 0:
                    //为0的情况正常不会出现
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: {
                    //只有1个ByteBuffer,通常是write后便直接flush的,只有一个buffer的情况,直接用普通的
                    ByteBuffer buffer = nioBuffers[0];
                    int attemptedBytes = buffer.remaining();
                    final int localWrittenBytes = ch.write(buffer);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
                default: {
                    // 有多个ByteBuffer,一般是调用多次write方法后,再调flush方法的
                    long attemptedBytes = in.nioBufferSize();
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                    // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                            maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0);
    
    //若这此次的循环中未将所有数据写入到channel中,那么注册selector的OP_WRITE事件,再下次的事件循环中处理
        incompleteWrite(writeSpinCount < 0);
    }
    

    上述switch块内有几个重要点

    1. 不同nioBufferCnt数量的不同处理逻辑,主要有当为1时,直接用底层channel的write方法,写入单个ByteBuffer;当于1,一次性写入多个ByteBuffer,使用的是gathering技术
    2. 当调用write方法返回后,如实际写入的字节数<=0,可能是此时channel的socket缓冲区已经满了,不允许写入数据了,所以需要向selector注册OP_WRITE事件,待下次事件循环中再处理。
    3. 会根据试图写入的数据和实际写入的数据动态调整maxBytesPerGatheringWrite
    4. 已写入的数据需要从缓冲区移除,不然下次循环会重复写入数据
    public void removeBytes(long writtenBytes) {
        for (;;) {
    //获取当前flushedEntry对象
            Object msg = current();
            if (!(msg instanceof ByteBuf)) {
                assert writtenBytes == 0;
                break;
            }
    
            final ByteBuf buf = (ByteBuf) msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;
    //比较当前entry对应的消息字节数和已写入字节数,
    //当写入的较大或者相等时,表示这个entry对象消息已经全部写入到channel了  
    //因此,会调用remove方法(这方法主要作用是将entry对象从链表移除,且通知ChannelPromise的监听器)
    //当写入的字节数<这个entry对象的消息字节数时,说明只写入了一部分,那么只需要移动readerIndex的索引值
    ..
            if (readableBytes <= writtenBytes) {
                if (writtenBytes != 0) {
                    progress(readableBytes);
                    writtenBytes -= readableBytes;
                }
                remove();
            } else { // readableBytes > writtenBytes
                if (writtenBytes != 0) {
                    buf.readerIndex(readerIndex + (int) writtenBytes);
                    progress(writtenBytes);
                }
                break;
            }
        }
    //将ByteBuffer数组的所有ByteBuffer值置为null
        clearNioBuffers();
    }
    

    在上面这个方法内部的remove方法,主要逻辑是将entry对象从链表移除,并且通知promise的监听器

    public boolean remove() {
        Entry e = flushedEntry;
    //当前链表为空了,那么将线程的ByteBuffer数组的所有元素都置为null
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;
    
        ChannelPromise promise = e.promise;
        int size = e.pendingSize;
    
    //当前entry对象从链表移除
        removeEntry(e);
    
        if (!e.cancelled) {
            // 每个entry对象维护的msg是ByteBuf类型的,因为已经写入到channel了,所以将引用计数-1
            ReferenceCountUtil.safeRelease(msg);
    //promise结果值置为成功,并通知promise的监听器
            safeSuccess(promise);
    //扣掉缓冲区待发送的字节数,并且判断是否小于低水位值了,若小于,则触发channelWritabilityChanged方法
            decrementPendingOutboundBytes(size, false, true);
        }
    
        //回收entry对象
        e.recycle();
    
        return true;
    }
    

    相关文章

      网友评论

          本文标题:netty之write和flush

          本文链接:https://www.haomeiwen.com/subject/pfhbhrtx.html