美文网首页
Netty源码学习(6)--pipeline学习2

Netty源码学习(6)--pipeline学习2

作者: 未名枯草 | 来源:发表于2018-01-16 14:44 被阅读19次

    Unsafe

    unsafe是不安全的意思,不要在应用程序里面直接使用Unsafe以及他的衍生类对象。Unsafe 在Channel定义,属于Channel的内部类,表明Unsafe和Channel密切相关。

    interface Unsafe {
    /**
             * Return the assigned {@link RecvByteBufAllocator.Handle} which will be used to allocate {@link ByteBuf}'s when
             * receiving data.
             */
            RecvByteBufAllocator.Handle recvBufAllocHandle();
    
            /**
             * Return the {@link SocketAddress} to which is bound local or
             * {@code null} if none.
             */
            SocketAddress localAddress();
    
            /**
             * Return the {@link SocketAddress} to which is bound remote or
             * {@code null} if none is bound yet.
             */
            SocketAddress remoteAddress();
    
            /**
             * Register the {@link Channel} of the {@link ChannelPromise} and notify
             * the {@link ChannelFuture} once the registration was complete.
             */
            void register(EventLoop eventLoop, ChannelPromise promise);
    
            /**
             * Bind the {@link SocketAddress} to the {@link Channel} of the {@link ChannelPromise} and notify
             * it once its done.
             */
            void bind(SocketAddress localAddress, ChannelPromise promise);
    
            /**
             * Connect the {@link Channel} of the given {@link ChannelFuture} with the given remote {@link SocketAddress}.
             * If a specific local {@link SocketAddress} should be used it need to be given as argument. Otherwise just
             * pass {@code null} to it.
             *
             * The {@link ChannelPromise} will get notified once the connect operation was complete.
             */
            void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    
            /**
             * Disconnect the {@link Channel} of the {@link ChannelFuture} and notify the {@link ChannelPromise} once the
             * operation was complete.
             */
            void disconnect(ChannelPromise promise);
    
            /**
             * Close the {@link Channel} of the {@link ChannelPromise} and notify the {@link ChannelPromise} once the
             * operation was complete.
             */
            void close(ChannelPromise promise);
    
            /**
             * Closes the {@link Channel} immediately without firing any events.  Probably only useful
             * when registration attempt failed.
             */
            void closeForcibly();
    
            /**
             * Deregister the {@link Channel} of the {@link ChannelPromise} from {@link EventLoop} and notify the
             * {@link ChannelPromise} once the operation was complete.
             */
            void deregister(ChannelPromise promise);
    
            /**
             * Schedules a read operation that fills the inbound buffer of the first {@link ChannelInboundHandler} in the
             * {@link ChannelPipeline}.  If there's already a pending read operation, this method does nothing.
             */
            void beginRead();
    
            /**
             * Schedules a write operation.
             */
            void write(Object msg, ChannelPromise promise);
    
            /**
             * Flush out all write operations scheduled via {@link #write(Object, ChannelPromise)}.
             */
            void flush();
    
            /**
             * Return a special ChannelPromise which can be reused and passed to the operations in {@link Unsafe}.
             * It will never be notified of a success or error and so is only a placeholder for operations
             * that take a {@link ChannelPromise} as argument but for which you not want to get notified.
             */
            ChannelPromise voidPromise();
    
            /**
             * Returns the {@link ChannelOutboundBuffer} of the {@link Channel} where the pending write requests are stored.
             */
            ChannelOutboundBuffer outboundBuffer();
    

    Usafe接口继承及实现关系下类图:

    image.png

    NioUnsafe 在 Unsafe基础上增加了以下几个接口

        public interface NioUnsafe extends Unsafe {
            /**
             * Return underlying  可以访问底层jdk
             */
            SelectableChannel ch();
    
            /**
             * Finish connect
             */
            void finishConnect();
    
            /**
             * Read from underlying {@link SelectableChannel}
             */
            void read();
    
            void forceFlush();
        }
    
    1. 从增加的接口以及类名上来看,NioUnsafe 增加了可以访问底层jdk的SelectableChannel的功能,定义了从SelectableChannel读取数据的read方法。AbstractUnsafe 实现了大部分Unsafe的功能。AbstractNioUnsafe 主要是通过代理到其外部类AbstractNioChannel拿到了与jdk nio相关的一些信息,比如SelectableChannel,SelectionKey等等。
    2. NioSocketChannelUnsafe 和 NioByteUnsafe 放到一起讲,其实现了IO的基本操作,读,和写,这些操作都与jdk底层相关

    Unsafe的分类

    继承结构来看,我们可以总结出两种类型的Unsafe分类,

    1. 与连接的字节数据读写相关的NioByteUnsafe
    2. 与新连接建立操作相关的NioMessageUnsafe

    NioByteUnsafe中的读:委托到外部类NioSocketChannel
    已知,boss线程主要负责监听并处理accept事件,将socketChannel注册到work线程的selector,由worker线程来监听并处理read事件。当work线程的selector检测到OP_READ事件发生时,触发read操作。

    //NioEventLoop  
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {  
        unsafe.read();  
        if (!ch.isOpen()) {  
            // Connection already closed - no need to handle write.  
            return;  
        }  
    }
    

    unsafe.read()直接调转到unsafe的接口NioByteUnsafe类中的read()方法,此方法的实现在类AbstractNioByteChannel中:

    @Override
            public final void read() {
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
    
                //负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);  // 申请一块指定大小的内存。
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);  // 触发事件,将会引发pipeline的读事件传播
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    

    (1) allocHandle//\负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少;
    allocHandle的初始化类为AdaptiveRecvByteBufAllocator

    public class AdaptiveRecvByteBufAllocator extends DefaultMaxMessagesRecvByteBufAllocator {
    
        static final int DEFAULT_MINIMUM = 64;   //最小缓存(64),在SIZE_TABLE中对应的下标为3。
        static final int DEFAULT_INITIAL = 1024;  //初始化缓存大小,第一次分配缓存时,
    //由于没有上一次实际收到的字节数做参考,需要给一个默认初始值。
    
        static final int DEFAULT_MAXIMUM = 65536; //最大缓存(65536),在SIZE_TABLE中对应的下标为38。
    
        private static final int INDEX_INCREMENT = 4;  //上次预估缓存偏小,下次index的递增值。
        private static final int INDEX_DECREMENT = 1; //上次预估缓存偏大,下次index的递减值。
    
        private static final int[] SIZE_TABLE;   //按照从小到大的顺序预先存储可以分配的缓存大小。
    //从16开始,每次累加16,直到496,接着从512开始,每次增大一倍,直到溢出。
    
        static {
            List<Integer> sizeTable = new ArrayList<Integer>();
            for (int i = 16; i < 512; i += 16) {
                sizeTable.add(i);
            }
    
            for (int i = 512; i > 0; i <<= 1) {
                sizeTable.add(i);
            }
    
            SIZE_TABLE = new int[sizeTable.size()];
            for (int i = 0; i < SIZE_TABLE.length; i ++) {
                SIZE_TABLE[i] = sizeTable.get(i);
            }
        }
    
    

    (2) allocHandle.allocate(allocator)申请一块指定大小的内存。

    //DefaultMaxBytesRecvByteBufAllocator.HandleImpl
    public ByteBuf allocate(ByteBufAllocator alloc) {
        return alloc.ioBuffer(nextReceiveBufferSize);
    }
    

    通过ByteBufAllocator的ioBuffer方法申请缓存,调用AbstractByteBufAllocator类中的方法ioBuffer:

        @Override
        public ByteBuf ioBuffer(int initialCapacity) {
            if (PlatformDependent.hasUnsafe()) { //判断平台是否支持unsafe
                return directBuffer(initialCapacity); //直接物理内存
            }
            return heapBuffer(initialCapacity);  //堆上内存
        }
    

    根据平台是否支持unsafe,选择使用直接物理内存还是堆上内存。
    directBuffer方案:

    //类AbstractByteBufAllocator
     @Override
        public ByteBuf directBuffer(int initialCapacity) {
            return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
        }
    
        @Override
        public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
            if (initialCapacity == 0 && maxCapacity == 0) {
                return emptyBuf;
            }
            validate(initialCapacity, maxCapacity);
            return newDirectBuffer(initialCapacity, maxCapacity);
        }
    

    newDirectBuffer方法调用到UnpooledByteBufAllocator类中:

       @Override
        protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
            final ByteBuf buf;
            if (PlatformDependent.hasUnsafe()) {
                buf = PlatformDependent.useDirectBufferNoCleaner() ?
                        new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                        new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
            } else {
                buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
            return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
        }
    
    Netty中使用引用计数机制来管理资源,ByteBuf实现了ReferenceCounted接口,当实例化一个ByteBuf时,引用计数为1, 代码中需要保持一个该对象的引用时需要调用retain方法将计数增1,对象使用完时调用release将计数减1。当引用计数变为0时,对象将释放所持有的底层资源或将资源返回资源池。

    (3) 方法doReadBytes(byteBuf)将socketChannel数据写入缓存。

    //NioSocketChannel
      @Override
        protected int doReadBytes(ByteBuf byteBuf) throws Exception {
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.attemptedBytesRead(byteBuf.writableBytes());
            return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
        }
    

    继续调用:

       @Override
        public ByteBuf writeBytes(ByteBuf src, int length) {
            if (length > src.readableBytes()) {
                throw new IndexOutOfBoundsException(String.format(
                        "length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
            }
            writeBytes(src, src.readerIndex(), length);
            src.readerIndex(src.readerIndex() + length);//读取src数据到this.ByteBuf
            return this;
        }
    
        @Override
        public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
            ensureAccessible();
            //是否扩容处理
            ensureWritable(length);
            //调用子类实现
            setBytes(writerIndex, src, srcIndex, length);
            //记录已写长度
            writerIndex += length;
            return this;
        }
    

    总结:

    1.writeBytes跟setBytes、readBytes跟getBytes区别是前者有记录,后者没有,而后者是子类的实现

    2.扩容算法是两种策略:
    大于4M时不走double自增,数值范围取 minNewCapacity <= maxCapacity
    少于4M时从64开始double自增

    3.更改容量也是每个子类实现,要考虑两种情况
    大于当前容量
    小于当前容量,当小于的时候要考虑 readerIndex、writerIndex边界,当超过 readerIndex、writerIndex边界heap的策略是丢去原来的数据

    4.heap是继承 AbstractReferenceCountedByteBuf的,当refCnt记录为1时释放数据

    =============================

    ByteBuf

    Nio ByteBuffer 和 Netty ByteBuf 对比

    1 指针:
    ByteBuffer

    例如下面使用buffer的例子:

    public class Test2 {  
        public static void main(String[] args) {  
            String content = "abcdefg";  
            ByteBuffer byteBuffer = ByteBuffer.allocate(256);  
            byteBuffer.put(content.getBytes());  
            byteBuffer.flip();  
            byte[] bufferValue = new byte[byteBuffer.remaining()];  
            byteBuffer.get(bufferValue);  
            System.out.println(new String(bufferValue));  
        }  
    }  
    

    ByteBuffer中会有三个下标,初始位置0,当前位置positon,limit位置,初始时,position为0,limit为Buffer数组末尾
    调用buffer.put(value.getBytes())后:


    image.png

    不调用flip:
    从缓冲区读取的是position — limit位置的数据,明显不是我们要的
    调用flip:
    会将limit设置为position,position设置为0,,此时读取的数据 :


    image.png

    比较关键的代码 byteBuffer.flip();它会把limit设置为position的位置。否则读取到的将会是错误的内容。

    ByteBuf:

    ByteBuf中使用两个指针,readerIndex,writerIndex来指示位置,初始时readrIndex = writerIndex = 0,当写入数据后:


    image.png

    writerIndex — capacity:可写容量
    readerIndex — writerIndex:可读部分
    当读取了M个字节后:


    image.png

    调用discardReadBytes,会释放掉discardReadBytes的空间,并把readableBytes复制到从0开始的位置,因此这里会发生内存复制,频繁调用会影响性能

    image.png

    2 扩容

    ByteBuffer

    ByteBuffer缓冲区的长度固定,分多了会浪费内存,分少了存放大的数据时会索引越界,所以使用ByteBuffer时,为了解决这个问题,我们一般每次put操作时,都会对可用空间进行校检,如果剩余空间不足,需要重新创建一个新的ByteBuffer,然后将旧的ByteBuffer复制到新的ByteBuffer中去。

    ByteBuf:

    而ByteBuf则对其进行了改进,它会自动扩展,具体的做法是,写入数据时,会调用ensureWritable方法,传入我们需要写的字节长度,判断是否需要扩容,然后使用calculateNewCapacity进行扩容:

    image.png image.png

    相关文章

      网友评论

          本文标题:Netty源码学习(6)--pipeline学习2

          本文链接:https://www.haomeiwen.com/subject/wozjoxtx.html