flush

作者: JIU_LV | 来源:发表于2018-12-06 09:46 被阅读0次

    DefaultChannelPipeline.flush()

    调用unsafe

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
         unsafe.flush();
     }
    

    AbstractChannel.flush()

    调用outboundbuffer.addFlush 然后flush0,

    @Override
    public final void flush() {
        assertEventLoop();
    
        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }
    
        outboundBuffer.addFlush();
        flush0();
    }
    

    flush0()

    调用NioSocketChannel.flush0()

    @Override
    protected final void flush0() {
        // Flush immediately only when there's no pending flush.
        // If there's a pending flush operation, event loop will call forceFlush() later,
        // and thus there's no need to call it now.
        if (isFlushPending()) {
            return;
        }
        super.flush0();
    }
    //???
        private boolean isFlushPending() {
        SelectionKey selectionKey = selectionKey();
        return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
    }
    

    ChannelOutboundBuffer.addFlush()

    对unflushedEntry继续计数flushed++
    若已经cancel,进行其他处理???

    
    /**
    * Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
    * and so you will be able to handle them.
    */
    public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);
    
        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
    }
    
    

    AbstractChannel

    主要调用 doWrite(outboundBuffer) 还有其他异常处理

    @SuppressWarnings("deprecation")
    protected void flush0() {
        if (inFlush0) {
            // Avoid re-entrance
            return;
        }
    
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }
    
        inFlush0 = true;
    
        // Mark all pending write requests as failure if the channel is inactive.
        if (!isActive()) {
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                } else {
                    // Do not trigger channelWritabilityChanged because the channel is closed already.
                    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }
    
        try {
            doWrite(outboundBuffer);
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                /**
                 * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                 * failing all flushed messages and also ensure the actual close of the underlying transport
                 * will happen before the promises are notified.
                 *
                 * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                 * may still return {@code true} even if the channel should be closed as result of the exception.
                 */
                close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            } else {
                try {
                    shutdownOutput(voidPromise(), t);
                } catch (Throwable t2) {
                    close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            }
        } finally {
            inFlush0 = false;
        }
    }
     @Override
        public boolean isActive() {
            SocketChannel ch = javaChannel();
            return ch.isOpen() && ch.isConnected();
        }
    

    NioSocketChannel.doWrite()

    真正写。对不同的nioBufferCnt用不同的写方法
    写完后会调用ChannelOutboundBuffer.removeBytes(writtenBytes)

    @Override
    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    for (;;) {
        int size = in.size();
        if (size == 0) {
            // All written so clear OP_WRITE
            clearOpWrite();
            break;
        }
        long writtenBytes = 0;
        boolean done = false;
        boolean setOpWrite = false;
    
        // Ensure the pending writes are made of ByteBufs only.
        ByteBuffer[] nioBuffers = in.nioBuffers();
        int nioBufferCnt = in.nioBufferCount();
        long expectedWrittenBytes = in.nioBufferSize();
        SocketChannel ch = javaChannel();
    
        // Always us nioBuffers() to workaround data-corruption.
        // See https://github.com/netty/netty/issues/2761
        switch (nioBufferCnt) {
            case 0:
                // We have something else beside ByteBuffers to write so fallback to normal writes.
                super.doWrite(in);
                return;
            case 1:
                // Only one ByteBuf so use non-gathering write
                ByteBuffer nioBuffer = nioBuffers[0];
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final int localWrittenBytes = ch.write(nioBuffer);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
            default:
                for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                    if (localWrittenBytes == 0) {
                        setOpWrite = true;
                        break;
                    }
                    expectedWrittenBytes -= localWrittenBytes;
                    writtenBytes += localWrittenBytes;
                    if (expectedWrittenBytes == 0) {
                        done = true;
                        break;
                    }
                }
                break;
        }
    
        // Release the fully written buffers, and update the indexes of the partially written buffer.
        in.removeBytes(writtenBytes);
    
        if (!done) {
            // Did not write all buffers completely.
            incompleteWrite(setOpWrite);
            break;
        }
    }
    }
    

    ChannelOutboundBuffer.removeBytes()

    记录写的progress,若current()的buffer写完,会调用remove();
    最后调用clearNioBuffers();

    /**
    * Removes the fully written entries and update the reader index of the partially written entry.
    * This operation assumes all messages in this buffer is {@link ByteBuf}.
    */
    public void removeBytes(long writtenBytes) {
        for (;;) {
            Object msg = current();
            if (!(msg instanceof ByteBuf)) {
                assert writtenBytes == 0;
                break;
            }
        
            final ByteBuf buf = (ByteBuf) msg;
            final int readerIndex = buf.readerIndex();
            final int readableBytes = buf.writerIndex() - readerIndex;
        
            if (readableBytes <= writtenBytes) {
                if (writtenBytes != 0) {
                    progress(readableBytes);
                    writtenBytes -= readableBytes;
                }
                remove();
            } else { // readableBytes > writtenBytes
                if (writtenBytes != 0) {
                    buf.readerIndex(readerIndex + (int) writtenBytes);
                    progress(writtenBytes);
                }
                break;
            }
        }
        clearNioBuffers();
    }
    

    remove()

    调用removeEntry(e)

    /**
    * Will remove the current message, mark its {@link ChannelPromise} as success and return {@code true}. If no
    * flushed message exists at the time this method is called it will return {@code false} to signal that no more
    * messages are ready to be handled.
    */
    public boolean remove() {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;
        
        ChannelPromise promise = e.promise;
        int size = e.pendingSize;
        
        removeEntry(e);
        
        if (!e.cancelled) {
            // only release message, notify and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);
            safeSuccess(promise);//回调
            decrementPendingOutboundBytes(size, false, true);//小于低水位,设置write快速再写
        }
        
        // recycle the entry 回收
        e.recycle();
        
        return true;
    }
    

    removeEntry()

    --flushed
    若flushed为0flushedEntity设置null
    否则把next设置给flushed

    private void removeEntry(Entry e) {
            if (-- flushed == 0) {
                // processed everything
                flushedEntry = null;
                if (e == tailEntry) {
                    tailEntry = null;
                    unflushedEntry = null;
                }
            } else {
                flushedEntry = e.next;
            }
        }
    
    //小于低水位,设置write快速再写
    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
        if (size == 0) {
            return;
        }
        
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
            setWritable(invokeLater);
        }
    }
    
    
    private void setWritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue & ~1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue != 0 && newValue == 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }        
    

    clearNioBuffers()

    把线程中分配的BUF设置为null

     // Clear all ByteBuffer from the array so these can be GC'ed.
        // See https://github.com/netty/netty/issues/3837
        private void clearNioBuffers() {
            int count = nioBufferCount;
            if (count > 0) {
                nioBufferCount = 0;
                Arrays.fill(NIO_BUFFERS.get(), 0, count, null);
            }
        }
      private static final FastThreadLocal<ByteBuffer[]> NIO_BUFFERS = new FastThreadLocal<ByteBuffer[]>() {
            @Override
            protected ByteBuffer[] initialValue() throws Exception {
                return new ByteBuffer[1024];
            }
        };
    

    AbstractNioByteChannel.incompleteWrite()

    网咯拥塞的时候(writeBytes数量为0)设置write,加速写

    protected final void incompleteWrite(boolean setOpWrite) {
        // Did not write completely.
        if (setOpWrite) {
            setOpWrite();
        } else {
            // Schedule flush again later so other tasks can be picked up in the meantime
            Runnable flushTask = this.flushTask;
            if (flushTask == null) {
                flushTask = this.flushTask = new Runnable() {
                    @Override
                    public void run() {
                        flush();
                    }
                };
            }
            eventLoop().execute(flushTask);
        }
    }
    
    

    setOpWrite()

      protected final void setOpWrite() {
            final SelectionKey key = selectionKey();
            // Check first if the key is still valid as it may be canceled as part of the deregistration
            // from the EventLoop
            // See https://github.com/netty/netty/issues/2104
            if (!key.isValid()) {
                return;
            }
            final int interestOps = key.interestOps();
            if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                key.interestOps(interestOps | SelectionKey.OP_WRITE);
            }
        }
    

    clearOpWrite()

    protected final void clearOpWrite() {
        final SelectionKey key = selectionKey();
        // Check first if the key is still valid as it may be canceled as part of the deregistration
        // from the EventLoop
        // See https://github.com/netty/netty/issues/2104
        if (!key.isValid()) {
            return;
        }
        final int interestOps = key.interestOps();
        if ((interestOps & SelectionKey.OP_WRITE) != 0) {
            key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
        }
    }

    相关文章

      网友评论

        本文标题:flush

        本文链接:https://www.haomeiwen.com/subject/tvjncqtx.html