
Netty Source Code Analysis

Author: 王侦 | Published 2023-02-17 16:20

    Netty server-side example:

    public class NettyServer {
    
        public static void main(String[] args) throws Exception {
    
            // Create two event loop groups, bossGroup and workerGroup; each contains
            // NioEventLoop child threads, defaulting to twice the number of CPU cores.
            // bossGroup only handles connection requests; the actual client I/O and
            // business processing is handed off to workerGroup.
            EventLoopGroup bossGroup = new NioEventLoopGroup(3);
            EventLoopGroup workerGroup = new NioEventLoopGroup(8);
            try {
                // Create the server bootstrap object
                ServerBootstrap bootstrap = new ServerBootstrap();
                // Configure parameters with the fluent (chained) API
                bootstrap.group(bossGroup, workerGroup) // set the two event loop groups
                        // Use NioServerSocketChannel as the server's channel implementation
                        .channel(NioServerSocketChannel.class)
                        // Initialize the server's connection backlog. The server accepts client
                        // connections sequentially, so only one can be handled at a time; while
                        // it is busy, additional incoming connections wait in this queue.
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChannelInitializer<SocketChannel>() { // channel initializer; initChannel runs once as each new SocketChannel is set up

    
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                // Attach the handler to each SocketChannel owned by workerGroup
                                ch.pipeline().addLast(new NettyServerHandler());
                            }
                        });
                System.out.println("netty server start。。");
                // 绑定一个端口并且同步, 生成了一个ChannelFuture异步对象,通过isDone()等方法可以判断异步事件的执行情况
                // 启动服务器(并绑定端口),bind是异步操作,sync方法是等待异步操作执行完毕
                ChannelFuture cf = bootstrap.bind(9000).sync();
                // 给cf注册监听器,监听我们关心的事件
                /*cf.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) { // use the callback's future, not the outer cf
                            System.out.println("Listening on port 9000 succeeded");
                        } else {
                            System.out.println("Listening on port 9000 failed");
                        }
                    }
                });*/
                // Wait until the server's listening channel is closed; closeFuture is asynchronous.
                // sync() blocks until close handling completes; internally it relies on Object.wait().
                cf.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
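
    The NettyServerHandler referenced above is not shown in the original article; here is a minimal sketch of what it might look like (the class name matches the example, but the body is an assumption):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;

    public class NettyServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf buf = (ByteBuf) msg;
            try {
                System.out.println("received: " + buf.toString(CharsetUtil.UTF_8));
            } finally {
                buf.release(); // the last inbound handler must release the ByteBuf
            }
            ctx.writeAndFlush(Unpooled.copiedBuffer("ok", CharsetUtil.UTF_8));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            cause.printStackTrace();
            ctx.close();
        }
    }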
    

    1. NioEventLoopGroup and NioEventLoop

    public NioEventLoopGroup() {
        this(0);
    }

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    

    The number of threads defaults to twice the number of CPU cores.
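
    The default comes from MultithreadEventLoopGroup's static initializer (paraphrased from Netty 4.1; the exact form may differ across versions):

        private static final int DEFAULT_EVENT_LOOP_THREADS;

        static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
        }

    The MultithreadEventExecutorGroup constructor then creates the child event loops: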

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    

    Let's focus on newChild():

        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
        }
    

    NioEventLoop has two core components (a paraphrased constructor sketch follows the list):

    • 1) The taskQueue, initialized in its parent constructor SingleThreadEventExecutor(): depending on the platform it may be a LinkedBlockingQueue, an MpscUnboundedArrayQueue, or an MpscUnboundedAtomicArrayQueue.
    • 2) selectorTuple = openSelector();
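
    A rough sketch of the constructor wiring (paraphrased from Netty 4.1, not verbatim source):

        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                     EventLoopTaskQueueFactory queueFactory) {
            // super(...) reaches SingleThreadEventExecutor, which creates taskQueue
            // (an MPSC queue when available, LinkedBlockingQueue otherwise)
            super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                    rejectedExecutionHandler);
            this.provider = selectorProvider;
            this.selectStrategy = strategy;
            // openSelector() creates the JDK selector and, where possible, swaps its
            // selectedKeys set for Netty's array-based SelectedSelectionKeySet
            final SelectorTuple selectorTuple = openSelector();
            this.selector = selectorTuple.selector;
            this.unwrappedSelector = selectorTuple.unwrappedSelector;
        }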

    2. ServerBootstrap

    Configuration parameters:

    • ServerBootstrap#group(parentGroup, childGroup) sets this.group = parentGroup and this.childGroup = childGroup.
    • AbstractBootstrap#channel sets this.channelFactory.
    • AbstractBootstrap#option stores channel options.
    • ServerBootstrap#childHandler sets this.childHandler.

    2.1 The server registers the ACCEPT event with the selector and binds the port

    AbstractBootstrap#bind(int)

    • bind(SocketAddress)
    • AbstractBootstrap#doBind
      1) initAndRegister();
       1-1) channel = channelFactory.newChannel(); this calls ReflectiveChannelFactory#newChannel, which invokes the configured class's constructor via constructor.newInstance().
        The NioServerSocketChannel() constructor:
        this(newSocket(DEFAULT_SELECTOR_PROVIDER)); newSocket() calls SelectorProvider.provider().openServerSocketChannel() to create the underlying ServerSocketChannel;
        pipeline = newChannelPipeline(); creates the DefaultChannelPipeline;
        super(null, channel, SelectionKey.OP_ACCEPT);
        this.readInterestOp = readInterestOp; records interest in the ACCEPT event;
        ch.configureBlocking(false); switches the channel to non-blocking mode.
       1-2) init(channel); the core step adds one ChannelHandler to the pipeline: a one-shot, initialization-only ChannelInitializer whose job is to add the ServerBootstrapAcceptor handler and then remove itself. ServerBootstrapAcceptor accepts client connections and initializes each newly created connection.
       1-3) config().group().register(channel);
        MultithreadEventLoopGroup#register()
        SingleThreadEventLoop#register()
        AbstractChannel.AbstractUnsafe#register
        eventLoop.execute() submits a register0() task:
         AbstractNioChannel#doRegister calls selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
         pipeline.invokeHandlerAddedIfNeeded(); this runs the ChannelInitializer#initChannel set above, which removes itself and adds ServerBootstrapAcceptor (see DefaultChannelPipeline#addLast() -> callHandlerCallbackLater(newCtx, true) -> PendingHandlerAddedTask -> ChannelInitializer#handlerAdded);
         pipeline.fireChannelRegistered();
         beginRead() calls AbstractNioChannel#doBeginRead, which executes selectionKey.interestOps(interestOps | readInterestOp), i.e. registers interest in ACCEPT.
      2) doBind0(regFuture, channel, localAddress, promise);
       channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
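
    Stripped of Netty's abstractions, the sequence above corresponds to the following plain-JDK NIO steps (a simplified sketch of what doRegister/doBeginRead/doBind accomplish, not actual Netty code):

    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.spi.SelectorProvider;

    public class NioRegisterBindSketch {
        public static void main(String[] args) throws Exception {
            Selector selector = SelectorProvider.provider().openSelector();
            ServerSocketChannel serverChannel = SelectorProvider.provider().openServerSocketChannel();
            serverChannel.configureBlocking(false);                      // NioServerSocketChannel constructor
            SelectionKey key = serverChannel.register(selector, 0);      // doRegister: interest ops start at 0
            serverChannel.bind(new InetSocketAddress(9000), 1024);       // doBind; 1024 mirrors SO_BACKLOG
            key.interestOps(key.interestOps() | SelectionKey.OP_ACCEPT); // doBeginRead: now express ACCEPT interest
        }
    }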

    2.2 NioEventLoop#run

    Let's analyze SingleThreadEventExecutor#execute (a condensed sketch follows the list):

    • 1) addTask(task); enqueues the task: taskQueue.offer(task);
    • 2) startThread();
      SingleThreadEventExecutor#doStartThread then calls executor.execute() with a Runnable whose core is:
      SingleThreadEventExecutor.this.run();
      which is NioEventLoop#run
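
    Condensed outline of SingleThreadEventExecutor#execute (paraphrased, not verbatim):

        public void execute(Runnable task) {
            boolean inEventLoop = inEventLoop();
            addTask(task);           // 1) taskQueue.offer(task)
            if (!inEventLoop) {
                startThread();       // 2) lazily start the single worker thread on first use
            }
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop); // wake a selector blocked in select() so the task runs promptly
            }
        }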

    The executor above is a ThreadPerTaskExecutor, created in the MultithreadEventExecutorGroup constructor.

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            // ... (full constructor shown in section 1)
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
            // ... the executor is then handed to each newChild(executor, args) call
        }

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            this.threadFactory = threadFactory;
        }
    
        @Override
        public void execute(Runnable command) {
            threadFactory.newThread(command).start();
        }
    }
    

    The key logic is in NioEventLoop#run (a condensed skeleton follows the list):

    • 1) SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); at its core this calls int selectedKeys = selector.select(timeoutMillis);
    • 2) processSelectedKeys();
    • 3) runAllTasks(); takes tasks off taskQueue and executes them.
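
    A heavily condensed, paraphrased skeleton of the loop (Netty 4.1-era logic, not verbatim; the ioRatio time-budget branching and exception handling are omitted):

        protected void run() {
            for (;;) {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        // block in selector.select(timeoutMillis) until I/O is ready,
                        // a wakeup arrives, or the next scheduled task is due
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }
                processSelectedKeys(); // handle ready I/O events
                runAllTasks();         // then drain the task queue
            }
        }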

    The lock-free serialization design

    Once Netty's NioEventLoop has read a message, it calls ChannelPipeline's fireChannelRead(Object msg) directly. As long as the user does not actively switch threads, the call chain runs from the NioEventLoop all the way into the user's handler with no thread switch in between. This serialized processing avoids the lock contention that multi-threaded access would cause, and is optimal from a performance standpoint.
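
    When a handler does need to run blocking or heavy business logic, the usual way to leave the I/O thread is to bind that handler to a separate EventExecutorGroup (a sketch; the group size of 16 is an arbitrary choice):

    import io.netty.channel.ChannelPipeline;
    import io.netty.util.concurrent.DefaultEventExecutorGroup;
    import io.netty.util.concurrent.EventExecutorGroup;

    public class OffloadDemo {
        // Shared business executor group, created once at startup
        static final EventExecutorGroup BUSINESS_GROUP = new DefaultEventExecutorGroup(16);

        static void configure(ChannelPipeline pipeline) {
            // This handler's callbacks now run on BUSINESS_GROUP instead of the
            // NioEventLoop, at the cost of one thread handoff per event.
            pipeline.addLast(BUSINESS_GROUP, "business", new NettyServerHandler());
        }
    }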

    3. The pipeline chain of responsibility

    Let's trace the pipeline's chain-of-responsibility invocation flow:

    • DefaultChannelPipeline#fireChannelRegistered
    • AbstractChannelHandlerContext.invokeChannelRegistered(head);
    • which calls head.invokeChannelRegistered()
    • which calls HeadContext#channelRegistered
    • The core is findContextInbound(MASK_CHANNEL_REGISTERED): it finds the next context matching MASK_CHANNEL_REGISTERED and then calls AbstractChannelHandlerContext.invokeChannelRegistered() again, implementing the chain of responsibility.

    AbstractChannelHandlerContext.invokeChannelRegistered:

        static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRegistered();
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRegistered();
                    }
                });
            }
        }
    

    AbstractChannelHandlerContext#invokeChannelRegistered()

        private void invokeChannelRegistered() {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelRegistered(this);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRegistered();
            }
        }
    

    DefaultChannelPipeline.HeadContext#channelRegistered

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
        }
    

    AbstractChannelHandlerContext#fireChannelRegistered

        public ChannelHandlerContext fireChannelRegistered() {
            invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
            return this;
        }
    

    The core is AbstractChannelHandlerContext#findContextInbound:

    • It walks the context list from front to back until it finds a context whose executionMask matches the given mask:

        private AbstractChannelHandlerContext findContextInbound(int mask) {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while ((ctx.executionMask & mask) == 0);
            return ctx;
        }
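
    To see the same propagation from the user's side, here is a minimal sketch (hypothetical handler names) of two inbound handlers: each must call ctx.fireChannelRead(...) explicitly, otherwise propagation stops at that handler:

    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    public class InboundChainDemo {
        static class FirstHandler extends ChannelInboundHandlerAdapter {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                System.out.println("FirstHandler saw: " + msg);
                ctx.fireChannelRead(msg); // hand msg to the next matching inbound handler
            }
        }

        static class SecondHandler extends ChannelInboundHandlerAdapter {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                System.out.println("SecondHandler saw: " + msg);
                // no ctx.fireChannelRead(msg): the chain ends here
            }
        }
        // usage: pipeline.addLast(new FirstHandler(), new SecondHandler());
    }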
    

    4. Server Channel registration and handling of the ACCEPT event

    NioEventLoop#processSelectedKeys

        private void processSelectedKeys() {
            if (selectedKeys != null) {
            // Avoids the JDK's selector.selectedKeys(): slightly better performance (1-2%) and less garbage
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
    
        private void processSelectedKeysOptimized() {
            for (int i = 0; i < selectedKeys.size; ++i) {
                final SelectionKey k = selectedKeys.keys[i];
                // null out entry in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.keys[i] = null;
    
            // Matches the 'this' attachment passed in the channel's register call,
            // e.g. selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                final Object a = k.attachment();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (needsToSelectAgain) {
                    // null out entries in the array to allow to have it GC'ed once the Channel close
                    // See https://github.com/netty/netty/issues/2363
                    selectedKeys.reset(i + 1);
    
                    selectAgain();
                    i = -1;
                }
            }
        }
    
        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    // If the channel implementation throws an exception because there is no event loop, we ignore this
                    // because we are only trying to determine if ch is registered to this event loop and thus has authority
                    // to close ch.
                    return;
                }
                // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
                // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
                // still healthy and should not be closed.
                // See https://github.com/netty/netty/issues/5125
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
                // the NIO JDK channel implementation may throw a NotYetConnectedException.
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                    // See https://github.com/netty/netty/issues/924
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
    
                // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
                // to a spin loop
            // Handle read readiness (including remote disconnects) or accept new connections
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    

    Let's focus on how the server handles a SelectionKey.OP_ACCEPT event: unsafe.read();

    This is actually AbstractNioMessageChannel.NioMessageUnsafe#read (code below):

    • 1) doReadMessages() first calls serverSocketChannel.accept(), then wraps the accepted channel in a NioSocketChannel.
       The NioSocketChannel constructor creates a DefaultChannelPipeline, records interest in SelectionKey.OP_READ (assigning it to this.readInterestOp), and sets non-blocking mode via ch.configureBlocking(false).
    • 2) pipeline.fireChannelRead(readBuf.get(i)); triggers the handlers in the server channel's pipeline; the key one is ServerBootstrapAcceptor#channelRead().
       2-1) child.pipeline().addLast(childHandler) adds the ChannelInitializer written during server setup to the client SocketChannel's pipeline;
       2-2) childGroup.register(child).addListener(), analogous to the server channel registration above:
        A) It registers the read event with the selector of one NioEventLoop in the workerGroup (this happens in pipeline.fireChannelActive() -> DefaultChannelPipeline.HeadContext#read -> AbstractChannel.AbstractUnsafe#beginRead, which registers the read interest passed in at channel construction); NioEventLoop#run then loops forever monitoring that event;
        B) ChannelInitializer#initChannel runs, removing itself and adding NettyServerHandler.

            public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
                allocHandle.reset(config);
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);
                            if (localRead == 0) {
                                break;
                            }
                            if (localRead < 0) {
                                closed = true;
                                break;
                            }
    
                            allocHandle.incMessagesRead(localRead);
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
    
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    // Check if there is a readPending which was not processed yet.
                    // This could be for two reasons:
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                    // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                    //
                    // See https://github.com/netty/netty/issues/2254
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    

    5. Client Channel handling of the READ event

    NioEventLoop#processSelectedKey()
    -> AbstractNioByteChannel.NioByteUnsafe#read

    • 1) byteBuf = allocHandle.allocate(allocator); allocates a byteBuf;
    • 2) allocHandle.lastBytesRead(doReadBytes(byteBuf)); reads data from the channel;
    • 3) pipeline.fireChannelRead(byteBuf) runs the pipeline; this is where business logic is processed.

    6. Direct memory, zero-copy, and the ByteBuf memory pool

    The byteBuf allocation above uses direct memory:

    RecvByteBufAllocator.DelegatingHandle#allocate

            public ByteBuf allocate(ByteBufAllocator alloc) {
                return delegate.allocate(alloc);
            }
    

    DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle#allocate

            public ByteBuf allocate(ByteBufAllocator alloc) {
                return alloc.ioBuffer(guess());
            }
    

    AbstractByteBufAllocator#ioBuffer(int)

        public ByteBuf ioBuffer(int initialCapacity) {
            if (PlatformDependent.hasUnsafe() || isDirectBufferPooled()) {
                return directBuffer(initialCapacity);
            }
            return heapBuffer(initialCapacity);
        }
    

    PooledByteBufAllocator#newDirectBuffer

        protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
            PoolThreadCache cache = threadCache.get();
            PoolArena<ByteBuffer> directArena = cache.directArena;
    
            final ByteBuf buf;
            if (directArena != null) {
                buf = directArena.allocate(cache, initialCapacity, maxCapacity);
            } else {
                buf = PlatformDependent.hasUnsafe() ?
                        UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                        new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
    
            return toLeakAwareBuffer(buf);
        }
    

    Pros and cons of direct memory
    Pros:

    • It does not occupy heap space, reducing the likelihood of GC.
    • In JVM implementations, native I/O operates on direct memory directly (direct memory => system call => disk/NIC), whereas heap memory needs an extra copy (heap memory => direct memory => system call => disk/NIC).

    Cons:

    • Initial allocation is slower.
    • The JVM does not manage this memory directly, so it is easier to leak. To avoid the case where no Full GC ever runs and direct memory eventually exhausts physical memory, cap it with -XX:MaxDirectMemorySize; when the threshold is reached, System.gc() is invoked to force a Full GC, indirectly reclaiming unreferenced direct memory.

    Allocating and reclaiming off-heap direct memory is expensive. To reuse buffers as much as possible, Netty provides a buffer-reuse mechanism built on a ByteBuf memory pool: take a ByteBuf from the pool when you need one, and return it to the pool when you are done.
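
    A minimal usage sketch of the pooled allocator (standard Netty API; release() returns the buffer to the pool once its reference count drops to zero):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.util.CharsetUtil;

    public class PooledBufferDemo {
        public static void main(String[] args) {
            ByteBuf buf = PooledByteBufAllocator.DEFAULT.directBuffer(256); // take from the pool
            try {
                buf.writeBytes("hello".getBytes(CharsetUtil.UTF_8));
                System.out.println(buf.toString(CharsetUtil.UTF_8));
            } finally {
                buf.release(); // return to the pool
            }
        }
    }

    The allocation call chain inside Netty is: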

    PooledByteBufAllocator#newDirectBuffer
    -> PoolArena#allocate()
    -> PoolArena.DirectArena#newByteBuf
    -> PooledUnsafeDirectByteBuf#newInstance
    -> finally obtains the ByteBuf from the RECYCLER object pool; a non-pooled implementation would instead create a new ByteBuf directly.

    PoolArena#allocate()

        PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
            PooledByteBuf<T> buf = newByteBuf(maxCapacity);
            allocate(cache, buf, reqCapacity);
            return buf;
        }
    

    PoolArena.DirectArena#newByteBuf

            protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
                if (HAS_UNSAFE) {
                    return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
                } else {
                    return PooledDirectByteBuf.newInstance(maxCapacity);
                }
            }
    

    PooledUnsafeDirectByteBuf#newInstance

        static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
            PooledUnsafeDirectByteBuf buf = RECYCLER.get();
            buf.reuse(maxCapacity);
            return buf;
        }
    

    7. The ByteBuf expansion mechanism

    ByteBuf.writeByte() -> AbstractByteBuf

    AbstractByteBuf#writeByte

        public ByteBuf writeByte(int value) {
            ensureWritable0(1);
            _setByte(writerIndex++, value);
            return this;
        }
    

    AbstractByteBuf#ensureWritable0

        final void ensureWritable0(int minWritableBytes) {
            ensureAccessible();
            if (minWritableBytes <= writableBytes()) {
                return;
            }
            final int writerIndex = writerIndex();
            if (checkBounds) {
                if (minWritableBytes > maxCapacity - writerIndex) {
                    throw new IndexOutOfBoundsException(String.format(
                            "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                            writerIndex, minWritableBytes, maxCapacity, this));
                }
            }
    
            // Normalize the current capacity to the power of 2.
            int minNewCapacity = writerIndex + minWritableBytes;
            int newCapacity = alloc().calculateNewCapacity(minNewCapacity, maxCapacity);
    
            int fastCapacity = writerIndex + maxFastWritableBytes();
            // Grow by a smaller amount if it will avoid reallocation
            if (newCapacity > fastCapacity && minNewCapacity <= fastCapacity) {
                newCapacity = fastCapacity;
            }
    
            // Adjust to the new capacity.
            capacity(newCapacity);
        }
    

    AbstractByteBufAllocator#calculateNewCapacity

        public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
            checkPositiveOrZero(minNewCapacity, "minNewCapacity");
            if (minNewCapacity > maxCapacity) {
                throw new IllegalArgumentException(String.format(
                        "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                        minNewCapacity, maxCapacity));
            }
            final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
    
            if (minNewCapacity == threshold) {
                return threshold;
            }
    
            // If over threshold, do not double but just increase by threshold.
            if (minNewCapacity > threshold) {
                int newCapacity = minNewCapacity / threshold * threshold;
                if (newCapacity > maxCapacity - threshold) {
                    newCapacity = maxCapacity;
                } else {
                    newCapacity += threshold;
                }
                return newCapacity;
            }
    
            // Not over threshold. Double up to 4 MiB, starting from 64.
            int newCapacity = 64;
            while (newCapacity < minNewCapacity) {
                newCapacity <<= 1;
            }
    
            return Math.min(newCapacity, maxCapacity);
        }
    

    Netty's ByteBuf grows dynamically as needed. The expansion process uses a default threshold of 4 MiB (an empirical value; different scenarios may warrant different values):

    • If the required capacity equals the threshold, the threshold itself becomes the new buffer capacity.
    • If the required capacity exceeds the threshold, grow in 4 MiB steps: compute (required / 4 MiB) * 4 MiB, then add one more 4 MiB step; compare the result against maxCapacity, and use maxCapacity if the result would exceed it.
    • If the required capacity is below the threshold, double repeatedly starting from 64 bytes (64 -> 128 -> 256 ...) until the doubled value is greater than or equal to the required capacity.
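
    Two worked examples, using a mirror of calculateNewCapacity (the class and method names here are illustrative, but the logic follows the source above):

    public class CapacityDemo {
        static final int THRESHOLD = 4 * 1024 * 1024; // 4 MiB

        // Mirrors AbstractByteBufAllocator#calculateNewCapacity
        static int calc(int minNewCapacity, int maxCapacity) {
            if (minNewCapacity == THRESHOLD) return THRESHOLD;
            if (minNewCapacity > THRESHOLD) {
                int newCapacity = minNewCapacity / THRESHOLD * THRESHOLD; // integer division rounds down
                return newCapacity > maxCapacity - THRESHOLD ? maxCapacity : newCapacity + THRESHOLD;
            }
            int newCapacity = 64;
            while (newCapacity < minNewCapacity) newCapacity <<= 1; // 64 -> 128 -> 256 ...
            return Math.min(newCapacity, maxCapacity);
        }

        public static void main(String[] args) {
            System.out.println(calc(300, Integer.MAX_VALUE));             // 512 (doubling path)
            System.out.println(calc(5 * 1024 * 1024, Integer.MAX_VALUE)); // 8388608 = 8 MiB (stepping path)
        }
    }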

    8. Handling the empty-polling bug

    NioEventLoop#select

        private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
            // Compute the select timeout from the next scheduled task's deadline
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
    
                long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
                if (nextWakeupTime != normalizedDeadlineNanos) {
                    nextWakeupTime = normalizedDeadlineNanos;
                }
    
                for (;;) {
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) { // a scheduled task is already due, or the max wait has elapsed
                    if (selectCnt == 0) {
                        // non-blocking; returns 0 if nothing is ready
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }
    
                    // If a task was submitted when wakenUp value was true, the task didn't get a chance to call
                    // Selector#wakeup. So we need to check task queue again before executing select operation.
                    // If we don't, the task might be pended until select operation was timed out.
                    // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
                // The select below blocks, but another thread can still wake it up
                    int selectedKeys = selector.select(timeoutMillis);
                    selectCnt ++;
    
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
                    if (Thread.interrupted()) {
                        // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                        // As this is most likely a bug in the handler of the user or it's client library we will
                        // also log it.
                        //
                        // See https://github.com/netty/netty/issues/2426
                        if (logger.isDebugEnabled()) {
                            logger.debug("Selector.select() returned prematurely because " +
                                    "Thread.currentThread().interrupt() was called. Use " +
                                    "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                        }
                        selectCnt = 1;
                        break;
                    }
    
                    long time = System.nanoTime();
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                        // timeoutMillis elapsed without anything selected.
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                            selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        // The code exists in an extra method to ensure the method is not too big to inline as this
                        // branch is not very likely to get hit very frequently.
                        selector = selectRebuildSelector(selectCnt);
                        selectCnt = 1;
                        break;
                    }
    
                    currentTimeNanos = time;
                }
    
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                }
            } catch (CancelledKeyException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
                // Harmless exception - log anyway
            }
        }
    
    

    If the Selector's poll returns empty, with no wakeup and no new messages to process, an empty poll has occurred; repeated empty polls drive CPU usage to 100%.

    Netty's workaround:

    • 1) Count select operations per cycle: each empty select increments a counter. If N consecutive empty polls occur within one period, the epoll busy-loop bug is assumed to have been triggered.
    • 2) Rebuild the Selector: check whether the rebuild request came from another thread; if not, deregister the SocketChannels from the old Selector, re-register them with a new Selector, and close the old one.

    The concrete steps (see the threshold snippet after the list):

    • 1) Record the current time as currentTimeNanos.
    • 2) Compute the minimum time a blocking select should take, timeoutMillis.
    • 3) Increment selectCnt on every loop iteration.
    • 4) If the select took at least the minimum time, reset selectCnt to 1 (filtering out normal select timeouts).
    • 5) Once selectCnt reaches SELECTOR_AUTO_REBUILD_THRESHOLD, rebuild the selector to work around the bug.
    • 6) This threshold defaults to 512.
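
    The threshold comes from a system property (paraphrased from NioEventLoop's static initializer in Netty 4.1):

        int selectorAutoRebuildThreshold =
                SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
        if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) { // MIN_PREMATURE_SELECTOR_RETURNS = 3
            selectorAutoRebuildThreshold = 0; // values below the minimum disable auto rebuild
        }
        SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;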
