美文网首页
netty 线程模型解析

netty 线程模型解析

作者: whateverblake | 来源:发表于2020-07-08 14:03 被阅读0次

背景

netty是一个使用java实现的高性能的网络通信框架,对于使用netty实现网络通信的Server端netty遵从了reactor模式,一个acceptor线程负责接收用户连接请求,为每个连接请求创建SocketChannel,然后被创建的SocketChannel会绑定一个线程,之后SocketChannel上发生的所有事件都是由其绑定的线程处理


reactor.png

说明

下面都是基于NIO类型去分析

NioEventLoopGroup

从线程的角度来看,我们可以把NioEventLoopGroup类别成线程池,NioEventLoopGroup中线程的数量是用户可以设定的(在NioEventLoop中没有线程的说法,叫做EventExecutor,但是为了好理解我们就把它当做线程吧目前),如果不设定则默认为机器可用cpu cores的两倍,netty内部使用到的所有线程都是从这个类中分配,创建NioEventLoopGroup的时候会创建一个线程分配器,netty默认有两种线程分配器

  • PowerOfTwoEventExecutorChooser
    NioEventLoopGroup中包含的线程数为2^n时则使用这个线程分配器
 private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

  • GenericEventExecutorChooser
    NioEventLoopGroup中包含的线程数不是2^n时则使用这个线程分配器
 private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

我们看到这两种选择器都能实现线程池中的线程被平均的分配,其实第二种更为通用,那么为什么还要实现PowerOfTwoEventExecutorChooser,这就体现了netty对细节的在乎了,因为&运算会比%运算快,所以对于这种也许微乎其微的性能提升netty也是不放过的

NioEventLoop

在netty中使用NioEventLoop表示一个线程,对于每一个NioSocketChannel都会绑定一个NioEventLoop,将来这个NioSocketChannel上面的所有事件都是由这个绑定的NioEveLoop来处理。NioEventLoop是对线程的包装,当NioEventLoop对应的线程启动后,它主要处理的任务类型如下图


NioEventLoop_jobs.png
  • selector
    从前面我们知道每个新建的NioSocketChannel都会绑定一个NioEventLoop,同时NioEventLoopGroup中管理的NioEventLoop数量是固定的,所以当创建的NioSocketChannel数量多于NioEventLoop时,一个NioEventLoop就会被多个NioSocketChanel绑定,之后这些NioSocketChannel都会在同一个NioEventLoop注册监听事件,当有监听事件发生的时候NioEventLoop需要处理这些事件
  • tasks
    这类事件是别的线程分配给自己的或者自己分配给自己的,比如主线程在把一个NioSocketChannel绑定到一个NioEventLoop的时,会向这个被绑定的NioEventLoop提交一个初始化NioSocketChannel的任务
  • scheduled tasks
    这是一类定时任务,比如当客户端的NioSocketChannel去连接远程服务,如果连接没有立马成功,那么NioEventLoop就会给自己注册一个延时执行的任务,这个任务的作用是当指定的时间到了连接还没有成功,那么抛出connect time out 异常

