美文网首页程序员
[Netty源码分析]EventLoop

[Netty源码分析]EventLoop

作者: 没意思先生1995 | 来源:发表于2018-08-26 22:02 被阅读0次

    QUESTION:

    1. 默认情况下Netty服务端起多少线程?何时启动?
    2. Netty如何解决JDK空轮训BUG?
    3. Netty如何保证异步串行无锁化?
      (待完善)

    NioEventLoopGroup创建

    NioEventLoopGroup实例化过程.png

    netty的程序的启动(在服务端一般是两个NioEventLoopGroup线程池,一个boss, 一个worker; 对于客户端一般是一个线程池)我们一般是使用NIO.boss用于接收客户端的TCP连接,work用于处理IO相关的读写操作或者执行系统Task,定时任务task.

    (1)在NioEventLoopGroup构造器调用:
    如果对于不指定线程数参数的构造器,默认设置为0(但是在后面的构造器中会判断,如果设置为0,就会初始化为2*CPU数量,对应问题一);

    源码:

    NioEventLoopGroup

            public NioEventLoopGroup() {
                this(0);
            }
            
                ↓↓↓↓
                
            public NioEventLoopGroup(int nThreads) {
                this(nThreads, (Executor) null);
            }
            -----
            设置了NioEventLoopGroup线程池中每个线程执行器默认是null(这里设置为null, 在后面的构造器中会判断,如果为null就实例化一个线程执行器)
            -----
            
                ↓↓↓↓
                
            public NioEventLoopGroup(int nThreads, Executor executor) {
                this(nThreads, executor, SelectorProvider.provider());
            }
            -----
            这里就存在于JDK的NIO的交互了,这里设置了线程池的SelectorProvider, 通过SelectorProvider.provider() 返回
            -----
            
                ↓↓↓↓
            
            public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
                this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
            }
            -----
            在这个重载的构造器中传入了默认的选择策略工厂DefaultSelectStrategyFactory
            -----
            
                ↓↓↓↓
            
            public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
                super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
            }
            
            -----
            调用父类MultithreadEventLoopGroup的构造器,添加了线程的拒绝执行策略
            -----
    

    MultithreadEventLoopGroup

            private static final int DEFAULT_EVENT_LOOP_THREADS;
            
            static {
                DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                        "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
                ......
            }
            protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
                super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
            }
            --------
            构造器被定义成protect,表示只能在NioEventLoopGroup中被调用,一定层度上的保护作用。这里就是对线程数进行了判断,当nThreads为0 的时候就设置成 DEFAULT_EVENT_LOOP_THREADS 这个常量。这个常量的定义如下:其实就是在MultithreadEventLoopGroup的静态代码段,其实就是将DEFAULT_EVENT_LOOP_THREADS赋值为CPU核心数*2
            --------
                ↓↓↓↓
            调用的基类MultithreadEventExecutorGroup的构造器
    

    MultithreadEventExecutorGroup

            protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
                this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
            }
            ------
            构造器里面多传入了一个参数 DefaultEventExecutorChooserFactory.INSTANCE , 通过这个EventLoop选择器工厂可以实例化GenericEventExecutorChooser这个类, 这个类是EventLoopGroup线程池里面的EventLoop的选择器,调用GenericEventExecutorChooser.next() 方法可以从线程池中选择出一个合适的EventLoop线程
            ------
            
                ↓↓↓↓
            
            重载调用MultithreadEventExecutorGroup类的构造器
            
            /**
             * 最终的创建实例构造器
             *
             * @param nThreads          该实例将使用的线程数
             * @param executor          将要使用的executor, 默认为null
             * @param chooserFactory    将要使用的EventExecutorChooserFactory
             * @param args              arguments which will passed to each {@link #newChild(Executor, Object...)} call
             */
            protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                    EventExecutorChooserFactory chooserFactory, Object... args) {
                /** 1.初始化线程池 */
                //参数校验nThread合法性,
                if (nThreads <= 0) {
                    throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
                }
                //executor校验非空, 如果为空就创建ThreadPerTaskExecutor, 该类实现了 Executor接口
                // 这个executor 是用来执行线程池中的所有的线程,也就是所有的NioEventLoop,其实从
                //NioEventLoop构造器中也可以知道,NioEventLoop构造器中都传入了executor这个参数。
                if (executor == null) {
                    executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
                }
            
                //这里的children数组, 其实就是线程池的核心实现,线程池中就是通过指定的线程数组来实现线程池;
                //数组中每个元素其实就是一个EventLoop,EventLoop是EventExecutor的子接口。
                children = new EventExecutor[nThreads];
            
                //for循环实例化children数组,NioEventLoop对象
                for (int i = 0; i < nThreads; i ++) {
                    boolean success = false;
                    try {
                        //newChild(executor, args) 函数在NioEventLoopGroup类中实现了, 
                        // 实质就是就是存入了一个 NIOEventLoop类实例
                        children[i] = newChild(executor, args);
                        success = true;
                    } catch (Exception e) {
                        throw new IllegalStateException("failed to create a child event loop", e);
                    } finally {
                        //如果构造失败, 就清理资源
                        if (!success) {
                            for (int j = 0; j < i; j ++) {
                                children[j].shutdownGracefully();
                            }
                            for (int j = 0; j < i; j ++) {
                                EventExecutor e = children[j];
                                try {
                                    while (!e.isTerminated()) {
                                        e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                    }
                                } catch (InterruptedException interrupted) {
                                    Thread.currentThread().interrupt();
                                    break;
                                }
                            }
                        }
                    }
                }//end foreach
            
                /** 2.实例化线程工厂执行器选择器: 根据children获取选择器 */
                chooser = chooserFactory.newChooser(children);
            
                /** 3.为每个EventLoop线程添加 线程终止监听器*/
                final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                    @Override
                    public void operationComplete(Future<Object> future) throws Exception {
                        if (terminatedChildren.incrementAndGet() == children.length) {
                            terminationFuture.setSuccess(null);
                        }
                    }
                };
            
                for (EventExecutor e: children) {
                    e.terminationFuture().addListener(terminationListener);
                }
            
                /** 4. 将children 添加到对应的set集合中去重, 表示只可读。*/
                Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
                Collections.addAll(childrenSet, children);
                readonlyChildren = Collections.unmodifiableSet(childrenSet);
            }
    

    [图片上传失败...(image-b490e5-1535292035799)]

    NioEventLoop 的继承链如下:

    NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor

    在 AbstractScheduledEventExecutor 中, Netty 实现了 NioEventLoop 的 schedule 功能, 即我们可以通过调用一个 NioEventLoop 实例的 schedule()方法来运行一些定时任务. 而在 SingleThreadEventLoop 中, 又实现了任务队列的功能, 通过它, 我们可以调用一个 NioEventLoop 实例的 execute() 方法来向任务队列中添加一个 task, 并由 NioEventLoop 进行调度执行.

    通常来说, NioEventLoop 肩负着两种任务,:
    1)第一个是作为 IO 线程, 执行与 Channel 相关的 IO 操作, 包括 调用 select 等待就绪的 IO 事件、读写数据与数据的处理等;

    2)而第二个任务是作为任务队列, 执行 taskQueue 中的任务, 例如用户调用 eventLoop.schedule 提交的定时任务也是这个线程执行的.

    对于NioEventLoop的实例化,基本就是在NioEventLoopGroup.newChild() 中调用的

    源码:

    NioEventLoopGroup

        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0]);
        }
        
            ↓↓↓↓
            
        new NioEventLoop()实例化
    

    NioEventLoop

        //Netty5 构造器为 NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider)
        
        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            //Netty 5 构造器         
            //super(parent, executor, false);
                     
            //调用父类构造器
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        
            if (selectorProvider == null) {
                throw new NullPointerException("selectorProvider");
            }
            //Netty5 取消该处判断
            if (strategy == null) {
                throw new NullPointerException("selectStrategy");
            }
            provider = selectorProvider;
            selector = openSelector();//new 一个selector实例, 具体的类与平台和底层有关
            selectStrategy = strategy;//Netty5 取消该处
        }
        
        ------
        构造器里面传入了 NioEventLoopGroup、Executor、SelectorProvider、SelectStrategyFactory、RejectedExecutionHandler。从这里可以看出,一个NioEventLoop属于某一个NioEventLoopGroup, 且处于同一个NioEventLoopGroup下的所有NioEventLoop 公用Executor、SelectorProvider、SelectStrategyFactory和RejectedExecutionHandler.
        
        这里的SelectorProvider构造参数传入的是通过在NioEventLoopGroup里面的构造器里面的SelectorProvider.provider()方式获取的, 而这个方法返回的是一个单例的SelectorProvider, 所以所有的NioEventLoop公用同一个单例SelectorProvider
        ------
            ↓↓↓↓
            
        SingleThreadEventLoop构造器
    

    SingleThreadEventLoop

        private final Queue<Runnable> tailTasks;// 尾部任务队列
        ------
        队列的数量maxPendingTasks参数默认是SingleThreadEventLoop.DEFAULT_MAX_PENDING_TASK,其实就是Integer.MAX_VALUE; 对于new的这个队列, 其实就是一个LinkedBlockingQueue 无界队列
        ------
        
        
        protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                            boolean addTaskWakesUp, int maxPendingTasks,
                                            RejectedExecutionHandler rejectedExecutionHandler) {
            //父类构造器
            super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
            tailTasks = newTaskQueue(maxPendingTasks);//实例化了 tailTasks 这个变量
        }
            ↓↓↓↓
            
        SingleThreadEventExecutor构造器
    

    SingleThreadEventExecutor

        protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, int maxPendingTasks,
                                            RejectedExecutionHandler rejectedHandler) {
            super(parent);// 设置EventLoop所属于的EventLoopGroup
            this.addTaskWakesUp = addTaskWakesUp;
            this.maxPendingTasks = Math.max(16, maxPendingTasks);//默认是Integer.MAX_VALUE
            this.executor = ObjectUtil.checkNotNull(executor, "executor");
            taskQueue = newTaskQueue(this.maxPendingTasks);//创建EventLoop的任务队列, 默认是 LinkedBlockingQueue
            rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
        }
    

    EventLoop是什么时候启动?

    NioEventLoop 本身就是一个 SingleThreadEventExecutor, 因此 NioEventLoop 的启动, 其实就是 NioEventLoop 所绑定的本地 Java 线程的启动。在NioEventLoop的构造器初始化分析过程中,直到SingleThreadEventExecutor传入了一个线程执行器 Executor,线程的启动就是通过这个线程执行器启动的.
    在SingleThreadEventExecutor类中,有一个非常重要的属性 thread,这个线程也就是与NioEventLoop 所绑定的本地 Java 线程. thread 变量,发现在 doStartThread() 函数中,thread = Thread.currentThread();而且这行代码在
    executor.execute(new Runnable() {}).在executor执行的一个线程里面, 含义也就是当executor 第一次执行提交的任务创建的线程 赋值给 thread对象。由此可见,thread的启动其实也就是在这里,分析 doStartThread() 函数 的父调用关系,最顶层的调用就是 SingleThreadEventExecutor.execute(Runnable task).也就是说哪里第一次调用了execute()函数,也就是启动了该NioEventLoop.

    private void doStartThread() {
            assert thread == null;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    thread = Thread.currentThread();
                    if (interrupted) {
                        thread.interrupt();
                    }
    
                    boolean success = false;
                    updateLastExecutionTime();
                    try {
                        SingleThreadEventExecutor.this.run();//此处开始NioEventLoop
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        //略
                    }
                }
            });
        }
    
    

    当调用了 AbstractChannel#AbstractUnsafe.register 后, 就完成了 Channel 和 EventLoop 的关联. register 实现如下:
    AbstractChannel.AbstractUnsafe

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            // 删除条件检查.
            ...
            AbstractChannel.this.eventLoop = eventLoop;
        
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new OneTimeTask() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    ...
                }
            }
        }
        -----
        在 AbstractChannel#AbstractUnsafe.register 中, 会将一个 EventLoop 赋值给 AbstractChannel 内部的 eventLoop 字段, 到这里就完成了 EventLoop 与 Channel 的关联过程.
        一路从 Bootstrap.bind 方法跟踪到 AbstractChannel#AbstractUnsafe.register 方法, 整个代码都是在主线程中运行的, 因此上面的 eventLoop.inEventLoop() 就为 false, 于是进入到 else 分支, 在这个分支中调用了 eventLoop.execute. eventLoop 是一个 NioEventLoop 的实例, 而 NioEventLoop 没有实现 execute 方法, 因此调用的是 SingleThreadEventExecutor.execute()方法
        -----
    

    SingleThreadEventExecutor

        @Override
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();
            addTask(task);
            if (!inEventLoop) {
                startThread();
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
        
        当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动
    

    NioEventLoop 启动之后是怎么实现事件循环的?

    SingleThreadEventExecutor

        private void startThread() {
            if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
                //设置thread 的状态 this.state是 ST_STARTED 已启动
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    //真正启动线程的函数, 其作用类似于 thread.start()
                    doStartThread();
                }
            }
        }
        
            ↓↓↓↓
        
        private void doStartThread() {
            //启动线程之前,必须保证thread 是null,其实也就是thread还没有启动。
            assert thread == null;
            /** 通过executor启动一个新的task, 在task里面启动this.thread线程。*/
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    // 1. 将当前thread 赋值给 this.thread 也就是将启动的线程赋值给本地绑定线程thread
                    thread = Thread.currentThread();
                    // 2. 实际上是调用NioEventLoop.run() 方法实现 事件循环机制
                    try {
                        SingleThreadEventExecutor.this.run();//最核心所在,调用run()方法
                        success = true;
                    } catch (Throwable t) {//........
                    } finally {
                        //确保事件循环结束之后,关闭线程,清理资源。
                        //...........
                    }
                }//end run()
            });// end run() 这个task
        }// end doStartThread method
        
            ↓↓↓↓
        
        -----
        SingleThreadEventExecutor.this.run(); 这是一个抽象函数
        -----
        
        protected void run() {
            /** 死循环:NioEventLoop 事件循环的核心就是这里! */
            for (;;) {
                try {
                    // 1.通过 select/selectNow 调用查询当前是否有就绪的 IO 事件
                    // 当 selectStrategy.calculateStrategy() 返回的是 CONTINUE, 就结束此轮循环,进入下一轮循环;
                    // 当返回的是 SELECT, 就表示任务队列为空,就调用select(Boolean);
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                            // fallthrough
                    }// end switch
        
                    //2. 当有IO事件就绪时, 就会处理这些IO事件
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
        
                    //ioRatio表示:此线程分配给IO操作所占的时间比(即运行processSelectedKeys耗时在整个循环中所占用的时间).
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            //查询就绪的 IO 事件, 然后处理它;
                            processSelectedKeys();
                        } finally {
                            //运行 taskQueue 中的任务.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            //查询就绪的 IO 事件, 然后处理它;
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                //
        
            }
        } //end of run()函数
        
        --------
        死循环 for(;;) 就是NioEventLoop事件循环执行机制。死循环中的业务简单点就是: 
        (1)通过调用 select/selectNow 函数,等待 IO 事件; 
        
        (2)当有IO事件就绪时, 获取事件类型,分别处理这些IO事件,处理IO事件函数调用就是 processSelectedKeys(); 
        --------
        
    

    IO事件轮询

    在 run() 方法中, 第一步是调用 hasTasks() 方法来判断当前任务队列中是否有任务:

        protected boolean hasTasks() {
            assert inEventLoop();
            return !taskQueue.isEmpty();
        }
        -----
        仅仅是检查了一下 taskQueue 是否为空. 至于 taskQueue 是什么呢, 其实它就是存放一系列的需要由此 EventLoop 所执行的任务列表.
        
        1)当 taskQueue 不为空时, hasTasks() 就会返回TRUE,那么selectStrategy.calculateStrategy() 的实现里面就会执行selectSupplier.get() 而get()方法里面会调用 selectNow(); 执行立即返回当前就绪的IO事件的个数,如果存在IO事件,那么在switch 语句中就会直接执行 default, 直接跳出switch语句,如果不存在,就是返回0, 对应于continue,忽略此次循环。
    
        2)当taskQueue为空时,就会selectStrategy.calculateStrategy() 就会返回SelectStrategy.SELECT, 对用于switch case语句就是执行select()函数,阻塞等待IO事件就绪。
        -----
    

    IO事件处理

    NioEventLoop.run() 方法中, 第一步是通过 select/selectNow 调用查询当前是否有就绪的 IO 事件. 那么当有 IO 事件就绪时, 第二步自然就是处理这些 IO 事件啦. NioEventLoop.run 中循环的剩余部分(核心部分):

        final int ioRatio = this.ioRatio;
        if (ioRatio == 100) {
            processSelectedKeys();
            runAllTasks();
        } else {
            final long ioStartTime = System.nanoTime();
        
            processSelectedKeys();
        
            final long ioTime = System.nanoTime() - ioStartTime;
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
        ---------
        有两个关键的调用, 第一个是 processSelectedKeys() 调用, 根据字面意思, 可以猜出这个方法肯定是查询就绪的 IO 事件, 然后处理它; 第二个调用是 runAllTasks(), 来它的功能就是运行 taskQueue 中的任务.
        
        什么是 ioRatio呢? 它表示的是此线程分配给 IO 操作所占的时间比(即运行 processSelectedKeys 耗时在整个循环中所占用的时间). 例如 ioRatio 默认是 50, 则表示 IO 操作和执行 task 的所占用的线程执行时间比是 1 : 1. 当知道了 IO 操作耗时和它所占用的时间比, 那么执行 task 的时间就可以很方便的计算出来了。
    
        当设置 ioRate = 70 时, 则表示 IO 运行耗时占比为70%, 即假设某次循环一共耗时为 100ms, 那么根据公式, processSelectedKeys() 方法调用所耗时大概为70ms(即 IO 耗时), 而 runAllTasks() 耗时大概为 30ms(即执行 task 耗时).
        
        当 ioRatio 为 100 时, Netty 就不考虑 IO 耗时的占比, 而是分别调用 processSelectedKeys()、runAllTasks(); 而当 ioRatio 不为 100时, 则执行到 else 分支, 在这个分支中, 首先记录下 processSelectedKeys() 所执行的时间(即 IO 操作的耗时), 然后根据公式, 计算出执行 task 所占用的时间, 然后以此为参数, 调用 runAllTasks().
        ---------
        
            ↓↓↓↓
        
        private void processSelectedKeys() {
            if (selectedKeys != null) {
                processSelectedKeysOptimized();
            } else {
                processSelectedKeysPlain(selector.selectedKeys());
            }
        }
        
        -------
        这个方法中, 会根据 selectedKeys 字段是否为空, 而分别调用 processSelectedKeysOptimized 或 processSelectedKeysPlain. selectedKeys 字段是在调用 openSelector() 方法时, 根据 JVM 平台的不同, 而有设置不同的值, 这个值是不为 null 的.
        --------
        
            ↓↓↓↓
            
        private void processSelectedKeysOptimized() {
            //1. 迭代 selectedKeys 获取就绪的 IO 事件, 然后为每个事件都调用 processSelectedKey 来处理它.
            for (int i = 0; i < selectedKeys.size; ++i) {
                final SelectionKey k = selectedKeys.keys[i];
                selectedKeys.keys[i] = null;
                final Object a = k.attachment();
                if (a instanceof AbstractNioChannel) {
                    //2. 为事件调用processSelectedKey方法来处理 对应的事件
                    processSelectedKey(k, (AbstractNioChannel) a);
                } else {
                    NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                    processSelectedKey(k, task);
                }
        
                if (needsToSelectAgain) {
                    selectedKeys.reset(i + 1);
        
                    selectAgain();
                    i = -1;
                }
            }
        }
        --------
        迭代 selectedKeys 获取就绪的 IO 事件, 然后为每个事件都调用 processSelectedKey 来处理它.可以调用 selectionKey.attach(object) 给一个 selectionKey 设置一个附加的字段, 然后可以通过 Object attachedObj = selectionKey.attachment() 获取它. 上面代代码正是通过了 k.attachment() 来获取一个附加在 selectionKey 中的对象, 那么这个对象是什么呢? 它又是在哪里设置的呢? 
        
        在客户端的 Channel 注册过程中, 会有如下调用链:
        
        Bootstrap.initAndRegister -> 
        AbstractBootstrap.initAndRegister -> 
            MultithreadEventLoopGroup.register -> 
                SingleThreadEventLoop.register -> 
                    AbstractUnsafe.register ->
                        AbstractUnsafe.register0 ->
                            AbstractNioChannel.doRegister
        
        最后的 AbstractNioChannel.doRegister 方法会调用 SocketChannel.register 方法注册一个 SocketChannel 到指定的 Selector:
        
        protected void doRegister() throws Exception {
            // 省略错误处理
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
        }
        
        register 的第三个参数, 这个参数是设置 selectionKey 的附加对象的, 和调用 selectionKey.attach(object) 的效果一样. 而调用 register 所传递的第三个参数是 this, 它其实就是一个 NioSocketChannel 的实例. 
        
        在将 SocketChannel 注册到 Selector 中时, 将 SocketChannel 所对应的 NioSocketChannel 以附加字段的方式添加到了selectionKey 中.
        
        当我们获取到附加的对象后, 我们就调用 processSelectedKey 来处理这个 IO 事件:
        
        final Object a = k.attachment();
    
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        -------
        
            ↓↓↓↓
            
        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            //......
            //......
            try {
                int readyOps = k.readyOps();
                // 连接事件
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
        
                    unsafe.finishConnect();
                }
        
                //可写事件
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
        
                //可读事件
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    // 这里就是核心中的核心了. 事件循环器读到了字节后, 就会将数据传递到pipeline
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
        ------------
        processSelectedKey 中处理了三个事件, 分别是:
    
        OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取. 
        OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据. 
        OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态.
        ------------
    

    OP_READ 处理

    当就绪的 IO 事件是 OP_READ, 代码会调用 unsafe.read() 方法, 即:

        // 可读事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        ---------
        在它的父类 AbstractNioByteChannel 实现的
        ---------
            ↓↓↓↓
        
        @Override
        public final void read() {
            ...
            ByteBuf byteBuf = null;
            int messages = 0;
            boolean close = false;
            try {
                int totalReadAmount = 0;
                boolean readPendingReset = false;
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    int writable = byteBuf.writableBytes();
                    int localReadAmount = doReadBytes(byteBuf);
        
                    // 检查读取结果.
                    ...
        
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
        
                    ...
        
                    totalReadAmount += localReadAmount;
        
                    // 检查是否是配置了自动读取, 如果不是, 则立即退出循环.
                    ...
                } while (++ messages < maxMessagesPerRead);
        
                pipeline.fireChannelReadComplete();
                allocHandle.record(totalReadAmount);
        
                if (close) {
                    closeOnRead(pipeline);
                    close = false;
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close);
            } finally {
            }
        }
        ---------
        read 方法归纳起来, 可以认为做了如下工作:
        分配 ByteBuf
        从 SocketChannel 中读取数据;
        调用 pipeline.fireChannelRead 发送一个inbound 事件. pipeline.fireChannelRead 正好就是 inbound 事件起点. 当调用了 pipeline.fireIN_EVT() 后, 那么就产生了一个 inbound 事件, 此事件会以 head -> customContext -> tail 的方向依次流经 ChannelPipeline 中的各个 handler.
        ---------
    

    OP_CONNECT 处理,即 TCP 连接已建立事件.

        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
        
            unsafe.finishConnect();
        }
        ---------
        归纳起来, 可以认为做了如下工作:
        将 OP_CONNECT 从就绪事件集中清除, 不然会一直有 OP_CONNECT 事件.
        调用 unsafe.finishConnect() 通知上层连接已建立.
        unsafe.finishConnect() 调用最后会调用到 pipeline().fireChannelActive(), 产生一个 inbound 事件, 通知 pipeline 中的各个 handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法会被调用)
        ---------
    

    netty的任务队列机制

    任务的添加
    1. 普通 Runnable 任务

      NioEventLoop 继承于 SingleThreadEventExecutor, 而 SingleThreadEventExecutor 中有一个 Queue taskQueue 字段, 用于存放添加的 Task. 在 Netty 中, 每个 Task 都使用一个实现了 Runnable 接口的实例来表示.

      将一个 Runnable 添加到 taskQueue 中时, 可以进行如下操作:

          EventLoop eventLoop = channel.eventLoop();
          eventLoop.execute(new Runnable() {
              @Override
              public void run() {
                  System.out.println("Hello, Netty!");
              }
          });
          --------
          当调用 execute 后, 实际上是调用到了 SingleThreadEventExecutor.execute() 方法, 它的实现如下:
          --------
          
              ↓↓↓↓
          
          public void execute(Runnable task) {
              if (task == null) {
                  throw new NullPointerException("task");
              }
          
              boolean inEventLoop = inEventLoop();
              if (inEventLoop) {
                  addTask(task);
              } else {
                  startThread();
                  addTask(task);
                  if (isShutdown() && removeTask(task)) {
                      reject();
                  }
              }
          
              if (!addTaskWakesUp && wakesUpForTask(task)) {
                  wakeup(inEventLoop);
              }
          }
          
          添加任务的 addTask 方法的源码如下:
          
          protected void addTask(Runnable task) {
              if (task == null) {
                  throw new NullPointerException("task");
              }
              if (isShutdown()) {
                  reject();
              }
              taskQueue.add(task);
          }
          ---------
          taskQueue 是存放着待执行的任务的队列
          ---------
      
    2. schedule 任务

      除了通过 execute 添加普通的 Runnable 任务外, 还可以通过调用 eventLoop.scheduleXXX 之类的方法来添加一个定时任务.
      EventLoop 中实现任务队列的功能在超类 SingleThreadEventExecutor 实现的, 而 schedule 功能的实现是在 SingleThreadEventExecutor 的父类, 即 AbstractScheduledEventExecutor 中实现的.

      在 AbstractScheduledEventExecutor 中, 有以 scheduledTaskQueue 字段:

      Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
      scheduledTaskQueue 是一个队列(Queue), 其中存放的元素是 ScheduledFutureTask. 而 ScheduledFutureTask 我们很容易猜到, 它是对 Schedule 任务的一个抽象.

      AbstractScheduledEventExecutor 所实现的 schedule 方法

      public  ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
          ObjectUtil.checkNotNull(command, "command");
          ObjectUtil.checkNotNull(unit, "unit");
          if (delay < 0) {
              throw new IllegalArgumentException(
                      String.format("delay: %d (expected: >= 0)", delay));
          }
          return schedule(new ScheduledFutureTask<Void>(
                  this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
      }
      -------------
      这是其中一个重载的 schedule, 当一个 Runnable 传递进来后, 会被封装为一个 ScheduledFutureTask 对象, 这个对象会记录下这个 Runnable 在何时运行、已何种频率运行等信息. 
      当构建了 ScheduledFutureTask 后, 会继续调用 另一个重载的 schedule 方法:
      
      <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
          if (inEventLoop()) {
              scheduledTaskQueue().add(task);
          } else {
              execute(new OneTimeTask() {
                  @Override
                  public void run() {
                      scheduledTaskQueue().add(task);
                  }
              });
          }
      
          return task;
      }
      
      在这个方法中, ScheduledFutureTask 对象就会被添加到 scheduledTaskQueue 中了
      -------------
      
    3. 任务的执行
      当一个任务被添加到 taskQueue 后, 它是怎么被 EventLoop 执行的呢?

      NioEventLoop.run() 方法中, 在这个方法里, 会分别调用 processSelectedKeys() 和 runAllTasks() 方法, 来进行 IO 事件的处理和 task 的处理.

      runAllTasks 方法有两个重载的方法, 一个是无参数的, 另一个有一个参数的. 首先来看一下无参数的 runAllTasks:

      protected boolean runAllTasks() {
          fetchFromScheduledTaskQueue();
          Runnable task = pollTask();
          if (task == null) {
              return false;
          }
      
          for (;;) {
              try {
                  task.run();
              } catch (Throwable t) {
                  logger.warn("A task raised an exception.", t);
              }
      
              task = pollTask();
              if (task == null) {
                  lastExecutionTime = ScheduledFutureTask.nanoTime();
                  return true;
              }
          }
      }
      
      EventLoop 可以通过调用 EventLoop.execute 来将一个 Runnable 提交到 taskQueue 中, 也可以通过调用 EventLoop.schedule 来提交一个 schedule 任务到 scheduledTaskQueue 中. 在此方法的一开始调用的
      
      fetchFromScheduledTaskQueue() 其实就是将 scheduledTaskQueue 中已经可以执行的(即定时时间已到的 schedule 任务) 拿出来并添加到 taskQueue 中, 作为可执行的 task 等待被调度执行,源码如下:
      
      private void fetchFromScheduledTaskQueue() {
          if (hasScheduledTasks()) {
              long nanoTime = AbstractScheduledEventExecutor.nanoTime();
              for (;;) {
                  Runnable scheduledTask = pollScheduledTask(nanoTime);
                  if (scheduledTask == null) {
                      break;
                  }
                  taskQueue.add(scheduledTask);
              }
          }
      }
      接下来 runAllTasks() 方法就会不断调用 task = pollTask() 从 taskQueue 中获取一个可执行的 task, 然后调用它的 run() 方法来运行此 task.
      注:  因为 EventLoop 既需要执行 IO 操作, 又需要执行 task, 因此我们在调用 EventLoop.execute 方法提交任务时, 不要提交耗时任务, 更不能提交一些会造成阻塞的任务, 不然会导致我们的 IO 线程得不到调度, 影响整个程序的并发量。
      
      这里也是为什么用我们自己的线程池隔离一些可能阻塞的业务。
      

    Netty如何保证异步串行无锁化?

    通过串行化设计,即消息的处理尽可能在同一个线程内完成,期间不进行线程切换,这样就避免了多线程竞争和同步锁。

    Netty采用了串行无锁化设计,在IO线程内部进行串行操作,避免多线程竞争导致的性能下降。表面上看,串行化设计似乎CPU利用率不高,并发程度不够。但是,通过调整NIO线程池的线程参数,可以同时启动多个串行化的线程并行运行,这种局部无锁化的串行线程设计相比一个队列-多个工作线程模型性能更优。

    NioEventLoop读取到消息之后,直接调用ChannelPipeline的fireChannelRead(Object msg)方法,只要用户不主动切换线程,一直会由NioEventLoop调用到用户的Handler,期间不进行线程切换。

    图示:

    无锁化图解.png

    Netty如何解决JDK空轮训BUG

    Selector BUG出现的原因

    若Selector的轮询结果为空,也没有wakeup或新消息处理,则发生空轮询,CPU使用率100%

    解决办法:

    1. 对Selector的select操作周期进行统计,每完成一次空的select操作进行一次计数,
    
    2. 若在某个周期内连续发生N次空轮询,则触发了epoll死循环bug。
    
    3. 重建Selector,判断是否是其他线程发起的重建请求,若不是则将原SocketChannel从旧的Selector上去除注册,重新注册到新的Selector上,并将原来的Selector关闭。
    

    Netty的处理方式是这样的:

    记录select空转的次数,定义一个阀值,这个阀值默认是512,可以在应用层通过设置系统属性io.netty.selectorAutoRebuildThreshold传入,当空转的次数超过了这个阀值,重新构建新Selector,将老Selector上注册的Channel转移到新建的Selector上,关闭老Selector,用新的Selector代替老Selector,详细实现可以查看NioEventLoop中的selector和rebuildSelector方法:

        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
        
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt ++;
        
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
                // Selected something,
                // waken up by user, or
                // the task queue has a pending task.
                break;
            }
            if (selectedKeys == 0 && Thread.interrupted()) {
                // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                // As this is most likely a bug in the handler of the user or it's client library we will
                // also log it.
                //
                // See https://github.com/netty/netty/issues/2426
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely because " +
                            "Thread.currentThread().interrupt() was called. Use " +
                            "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                }
                selectCnt = 1;
                break;
            }
            if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                    selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                // The selector returned prematurely many times in a row.
                // Rebuild the selector to work around the problem.
                logger.warn(
                        "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                        selectCnt);
        
                rebuildSelector();
                selector = this.selector;
        
                // Select again to populate selectedKeys.
                selector.selectNow();
                selectCnt = 1;
                break;
            }
        
            currentTimeNanos = System.nanoTime();
        }
        public void rebuildSelector() {
            if (!inEventLoop()) {
                execute(new Runnable() {
                    @Override
                    public void run() {
                        rebuildSelector();
                    }
                });
                return;
            }
        
            final Selector oldSelector = selector;
            final Selector newSelector;
        
            if (oldSelector == null) {
                return;
            }
        
            try {
                newSelector = openSelector();
            } catch (Exception e) {
                logger.warn("Failed to create a new Selector.", e);
                return;
            }
        
            // Register all channels to the new Selector.
            int nChannels = 0;
            for (;;) {
                try {
                    for (SelectionKey key: oldSelector.keys()) {
                        Object a = key.attachment();
                        try {
                            if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                                continue;
                            }
        
                            int interestOps = key.interestOps();
                            key.cancel();
                            key.channel().register(newSelector, interestOps, a);
                            nChannels ++;
                        } catch (Exception e) {
                            logger.warn("Failed to re-register a Channel to the new Selector.", e);
                            if (a instanceof AbstractNioChannel) {
                                AbstractNioChannel ch = (AbstractNioChannel) a;
                                ch.unsafe().close(ch.unsafe().voidPromise());
                            } else {
                                @SuppressWarnings("unchecked")
                                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                                invokeChannelUnregistered(task, key, e);
                            }
                        }
                    }
                } catch (ConcurrentModificationException e) {
                    // Probably due to concurrent modification of the key set.
                    continue;
                }
        
                break;
            }
        
            selector = newSelector;
        
            try {
                // time to close the old selector as everything else is registered to the new one
                oldSelector.close();
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to close the old Selector.", t);
                }
            }
        
            logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
        }
    
    

    相关文章

      网友评论

        本文标题:[Netty源码分析]EventLoop

        本文链接:https://www.haomeiwen.com/subject/bwdaiftx.html