美文网首页
Netty IO事件与任务处理

Netty IO事件与任务处理

作者: 隔壁王哥 | 来源:发表于2022-05-08 21:47 被阅读0次

    概述

    Netty的IO事件分别为读事件(OP_READ)、写事件(OP_WRITE)、接收事件(OP_ACCEPT)、连接事件(OP_CONNECT)。其中读、写事件可以发生在客户端与服务端。接收事件只发生在服务端,服务端启动后会注册接收事件监听客户端连接。连接事件只发生在客户端,客户端启动时会连接服务端。Netty任务分为普通任务(通过execute(Runnable task) 执行)与定时任务(通过schedule(Runnable task,long delay,TimeUnit unit)执行)。无论是IO事件还是任务,都是通过NioEventLoop中对应的线程来进行处理。

    NioEventLoop创建过程

    在实例化NioEventLoopGroup时,默认会创建2倍CPU核心数的NioEventLoop。对于bossGroup来说,虽然会创建这么多NioEventLoop,但是如果只绑定一个端口进行事件监听,实际上只会用到一个NioEventLoop,也就是说只有一个线程在循环处理事件与任务。

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    

    NioEventLoopGroup UML图:

    NioEventLoopGroup无参构造方法最终会调到下面的构造方法:

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        /**
         * nThreads:创建线程的数量,如果不传,后续会默认为2倍CPU核心数
         * executor:默认为null,在NioEventLoopGroup父类中会进行初始化
         * selectorProvider:用于创建Java NIO的Selector对象
         * selectStrategyFactory:IO多路复用器策略工厂,值为DefaultSelectStrategyFactory
         * RejectedExecutionHandlers.reject():拒绝策略,当线程池任务队列满了后在往其中添加任务会触发该拒绝策略
         */
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
    

    NioEventLoopGroup构造方法中会调用其父类MultithreadEventLoopGroup的构造方法,该构造方法会初始化默认的线程数量,常量DEFAULT_EVENT_LOOP_THREADS值为2倍CPU核心数:

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    

    MultithreadEventLoopGroup构造方法中会调用其父类MultithreadEventExecutorGroup的构造方法,该构造方法会初始化线程执行选择器工厂,常量DefaultEventExecutorChooserFactory.INSTANCE值为EventExecutorChooserFactory:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
    

    最终会调用MultithreadEventExecutorGroup如下构造方法,在该方法中会循环创建NioEventLoop:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        
        if (executor == null) {
            // 创建线程执行器
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
            // 创建EventExecutor数组,用来保存NioEventLoop
        children = new EventExecutor[nThreads];
    
        for (int i = 0; i < nThreads; i ++) {
            // ...
            // 创建NioEventLoop
            children[i] = newChild(executor, args);
            // ...
        }
            // 创建线程选择器
        chooser = chooserFactory.newChooser(children);
            // ...
    }
    

    创建线程执行器

    线程执行器ThreadPerTaskExecutor#execute方法内部使用ThreadFactory来创建并启动线程,其中ThreadFactory就是调用其构造方法传入的DefaultThreadFactory,DefaultThreadFactory#newThread方法会创建线程,并设置线程属性,如线程名称等:

    public final class ThreadPerTaskExecutor implements Executor {
        private final ThreadFactory threadFactory;
    
        public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
            if (threadFactory == null) {
                throw new NullPointerException("threadFactory");
            }
            this.threadFactory = threadFactory;
        }
    
        @Override
        public void execute(Runnable command) {
            // 使用线程工厂创建并启动线程
            threadFactory.newThread(command).start();
        }
    }
    

    创建NioEventLoop

    调用NioEventLoopGroup#newChild方法进行NioEventLoop的创建:

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
    

    方法内部会调用NioEventLoop的构造方法进行创建:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        // 调用父类构造方法,创建任务队列以及初始化父类属性
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        // 创建IO多路复用器
        selector = openSelector();
        selectStrategy = strategy;
    }
    

    该构造方法中有两个比较重要的操作:一是调用父类构造方法创建任务队列等,二是调用openSelector方法创建IO多路复用器,我们先看openSelector方法:

    private Selector openSelector() {
        final Selector selector;
        try {
            // 创建Java Nio的Selector对象
            selector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }
            // 是否禁用对Java Selector的优化,如果禁用则标识不优化直接返回Java的Selector,默认为false
        if (DISABLE_KEYSET_OPTIMIZATION) {
            return selector;
        }
            // 创建SelectedSelectionKeySet,底层是数组实现,用于替换Java Selector底层selectedKeys的数据结构
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
            // 通过反射获取到Selector的实现类
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (ClassNotFoundException e) {
                        // ...
                }
            }
        });
    
        // ...
    
        final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
    
        Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    // 获取selectedKeys、publicSelectedKeys属性
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    
                    selectedKeysField.setAccessible(true);
                    publicSelectedKeysField.setAccessible(true);
                                    // 通过反射将新的key结构替换原生结构
                    selectedKeysField.set(selector, selectedKeySet);
                    publicSelectedKeysField.set(selector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    // ...
                }
            }
        });
            // 保存selectedKeys属性,后续处理IO事件信息时可以直接通过该属性获取事件信息
        selectedKeys = selectedKeySet;
    
        return selector;
    }
    

    该方法会创建Java Selector,并通过反射替换其对应的selectedKeys、publicSelectedKeys属性。这里Netty对原生Selector数据结构进行了优化,由原本HashSet实现的数据结构替换为了Netty基于数组实现的数据结构:SelectedSelectionKeySet。

    接下来继续跟进,该构造方法中会调用父类的构造方法,对父类属性进行初始化,先看下NioEventLoop UML图:

    NioEventLoop父类为SingleThreadEventLoop,在其父类构造方法中会创建一个MpscQueue类型的队列:

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
        // 用来存放收尾工作的任务队列
        tailTasks = newTaskQueue(maxPendingTasks);
    }
    

    SingleThreadEventLoop构造方法中会调用其父类SingleThreadEventExecutor的构造方法:

    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        // 最终会调用AbstractEventExecutor构造函数保存parent,此处parent即为NioEventLoopGroup
        super(parent);
        // 默认false,当且仅当调用addTask(Runnable)将唤醒执行线程
        this.addTaskWakesUp = addTaskWakesUp;
        // 最大等待任务数,默认值为Integer.MAX_VALUE
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        // 保存线程执行器
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        // 创建MpscQueue类型任务队列
        taskQueue = newTaskQueue(this.maxPendingTasks);
        // 保存拒绝策略
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
    

    NioEventLoop线程启动

    服务端NioEventLoop中的线程启动是在channel注册时触发的,我们再来回顾下:

    #AbstractChannel#AbstractUnsafe
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // ...
            // 判断当前线程是否为IO线程,因为当前线程为主线程,且此时IO线程还未创建,所以会走到else方法
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                // 异步执行任务
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                // ...
            }
        }
    }
    

    NioEventLoop#execute方法实现在其父类SingleThreadEventExecutor中:

    #SingleThreadEventExecutor
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
            // 当前线程为主线程,且此时IO线程还未创建,inEventLoop方法返回false
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            // 启动线程
            startThread();
            // 添加任务
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
    
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            // 唤醒线程
            wakeup(inEventLoop);
        }
    }
    

    execute方法会主要分为三步,第一步:调用startThread方法创建并启动线程;第二步:将任务添加到任务队列中;第三步:唤醒线程执行任务。先看下startThread方法:

    private void startThread() {
        // 线程是否尚未启动
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            // 将ST_NOT_STARTED设置为ST_STARTED
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                // 启动线程
                doStartThread();
            }
        }
    }
    

    startThread方法最终会调用doStartThread方法来启动线程:

    private void doStartThread() {
        assert thread == null;
        // 使用线程执行器创建并启动线程
        executor.execute(new Runnable() {
            @Override
            public void run() {
                // 保存创建的线程
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }
    
                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 处理IO事件与异步任务
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    // ...
                }
            }
        });
    }
    

    调用executor#execute方法创建并启动线程,这里的executor类型为:ThreadPerTaskExecutor,是实例化NioEventLoopGroup时,在其父类MultithreadEventExecutorGroup中创建的,ThreadPerTaskExecutor#execute方法内部通过线程工厂创建并启动线程。线程启动后主要做两件事情,一:保存创建的线程,二:处理IO事件与异步任务(后面会讲到)。

    线程启动后,会调用addTask方法,将任务放到任务队列中,等待线程处理:

    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }
    

    addTask方法会尝试将任务放入taskQueue中,如果放入失败则会触发拒绝策略。

    任务添加到taskQueue中后,会调用wakeup方法唤醒线程,因为此时线程可能因为没有任务而进入到阻塞状态,使用wakeup方法可以将线程从阻塞中唤醒,处理任务。

    IO事件与任务处理流程

    NioEventLoop中的线程启动后,会一直循环处理IO事件与异步任务:

    SingleThreadEventExecutor.this.run();
    

    该方法会调用其子类NioEventLoop的run方法,看下NioEventLoop的run方法:

    protected void run() {
        for (;;) {
            try {
                // 检测IO事件
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }
    
                cancelledKeys = 0;
                needsToSelectAgain = false;
                // ioRatio控制IO事件与非IO事件执行时间占比
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // 处理IO事件
                        processSelectedKeys();
                    } finally {
                        // 处理普通任务与定时任务
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // 根据比率计算非IO事件任务处理事件
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    

    IO事件与任务处理主要分为三步,第一:检测IO事件,第二:处理IO事件,第三:处理普通任务与定时任务。

    检测IO事件

    通过SelectStrategy#calculateStrategy方法计算走什么策略,SelectStrategy是在创建NioEventLoop时通过IO多路复用器策略工厂DefaultSelectStrategyFactory进行创建的:

    #DefaultSelectStrategy
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }
    

    hasTasks值为true,标识taskQueue或者tailQueue中有任务,则调用IntSupplier#get方法,该方法内部会调用Selector#selectNow方法,selectNow方法是一个非阻塞方法,不管有没有IO事件都会立即返回。如果任务队列中没有任务,则直接返回SelectStrategy.SELECT。

    如果SelectStrategy#calculateStrategy方法返回SelectStrategy.SELECT,则会尝试将wakenUp属性设置为false,并调用select方法。因为Selector#wakeup方法是一个比较耗时的操作,而用户线程和IO线程都有可能操作该属性,因此使用原子操作防止多个线程重复唤醒。接着看下select方法:

    private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            // 计算最近一次要执行的定时任务的最后期限,如果定时任务队列中没有任务则返回当前事件+1s
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                // 计算定时任务将要执行的事件与当前时间的时间差
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    // 如果时间差<0.5s,且没有调用select方法阻塞过,则调用selectNow方法,然后退出循环
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
    
                // 如果任务队列有任务,且尝试设置wakenUp属性为true成功,则调用selectNow方法,然后退出循环
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
                            // 阻塞等待IO事件
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
                            // 如果有其中一种情况则退出循环:有IO事件、外部线程唤醒、任务对立有任务、定时任务队列有任务
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    break;
                }
                if (Thread.interrupted()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }
    
                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);
                                    // 重建selector,解决JDK空轮询bug
                    rebuildSelector();
                    selector = this.selector;
    
                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
    
                currentTimeNanos = time;
            }
        } catch (CancelledKeyException e) {
            // ...
        }
    }
    

    IO事件处理

    processSelectedKeys方法,用来处理所有IO事件:

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    

    Netty默认会开启对Selector的优化,所以会进入processSelectedKeysOptimized方法处理IO事件:

    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            // 获取事件
            final SelectionKey k = selectedKeys[i];
            if (k == null) {
                break;
            }
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys[i] = null;
                    // 获取attachment
            final Object a = k.attachment();
    
            if (a instanceof AbstractNioChannel) {
                // 处理IO事件
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
    
            // ...
        }
    }
    

    最终会调用processSelectedKey方法进行IO事件的处理:

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            // ...
        try {
            int readyOps = k.readyOps();
            // 处理OP_CONNECT事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
    
                unsafe.finishConnect();
            }
    
            // 处理OP_WRITE事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }
    
            // 处理OP_READ、OP_ACCEPT事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    

    方法内部会调用Unsafe类的方法进行处理,以OP_ACCEPT事件为例,我们看下其read方法,OP_ACCEPT事件的read方法是在AbstractNioMessageChannel类的内部类NioMessageUnsafe中:

    #AbstractNioMessageChannel
    private final class NioMessageUnsafe extends AbstractNioUnsafe {
    
        private final List<Object> readBuf = new ArrayList<Object>();
    
        @Override
        public void read() {
            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);
    
            boolean closed = false;
            Throwable exception = null;
            try {
                try {
                    do {
                        // 处理OP_ACCEPT事件
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
    
                        allocHandle.incMessagesRead(localRead);
                    } while (allocHandle.continueReading());
                } catch (Throwable t) {
                    exception = t;
                }
    
                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // 触发channelRead事件
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                // 触发channelReadComplete事件
                pipeline.fireChannelReadComplete();
    
                if (exception != null) {
                    closed = closeOnReadError(exception);
    
                    pipeline.fireExceptionCaught(exception);
                }
    
                if (closed) {
                    inputShutdown = true;
                    if (isOpen()) {
                        close(voidPromise());
                    }
                }
            } finally {
                
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
    

    NioMessageUnsafe#read方法主要做了两件事,一是:调用NioServerSocketChannel#doReadMessages方法处理事件,二是:调用ChannelPipeline发送channelRead、channelReadComplete事件。

    NioServerSocketChannel#doReadMessages方法中会调用Java的ServerSocketChannel方法建立连接:

    #NioServerSocketChannel
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();
    
        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
    
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
    
        return 0;
    }
    

    到此IO事件处理流程就结束了,真正的事件处理还是由不同的Unsafe类调用对应的channel中的方法来进行处理。

    任务处理

    runAllTasks方法用来处理普通任务与定时任务:

    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;
    
        do {
            // 从定时任务队列中获取可调度的任务放入到taskQueue中
            fetchedAll = fetchFromScheduledTaskQueue();
            // 从taskQueue中取出任务,执行其run方法
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
    
        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        afterRunningAllTasks();
        return ranAtLeastOne;
    }
    

    runAllTasks方法还是比较容易理解,方法主要做了两件事,一是:将可调度的定时任务从scheduledTaskQueue队列放入到taskQueue队列中,然后循环取出taskQueue中的任务,执行其run方法。

    相关文章

      网友评论

          本文标题:Netty IO事件与任务处理

          本文链接:https://www.haomeiwen.com/subject/exccurtx.html