下面是NioEventLoop的核心方法run的源码

 @Override
    protected void run() {
        int selectCnt = 0;
        for (;;) {
            try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        //设置nextWakeupNanos为下一个最近需要执行的scheduled task的延时时间
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                           //如果没有tasks类型的任务,执行selector.select()
                          //如果没有scheduled task,这个方法会阻塞,如果有会等待curDeadlineNanos,当然在阻塞的期间可能会被别的线程wake up
                            if (!hasTasks()) {
                               
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                           //strategy>0表示有IO事件发生,processSelectedKeys处理IO事件
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                      //执行tasks
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
NioEventLoop绑定线程过程

NioEventLoop是如何绑定到一个线程的呢?
我们在初始化NioEventLoopGroup的时候会有一个Executor被设置,如果用户不指定的话会被默认赋值为ThreadPerTaskExecutor

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

我们在NioEventLoopGroup中初始化每个NioEventLoop的时候ThreadPerTaskExecutor会被传入NioEventLoop的构造方法,ThreadPerTaskExecutor通过ThreadExecutorMap.apply包装后赋值给NioEventLoop父类中名称为executor的属性


executor_init.png

我们看下ThreadExecutorMap.apply是如何实现的

 public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(executor, "executor");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        return new Executor() {
        @Override
        public void execute(final Runnable command) {
            executor.execute(apply(command, eventExecutor));
        }
    };

可以看到apply方法返回的是一个包装了ThreadPerTaskExecutor匿名Executor对象,将来如果调用这个匿名的Executor,那么就会执行ThreadPerTaskExecutor.execute,通过上面ThreadPerTaskExecutor的源码我可以看到ThreadPerTaskExecutor.execute会创建一个线程去执行apply方法返回的runnable

我们看下apply是如何实现的

 public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
        ObjectUtil.checkNotNull(command, "command");
        ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
        return new Runnable() {
            @Override
            public void run() {
                setCurrentEventExecutor(eventExecutor);
                try {
                    command.run();
                } finally {
                    setCurrentEventExecutor(null);
                }
            }
        };
    }

可以看到这个方法返回的一个匿名的Runnable,这个匿名Runnable对最终需要执行的Runnabe进行了封装,这个封装的主要目的是把当前NioEventLoop和本线程进行关联,setCurrentEventExecutor就是用一个FastThreadLocal去存储本NioEventLoop

线程是如何被启动

主线程把一个NioSocketChannel绑定到一个NioEveLoop后,回通过调用NioEventLoop.execute方法向这个NioEventLoop提交一个register的任务,NioEventLoop的execute源码如下

 private void execute(Runnable task, boolean immediate) {
        //判断当前执行线程和NioEventLoop绑定的线程是不是相同
        boolean inEventLoop = inEventLoop();
       //把提交的任务添加到NioEventLoop的taskQueue中
        addTask(task);
        if (!inEventLoop) {
           //如果当前执行的线程和NioEventLoop绑定的线程不同,那么启动NioEventLoop去绑定线程
           //当然在startThread方法中会判断NioEventLoop绑定的线程是不是已经启动了,如果启动了就什么都不做,如果没启动那么就会去执行doStartThread
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }

我们看下startThread代码

 private void startThread() {
        //判断NioEventLoop绑定的线程是不是启动了
        if (state == ST_NOT_STARTED) {
            //设置线程为启动状态
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                     //启动线程
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        //如果doStartThread失败,那么设置线程启动状态为未启动
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

doStartThread源码比较长:

private void doStartThread() {
        assert thread == null;
        //这个executor就是我们上面提到的包装了ThreadPerTaskExecutor的匿名Executor
        //这个executor的execute方法会首先执行ThreadPerTaskExecutor的execute方法
        //这个方法会使用threadFactory去创建一个线程然后执行Runnable任务
       //就像上面提到的,下面的这个Runnable匿名类也会被ThreadExecutorMap.apply()方法进行包装,这么做的原因上面已经介绍了
        executor.execute(new Runnable() {
            @Override
            public void run() {
              //thread 是NioEventLoop的属性,到这个地方NioEventLoop绑定的线程已经启动了,
             //现在把thread属性赋值为当前线程,这样NioEventLoop就真正的和一个执行线程完成了绑定
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    //这个地方会调用NioEventLoop中重写的run方法,run方法的在上面已经解析了,这里就不多说了
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                   //下面是处理线程关闭之后,NioEventLoop自身一些状态的处理
                    for (;;) {
                        //设置NioEventLoop的运行状态为关闭进行中
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks. At this point the event loop
                        // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                        // graceful shutdown with quietPeriod.
                        //检查是不是完全关闭了
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }

                        // Now we want to make sure no more tasks can be added from this point. This is
                        // achieved by switching the state. Any new tasks beyond this point will be rejected.
                        for (;;) {
                             //设置NioEvenLoop的运行状态为完全关闭
                            int oldState = state;
                            if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                                break;
                            }
                        }

                        // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                        // No need to loop here, this is the final pass.
                        confirmShutdown();
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                            // the future. The user may block on the future and once it unblocks the JVM may terminate
                            // and start unloading classes.
                            // See https://github.com/netty/netty/issues/6596.
                            FastThreadLocal.removeAll();
                            //设置NioEventLoop运行状态为terminate
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.countDown();
                            int numUserTasks = drainTasks();
                            if (numUserTasks > 0 && logger.isWarnEnabled()) {
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + numUserTasks + ')');
                            }
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }




相关文章

网友评论

      本文标题:netty 线程模型解析

      本文链接:https://www.haomeiwen.com/subject/zppaqktx.html