美文网首页netty
Netty中线程池NioEventLoopGroup初始化流程解

Netty中线程池NioEventLoopGroup初始化流程解

作者: 海涛_meteor | 来源:发表于2018-09-10 19:42 被阅读0次

    前言

    因为项目中有长连接的关系,所以用到了Netty框架,但一直都没有对这块做些系统性的整理和源码解析,准备有空的时候逐步补上,提到Netty首当其冲被提起的肯定是支持它承受高并发的线程模型,说到线程模型就不得不提到NioEventLoopGroup这个线程池,接下来进入正题。

    线程模型

    首先来看一段Netty的使用示例

            ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setNameFormat(getServerName() + " Server Acceptor NIO Thread#%d").build();
            ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat(getServerName() + " Server Reactor NIO Thread#%d").build();
    
            this.bossGroup = new NioEventLoopGroup(numberOfThreads, bossThreadFactory);
            this.workerGroup = new NioEventLoopGroup(numberOfThreads, workerThreadFactory);
    
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast("readTimeoutHandler", new ReadTimeoutHandler(DeviceServerListener.this.timeoutSeconds));
                    pipeline.addLast("lineBasedFrameDecoder-" + maxLength, new LineBasedFrameDecoder(Integer.parseInt(maxLength)));// 按行('\n')解析成命令ByteBuf
                    pipeline.addLast("stringPluginMessageDecoder", new StringDecoder(CharsetUtil.UTF_8));
                    pipeline.addLast("stringToByteEncoder", new StringToByteEncoder());// 将JSON字符串类型消息转换成ByteBuf
                    pipeline.addLast("deviceMessageDecoder", new DeviceMessageDecoder());// 将JSON字符串消息转成deviceMessage对象
                    pipeline.addLast("deviceMessageEncoder", new DeviceMessageEncoder());// 将deviceMessage对象转成JSON字符串
                    pipeline.addLast("deviceHeartBeatResponseHandler", new DeviceHeartBeatResponseHandler(heartTime));
                    pipeline.addLast("deviceAuthResponseHandler",
                            new DeviceAuthResponseHandler(DeviceServerListener.this.timeoutSeconds, DeviceServerListener.serverInstanceName));
                    pipeline.addLast("deviceMessageHandler", new DeviceMessageHandler());
    
                    // log.debug("Added Handler to Pipeline: {}", pipeline.names());
                }
            }).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT).childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                    .option(ChannelOption.RCVBUF_ALLOCATOR, AdaptiveRecvByteBufAllocator.DEFAULT);
    
            // Start the server. Bind and start to accept incoming connections.
            this.channelFuture = bootstrap.bind(serverPort).sync();
    

    从代码中可以看到这里使用了两个线程池bossGroupworkerGroup,那么为什么需要定义两个线程池呢?这就要说到Netty的线程模型了。

    netty.png
    Netty的线程模型被称为Reactor模型,具体如图所示,图上的mainReactor指的就是bossGroup,这个线程池处理客户端的连接请求,并将accept的连接注册到subReactor的其中一个线程上;图上的subReactor当然指的就是workerGroup,负责处理已建立的客户端通道上的数据读写;图上还有一块ThreadPool是具体的处理业务逻辑的线程池,一般情况下可以复用subReactor,比我的项目中就是这种用法,但官方建议处理一些较为耗时的业务时还是要使用单独的ThreadPool。

    源码解析

    讲完线程池原理接着来看代码,this.bossGroup = new NioEventLoopGroup(numberOfThreads, bossThreadFactory)bossGroup进行了初始化,不难看出这里传入了线程数量和线程工厂,我们接着来看一下源码

        public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
            this(nThreads, threadFactory, SelectorProvider.provider());
        }
    
        public NioEventLoopGroup(
                int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
            this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
        }
    
        public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
            final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }
    

    可以看到为了创建这个NioEventLoopGroup进行了一连串的构造方法引用,其中有个默认值值得注意,那就是SelectorProvider.provider()。对NIO了解的人一定记得其中有一个很重要的部件Selector,既然Netty是对NIO的高度封装那么一定也少不了这个组件,其实这里的SelectorProvider.provider()就是根据不同的操作系统平台获取对应的Selector。这里的构造方法继续调用了父类MultithreadEventLoopGroup的构造方法,代码如下:

        protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
        }
    

    这里又继续调用了父类MultithreadEventExecutorGroup的构造方法,来看一下:

        protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (threadFactory == null) {
                threadFactory = newDefaultThreadFactory();
            }
    
            children = new SingleThreadEventExecutor[nThreads];
            if (isPowerOfTwo(children.length)) {
                chooser = new PowerOfTwoEventExecutorChooser();
            } else {
                chooser = new GenericEventExecutorChooser();
            }
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    children[i] = newChild(threadFactory, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
        }
    

    首先看一下children,这是一个EventExecutor的数组,具体生成的是SingleThreadEventExecutor的对象数组,关于SingleThreadEventExecutor从名字就能看出来是一个线程池的执行器,等用到的时候再来具体分析。
    然后来看一下chooser这个对象的初始化,这里根据isPowerOfTwo(children.length)的执行结果有不同的实现,isPowerOfTwo(children.length)从方法名就能看出来是判断children的长度是否为2的幂次方,我们来看一下其实现

        private static boolean isPowerOfTwo(int val) {
            return (val & -val) == val;
        }
    

    这是一个很经典的判断2的幕次的方法,非常值得记住,另外这里很值得说的一点就是给方法取一个有含义的名字是多么重要。。。
    判断完children的长度后就该生成chooser了,来看看两个不同实现类PowerOfTwoEventExecutorChooserGenericEventExecutorChooser的区别

        private final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
            @Override
            public EventExecutor next() {
                return children[childIndex.getAndIncrement() & children.length - 1];
            }
        }
    
        private final class GenericEventExecutorChooser implements EventExecutorChooser {
            @Override
            public EventExecutor next() {
                return children[Math.abs(childIndex.getAndIncrement() % children.length)];
            }
        }
    

    可以看到两者都实现了EventExecutorChooser,唯一区别是next()方法的实现不同。据此可以推测从children数组中取对象时并不是按照序号来的,而是next()方法的规则。这里的childIndex是一个AtomicInteger类型的计数器,从两个类的next()方法实现来看显然是PowerOfTwoEventExecutorChooser的位运算实现更高效,因此从性能角度考虑线程数这个值肯定是更推荐设置为2的幂次的。
    接着看children中成员的复制操作children[i] = newChild(threadFactory, args),这里的newChild方法在子类NioEventLoopGroup中实现,来看一下

        @Override
        protected EventExecutor newChild(ThreadFactory threadFactory, Object... args) throws Exception {
            return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }
    

    可以看到这里生成的是NioEventLoop的对象,具体实现如下

        NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, threadFactory, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            selector = openSelector();
            selectStrategy = strategy;
        }
    

    这里有两个值得关注的地方,一个是调用父类SingleThreadEventLoop的构造方法,另一个是openSelector()方法生成selector,这里首先看一下父类构造方法的调用

        protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        }
    

    可以看到这里又调用了父类SingleThreadEventExecutor的构造方法

        @SuppressWarnings("deprecation")
        protected SingleThreadEventExecutor(
                EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks,
                RejectedExecutionHandler rejectedHandler) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
    
            this.parent = parent;
            this.addTaskWakesUp = addTaskWakesUp;
    
            thread = threadFactory.newThread(new Runnable() {
                @Override
                public void run() {
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        for (;;) {
                            int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
                        // Check if confirmShutdown() was called at the end of the loop.
                        if (success && gracefulShutdownStartTime == 0) {
                            logger.error(
                                    "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                    "before run() implementation terminates.");
                        }
    
                        try {
                            // Run all remaining tasks and shutdown hooks.
                            for (;;) {
                                if (confirmShutdown()) {
                                    break;
                                }
                            }
                        } finally {
                            try {
                                cleanup();
                            } finally {
                                STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                                threadLock.release();
                                if (!taskQueue.isEmpty()) {
                                    logger.warn(
                                            "An event executor terminated with " +
                                            "non-empty task queue (" + taskQueue.size() + ')');
                                }
    
                                terminationFuture.setSuccess(null);
                            }
                        }
                    }
                }
            });
            threadProperties = new DefaultThreadProperties(thread);
            this.maxPendingTasks = Math.max(16, maxPendingTasks);
            taskQueue = newTaskQueue();
            rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
        }
    

    这里最重要的工作就是初始化了一个线程thread,我们具体来看一下其实现的run方法,这里最重要的是SingleThreadEventExecutor.this.run()这个方法调用(这里的语法乍一看没有看懂,查了资料才知道是内部类中独有的用法看来基础这块还需要加强啊),调用了SingleThreadEventExecutorrun方法,而该方法是个抽象方法,由其子类NioEventLoop实现,来具体看一下

       @Override
        protected void run() {
            for (;;) {
                try {
                    // 调用select()查询是否有就绪的IO事件
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                            // fallthrough
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        processSelectedKeys();// 处理就绪的IO事件
                        runAllTasks();// 执行完任务队列中的任务
                    } else {
                        final long ioStartTime = System.nanoTime();
    
                        processSelectedKeys();
    
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);// 在给定时间内执行任务
                    }
    
                    if (isShuttingDown()) {// 检测用户是否要终止线程
                        closeAll();
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } 
            }
        }
    

    这里用了一个死循环进行轮询操作,首先来看一下判断条件selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()),该方法具体由DefaultSelectStrategy实现

        @Override
        public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
            return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
        }
    

    该方法根据hasTasks标志判断队列中是否有待执行任务,据此分别返回selectSupplier.get()SelectStrategy.SELECT,来看一下selectSupplier.get()

        private final IntSupplier selectNowSupplier = new IntSupplier() {
            @Override
            public int get() throws Exception {
                return selectNow();
            }
        };
    

    调用了NioEventLoop中的selectNow()方法,继续跟进

        int selectNow() throws IOException {
            try {
                return selector.selectNow();
            } finally {
                // restore wakup state if needed
                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }
        }
    

    这里的selectNow()SelectorImpl类的方法,调用该方法返回IO事件就绪的通道数量,与select()方法不同的是在没有IO事件就绪时也不会阻塞。finally中的代码表示只有当wakenUp标记字段为true时才会执行selector.wakeup(),那么下一个select()操作也会立即返回。
    回到switch...case中,刚才提到了当hasTasks标志判断队列中没有任务时会返回SelectStrategy.SELECT从而进入case的对应分支执行select(wakenUp.getAndSet(false))

        private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                int selectCnt = 0;
                long currentTimeNanos = System.nanoTime();
                long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
                for (;;) {
                    long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                    if (timeoutMillis <= 0) {
                        if (selectCnt == 0) {
                            selector.selectNow();
                            selectCnt = 1;
                        }
                        break;
                    }
    
                    if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    int selectedKeys = selector.select(timeoutMillis);
                    selectCnt ++;
    
                    if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                        // - Selected something,
                        // - waken up by user, or
                        // - the task queue has a pending task.
                        // - a scheduled task is ready for processing
                        break;
                    }
                    if (Thread.interrupted()) {
                        selectCnt = 1;
                        break;
                    }
    
                    long time = System.nanoTime();
                    if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                        // timeoutMillis elapsed without anything selected.
                        selectCnt = 1;
                    } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                        rebuildSelector();// 对空轮询BUG的处理
                        selector = this.selector;
    
                        // Select again to populate selectedKeys.
                        selector.selectNow();
                        selectCnt = 1;
                        break;
                    }
    
                    currentTimeNanos = time;
                }
    
                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                }
            } 
        }
    

    其它的正常NIO处理逻辑就不多说了,重点关注一下rebuildSelector()这个方法,NIO中有个非常著名的空轮询bug,该bug出现时会造成cpu飙升到100%的情况,由于该bug造成的原因是select()方法未阻塞直接返回0,因此Netty采用的策略是对select()方法返回0的操作计数,如果次数大于阈值SELECTOR_AUTO_REBUILD_THRESHOLD就新建一个selector,将注册到老的selector上的channel重新注册到新的selector上。这里的阈值SELECTOR_AUTO_REBUILD_THRESHOLD可以修改,默认情况为512。
    跳回到NioEventLooprun()方法,接着来看下面的代码片段

                    if (ioRatio == 100) {
                        processSelectedKeys();// 处理就绪的IO事件
                        runAllTasks();// 执行完任务队列中的任务
                    } else {
                        final long ioStartTime = System.nanoTime();
    
                        processSelectedKeys();
    
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);// 在给定时间内执行任务
                    }
    

    这里的ioRatio是一个非常重要的参数,它代表着IO任务占总任务(IO+普通任务)的时间执行比例,默认值是50,也就代表着IO任务和普通任务的执行时间各占一半。从源码就可以看出,当ioRatio为100时,runAllTasks()的执行没有时间限制,否则其执行时间要根据processSelectedKeys()的执行时间进行调整。来具体分析下这两个方法做了什么,首先看processSelectedKeys()方法

        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized(selectedKeys.flip());// 使用优化
            } else {
                processSelectedKeysPlain(selector.selectedKeys());// 普通处理
            }
        }
    

    这里根据selectedKeys对象是否为空采用优化执行或者普通执行,这里重点看一下普通处理的实现

        private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
            if (selectedKeys.isEmpty()) {
                return;
            }
    
            Iterator<SelectionKey> i = selectedKeys.iterator();
            for (;;) {
                final SelectionKey k = i.next();
                final Object a = k.attachment();
                i.remove();
    
                if (a instanceof AbstractNioChannel) {
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    @SuppressWarnings("unchecked")
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
    
                if (!i.hasNext()) {
                    break;
                }
    
                if (needsToSelectAgain) {
                    selectAgain();
                    selectedKeys = selector.selectedKeys();
    
                    // Create the iterator again to avoid ConcurrentModificationException
                    if (selectedKeys.isEmpty()) {
                        break;
                    } else {
                        i = selectedKeys.iterator();
                    }
                }
            }
        }
    

    方法中通过迭代器遍历了selectedKeys中的对象,进而对每个对象执行processSelectedKey方法

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                    return;
                }
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                    if (!ch.isOpen()) {
                        // Connection already closed - no need to handle write.
                        return;
                    }
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    

    该方法基本上就是标准的NIO中的常规操作,唯一不太一样的地方是这里对channel的操作都由对应的NioUnsafe对象代替。
    讲完了processSelectedKeys()方法,接着来看runAllTasks(long timeoutNanos)方法

        protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue();
            Runnable task = pollTask();
            if (task == null) {
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                try {
                    task.run();
                } catch (Throwable t) {
                    logger.warn("A task raised an exception.", t);
                }
    
                runTasks ++;
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
    

    首先看一下fetchFromScheduledTaskQueue()方法

        private boolean fetchFromScheduledTaskQueue() {
            long nanoTime = AbstractScheduledEventExecutor.nanoTime();
            Runnable scheduledTask  = pollScheduledTask(nanoTime);
            while (scheduledTask != null) {
                if (!taskQueue.offer(scheduledTask)) {
                    // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                    scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                    return false;
                }
                scheduledTask  = pollScheduledTask(nanoTime);
            }
            return true;
        }
    

    方法中主要是对scheduledTask对象的操作,该对象通过pollScheduledTask(nanoTime)方法获取,来看一下该方法

        protected final Runnable pollScheduledTask(long nanoTime) {
            assert inEventLoop();
    
            Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
            ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
            if (scheduledTask == null) {
                return null;
            }
    
            if (scheduledTask.deadlineNanos() <= nanoTime) {
                scheduledTaskQueue.remove();
                return scheduledTask;
            }
            return null;
        }
    

    该方法概括起来就是从scheduledTaskQueue中到期的定时任务从队列中移除并返回。再回过头看fetchFromScheduledTaskQueue()方法就很好理解了,就是将到期的定时任务放入taskQueue队列中,如果taskQueue满了则重新放回scheduledTaskQueue队列中。
    接着回到runAllTasks方法,来看看pollTask()方法如何复制task对象的

        protected Runnable pollTask() {
            assert inEventLoop();
            for (;;) {
                Runnable task = taskQueue.poll();
                if (task == WAKEUP_TASK) {
                    continue;
                }
                return task;
            }
        }
    

    该方法就是对taskQueue.poll()方法的封装,并且要求任务不是WAKEUP_TASK的,这里的WAKEUP_TASK是一个标记任务,使用这个标记任务是为了线程能正确退出。
    接着看runAllTasks方法,其中(runTasks & 0x3F) == 0用于判断计数器runTasks是否达到64(这里用位运算判断64而不是用>做也真的是把性能运用到极致了),也就是说每执行64个任务执行一次ScheduledFutureTask.nanoTime()方法获取当前时间,判断是否已经到达执行结束时间,按官方说明这样做是因为nanoTime()方法开销较大。
    到此SingleThreadEventLoop的构造方法就分析完了,接着来看看NioEventLoopopenSelector()方法怎么生成selector

        private Selector openSelector() {
            final Selector selector;
            try {
                selector = provider.openSelector();
            } catch (IOException e) {
                throw new ChannelException("failed to open a new selector", e);
            }
    
            if (DISABLE_KEYSET_OPTIMIZATION) {
                return selector;
            }
    
            final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
    
            //拿到sun.nio.ch.SelectorImpl 的字节码
            Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        return Class.forName(
                                "sun.nio.ch.SelectorImpl",
                                false,
                                PlatformDependent.getSystemClassLoader());
                    } catch (ClassNotFoundException e) {
                        return e;
                    } catch (SecurityException e) {
                        return e;
                    }
                }
            });
    
            //获取sun.nio.ch.SelectorImpl 的字节码失败情况
            if (!(maybeSelectorImplClass instanceof Class) ||
                    // ensure the current selector implementation is what we can instrument.
                    !((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {
                if (maybeSelectorImplClass instanceof Exception) {
                    Exception e = (Exception) maybeSelectorImplClass;
                    logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
                }
                return selector;
            }
    
            final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    
            Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
                @Override
                public Object run() {
                    try {
                        /**
                         * 反射替换hashset和set成员变量为netty优化后的数组实现的set
                         * 优化的方案  用数组实现set集合  add添加的时间复杂度降低为O(1)
                         */
                        Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                        Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                        selectedKeysField.setAccessible(true);
                        publicSelectedKeysField.setAccessible(true);
                        //通过反射进行替换
                        selectedKeysField.set(selector, selectedKeySet);
                        publicSelectedKeysField.set(selector, selectedKeySet);
                        return null;
                    } catch (NoSuchFieldException e) {
                        return e;
                    } catch (IllegalAccessException e) {
                        return e;
                    }
                }
            });
    
            if (maybeException instanceof Exception) {
                selectedKeys = null;
                Exception e = (Exception) maybeException;
                logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
            } else {
                selectedKeys = selectedKeySet;
                logger.trace("instrumented a special java.util.Set into: {}", selector);
            }
    
            return selector;
        }
    

    DISABLE_KEYSET_OPTIMIZATION参数指定不需要进行优化,或者获取sun.nio.ch.SelectorImpl类的字节码失败时,通过provider.openSelector()返回一个默认的selector,此时没有进行任何优化。当获取到sun.nio.ch.SelectorImpl的字节码时,通过反射方式将该对象的属性selectedKeyspublicSelectedKeys分别由原来的HashSet类型替换为SelectedSelectionKeySet类型。SelectedSelectionKeySet实现了Set接口,并且通过数组存储的方式重写了add方法,将add添加的时间复杂度降低为O(1),具体代码就不再贴了。
    到此NioEventLoopGroup线程池的初始化过程就分析完了。

    总结

    本文主要对Netty的线程模型和线程池NioEventLoopGroup初始化过程进行了讲解,从中应该能对Netty开发者为了追求高性能而在细节上做出的各种努力有了一些了解。

    相关文章

      网友评论

        本文标题:Netty中线程池NioEventLoopGroup初始化流程解

        本文链接:https://www.haomeiwen.com/subject/zrlegftx.html