美文网首页
(二)bootStrap是如何去注册通道的

(二)bootStrap是如何去注册通道的

作者: guessguess | 来源:发表于2021-03-17 11:04 被阅读0次

    还是看一下下面的例子,从代码里面,并没有看到,channel是如何注册的。除了一个connect方法。这里就是如何注册到对应的selector,以及如何连接到服务端的。在讲connect方法前,有必要先讲一下NioEventLoopGroup

    public class TimeClient {
        public static void main(String args[]) {
            connect();
        }
        
        private static void connect() {        
            //用于客户端处通道的读写
            EventLoopGroup work = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(work).option(ChannelOption.TCP_NODELAY, true).channel(NioSocketChannel.class)
                    .handler(new TimeClientHandler());
            ChannelFuture cf = null;
            try {
                //一直阻塞,直到连接上服务端
                cf = b.connect(ConnectConfig.getHost(), ConnectConfig.getPort()).sync();
                //一直阻塞,直到该通道关闭
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //避免线程没有杀死
                work.shutdownGracefully();
            }
        }
    }
    

    NioEventLoopGroup的结构图

    为了说的简单一些。所以简单的画了一个图


    NioEventGroup结构图

    接下来根据层次来说说各个接口的功能。

    1.EventExecutorGroup。

    概况一下:继承了线程池的接口,说明实现类必然是具备线程池的相关功能。
    在线程池功能的基础上,新增了几个方法
    1.boolean isShuttingDown();
    2.Future<?> shutdownGracefully();
    3.Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    4.FutFuture<?> terminationFuture();
    5.next()
    以上方法都是用于管理EventExecutor。
    比如需要关闭EventExecutorGroup中所有的EventExecutor。
    获取EventExecutorGroup的关闭状态。
    

    2.AbstractEventExecutorGroup

    概括一下: 是一个抽象类,但是覆写了线程池功能的相关方法。
    主要修改的地方:举个例子,如下
        @Override
        public Future<?> submit(Runnable task) {
            return next().submit(task);
        }
    执行一个任务,都是选择合适的EventExecutor去执行。
    

    3.MultithreadEventExecutorGroup

    概括一下: 是一个抽象类,主要的功能还挺多的。包
    括EventExecutor选择的功能的实现
    以及EventExecutor选择器的设置
    还有EventExecutor的怎么生成,都是在这个类中完成的。
    后面会重点讲一下。
    

    4.MultithreadEventLoopGroup

    其实,我也没搞懂,这个loop何为loop。
    只知道这个接口还实现了EventLoopGroup这个接口(该接口具备了注册channel的功能)
    覆写了next方法,返回为EventLoop而不是EventExecutor,EventLoop是EventExecutor的子类。
    

    5.EventExecutor

    概括一下:EventExecutor为EventExecutorGroup的子类
    新增了一些方法。
    1.parent(),返回所在的工作组
    2.inEventLoop,判断线程池中的线程,是否当前线程。若是则说明已经开启。
    

    6.EventLoopGroup

    概括一下,从接口结构来看
    它是EventExecutorGroup的子类,主要是新增了注册通道的功能。
    说白了就具备了管理EventLoop的功能。
    以及注册通道的功能,以及EventExecutor的功能,是一个执行者。
    

    7.EventLoop

    没什么特别的。不过还是得具体看实现。NioEventLoop
    

    bootStrap的注册过程

    先看看bootStrap的connect方法的走向,最后定位到BootStrap类的以下方法。从这里可以看到,最后是channel的eventLoop去执行注册。

        private static void doConnect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            final Channel channel = connectPromise.channel();
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (localAddress == null) {
                        channel.connect(remoteAddress, connectPromise);
                    } else {
                        channel.connect(remoteAddress, localAddress, connectPromise);
                    }
                    connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                }
            });
        }
    
    

    这里由于是NioSocketChannel,所以很简单的,直接看看NioSocketChannel的eventLoop方法的返回类型。最后发现,NioSocketChannel本身并没有实现这个方法。最后定位到父类。返回的类型是NioEventLoop。在讲解NioEventLoop前,要讲一下NioEventLoopGroup的初始化,这里面涉及到NioEventLoop的初始化。

    NioEventLoopGroup的初始化

    其实就是通过NioEventLoopGroup的构造方法一点一点debug进去。最后定位到MultithreadEventLoopGroup的构造方法。

    public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    
        private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
    
        private static final int DEFAULT_EVENT_LOOP_THREADS;
        //静态代码块,用于初始化默认的EventLoop线程数,其实就是EventLoop的数量。
        static {
            DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    
            if (logger.isDebugEnabled()) {
                logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
            }
        }
    
        /**
         * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
         */
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            //一开始传入是0,所以用的是默认的线程数
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }
    
    

    再接着往下走。最后来到这个方法。具体内容看注释。
    这个方法主要涉及到,事件执行选择器的初始化(EventExecutorChooser用于选择合适的EventChooser),以及事件执行器的初始化(EventExecutor)

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            //线程数小于0,直接报错
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
            //一开始executor 就是Null,所以必然会走到这里。
            //executor的执行方法,就是每execute的时候,新创建一个线程去执行任务。具体可以看ThreadPerTaskExecutor的代码。
            //当然netty不会那么傻,每次都new一个去执行任务。
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
            //成员变量,用于存储所有的EventExecutor
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    所以这里是重点,怎么去创建EventExecutor的呢?直接往下面走,看后面的代码片段
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                   //......这里就是有任意一个没有创建成功,就会把所有的EventExecutor都关闭,这里肯定是将EventExecutor对应的线程中断。
                }
            }
            //初始化EventExecutor选择器,这里面其实是根据游标来获取对应的EventExecutor,在next()方法中,其实是使用选择器去选择事件执行器。
            chooser = chooserFactory.newChooser(children);
            //。。。有一些无关代码,所以暂时先省略
        }
    

    EventExecutor是怎么生成的?

    public class NioEventLoopGroup extends MultithreadEventLoopGroup {
        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            从入参看,需要传入对应的线程池,以及SelectProvider,选择策略与拒绝处理器。
            再接着往下走
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }
    }
    

    直接看看NioEventLoop的构造方法

    public final class NioEventLoop extends SingleThreadEventLoop {
        设置最大的任务等待数量
        protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
                SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
        NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                     SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
            ...还有一些不重要的,直接忽略了。
        }
    }
    public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
        protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedExecutionHandler) {
            super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
            tailTasks = newTaskQueue(maxPendingTasks);
        }
    }
    
    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
        //成员变量,用于封装相关的线程池的配置。
        private final int maxPendingTasks;
        private final Executor executor;
        private final RejectedExecutionHandler rejectedExecutionHandler;
        private final Queue<Runnable> taskQueue;
        protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                            boolean addTaskWakesUp, int maxPendingTasks,
                                            RejectedExecutionHandler rejectedHandler) {
            super(parent);
            this.addTaskWakesUp = addTaskWakesUp;
            //设置最大的任务等待数量
            this.maxPendingTasks = Math.max(16, maxPendingTasks);
            //设置线程池
            this.executor = ObjectUtil.checkNotNull(executor, "executor");
            //初始化任务队列
            taskQueue = newTaskQueue(this.maxPendingTasks);
            //设置拒绝策略
            rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
        }
    
        protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
            return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
        }
    }
    

    从上面的代码段可以看出。
    1.NioEventLoop的父类是一个单线程的事件执行器(SingleThreadEventExecutor )。
    2.初始化的过程中,需要设置对应的线程池,拒绝策略,以及任务队列。
    所以说到底,EventLoop其实就是EventExecutor的一个增强,应该是叫具体实现。EventLoop在具体执行任务的时候,必然是使用线程池中的线程去执行,加队列的话则使用的是自定义的任务队列。拒绝策略也是使用自定义的拒绝策略。
    说完EventLoop是怎么生成的,接下来就可以看看,EventLoop究竟是怎么工作的。

    NioEventLoop是怎么工作的?

    这里就得回到最初的那段代码,代码位于BootStrap中.

        private static void doConnect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
            final Channel channel = connectPromise.channel();
            这段代码就是执行的核心了。
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (localAddress == null) {
                        channel.connect(remoteAddress, connectPromise);
                    } else {
                        channel.connect(remoteAddress, localAddress, connectPromise);
                    }
                    connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                }
            });
        }
    
    

    所以我们需要看的就是NioEventLoop的execute方法。
    最后发现NioEventLoop并没有覆写这个方法,而是在其父类SingleThreadEventExecutor中进行了覆写。
    下面来看看代码

    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
        用于存储当前处理器的处理线程(其实就是将线程池里面的线程保存在里面)。可以用于判断线程是否已经生成了(一开始thread这个变量必然为空,若与执行的线程不一样)。
        private volatile Thread thread;
        @Override
        public void execute(Runnable task) {
            任务为空直接抛异常
            if (task == null) {
                throw new NullPointerException("task");
            }
            判断当前线程是否成员变量一致。一开始必然不一致。
            boolean inEventLoop = inEventLoop();
            if (inEventLoop) {
                //将任务直接加到队列中
                addTask(task);
            } else {
                启动线程----下面重点就是说一下这个线程。是如何去执行任务的,因为在整个方面里面,并没有看到整个线程是如何去执行任务的。只看到将任务添加到队列。
                startThread();
                //将任务直接加入到队列中
                addTask(task);
                //如果正在暂停 或者 在移除任务则拒绝
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
    }
    

    在上面的方法中,并没有看到任务的执行,只是简单的看到任务的添加以及拒绝策略。
    所以接下来直接进入那个方法。

        private void startThread() {
            //如果没有开始
            if (state == ST_NOT_STARTED) {
                //设为开始  双重锁
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    doStartThread();
                }
            }
        }
    
        //线程中断状态
        private volatile boolean interrupted;
    
        private void doStartThread() {
            assert thread == null;
            executor创建一个线程,用于执行以下方法。
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    将线程池中的任务线程保存到成员变量中,用于后续判断线程是否被用于EventLoop
                    thread = Thread.currentThread();
                    //中断状态,则线程中断
                    if (interrupted) {
                        thread.interrupt();
                    }
                    boolean success = false;
                    //更新上次的执行时间
                    updateLastExecutionTime();
                    try {
                        运行SingleThreadEventExecutor的run()方法。这里是后面需要重点看的............................................................,只有当run方法结束,success才会变成true。但是run方法其实也是一个死循环,只有关闭的时候,才会停止。这里只指定的是实现类的run方法,就是NioEventLoop中的run方法。
                        SingleThreadEventExecutor.this.run();
                        success = true;
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception from an event executor: ", t);
                    } finally {
                        //死循环,如果是关闭状态或者处于正在关闭状态,则跳出死循环
                        for (;;) {
                            int oldState = state;
                            if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                                break;
                            }
                        }
                       。。。。。。下面都是一些无关的代码,关于如何停止该执行器的
                }
            });
        }
    
    

    从doStartThread方法中可以看到,线程如果是中断状态,则直接中断。否则会运行run方法,只有run方法,结束了,才会去判断执行器的状态是否关闭或者关闭中,再跳出这个循环。那么这个run方法指的就是NioEventLoop的run方法。下面来看看是如何处理的。

    NioEventLoop是如何处理任务的。

    @Override
        protected void run() {
            单线程循环处理任务
            for (;;) {
                try {
                    //这里其实对selector的处理,通过准备好的key的数量来决定要不要唤醒selector
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                        default:
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            //selector处理准备好的key
                            processSelectedKeys();
                        } finally {
                            处理所有任务
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            //selector处理准备好的key
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            处理所有任务
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    

    从上面代码也可以很清晰的看得到,其实可以理解成一个死循环,处理所有准备好的Key以及任务。
    我们接下来看看runAllTask的方法,由于实现都差不多,所以选一个就好了。

        protected boolean runAllTasks(long timeoutNanos) {
            fetchFromScheduledTaskQueue();
            Runnable task = pollTask();
            if (task == null) {
                afterRunningAllTasks();
                return false;
            }
    
            final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
            long runTasks = 0;
            long lastExecutionTime;
            for (;;) {
                执行所有的任务
                safeExecute(task);
    
                runTasks ++;
    
                // Check timeout every 64 tasks because nanoTime() is relatively expensive.
                // XXX: Hard-coded value - will make it configurable if it is really a problem.
                if ((runTasks & 0x3F) == 0) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    if (lastExecutionTime >= deadline) {
                        break;
                    }
                }
    
                task = pollTask();
                没有任务跳出循环
                if (task == null) {
                    lastExecutionTime = ScheduledFutureTask.nanoTime();
                    break;
                }
            }
    
            afterRunningAllTasks();
            this.lastExecutionTime = lastExecutionTime;
            return true;
        }
    
        到这里,任务的执行就完成了。
        protected static void safeExecute(Runnable task) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception. Task: {}", task, t);
            }
        }
    

    从上面可以简单的看出,NioEventLoop其实本质上上一个具备了执行注册功能的单线程线程池。注册本质也是一个任务,交由给EventLoop去操作。至于为什么是loop, 因为每个eventEventLoop只会有一个线程,通过线程不断的loop去处理任务队列,netty本身不会创建很多个线程。
    举个例子,每个channel的注册,都是通过eventExecutorChooser实现的next方法,去找到合适的EventLoop去执行register方法,随后执行connect。每个eventLoop可以重复利用,做许多事情,通过不断loop去处理任务队列。

    下面来看看怎么注册到Selector吧。
    入口还是老地方BootStrap的connect方法io.netty.bootstrap.Bootstrap.connect(java.lang.String, int)

    public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
        private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
            初始化,以及注册---------------这里是需要重点去看的。直接看内部实现。下一个代码片段。
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            //注册回调完成
            if (regFuture.isDone()) {
                if (!regFuture.isSuccess()) {
                    return regFuture;
                }
                //注册完必然就是连接了
                return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
            } else {
                // 监听器,暂时没搞懂这里做什么的,直接跳过。这里看上去是,没有完成注册,后续需要完成的操作,不是需要关注的重点。
                final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            promise.setFailure(cause);
                        } else {
                            promise.registered();
                            //注册完必然就是连接了
                            doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                        }
                    }
                });
                return promise;
            }
        }
    }
    

    通道是如何初始化以及注册的

    public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    
        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                //通道的生成。这里逻辑比较简单,其实就是通过工厂去生成channel
                channel = channelFactory.newChannel();
                //初始化,通道的处理器的初始化。。。这里比较深,暂时没看,跳过
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            这里就是重点了。。。。。。。。。。。。。。。。很明显,通过EventLoopGroup去注册通道。看接下来的代码片段。register方法是在MultithreadEventLoopGroup中进行了实现。何为Loop,next方法就体现了。
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
            return regFuture;
        }
    }
    

    NioEventLoopGroup是如何去注册通道的

    public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
        @Override
        public ChannelFuture register(Channel channel) {
            直接往里面走,感兴趣的可以看看next的实现,先看看register
            return next().register(channel);
        }
    }
    
    这个类实现了next方法,通过EventExecutorChooser去选择合适的EventExecutor
    public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
        @Override
        public EventExecutor next() {
            return chooser.next();
        }
    }
    

    注册方法的实现

    因为NioEventLoop是SingleThreadEventLoop的子类。register跑到这个类里面去了


    NioEventLoop的结构图

    下面看看代码片段

    public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
        @Override
        public ChannelFuture register(final ChannelPromise promise) {
            ObjectUtil.checkNotNull(promise, "promise");
            ...一层一层debug,最后定位到的是netty自己包装的channel的一个内部类的register方法
            promise.channel().unsafe().register(this, promise);
            return promise;
        }
    }
    
    
    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        protected abstract class AbstractUnsafe implements Unsafe {
            @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                if (eventLoop == null) {
                    throw new NullPointerException("eventLoop");
                }
                if (isRegistered()) {
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
                给通道设置对应的eventLoop
                AbstractChannel.this.eventLoop = eventLoop;
                当前线程与eventLoop中的线程不是同一个线程,一开始eventLoop并没有分配线程。所以走得是下面的else分支
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        通过eventLoop去执行。execute的方法在SingleThreadEventExecutor中,所以直接往下走。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }
            }
        }
    }
    

    SingleThreadEventExecutor如何去注册

    这里就涉及到我们一开始说的,EventLoop的工作流程。

    public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
        @Override
        public void execute(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task");
            }
    
            boolean inEventLoop = inEventLoop();
            //一开始,当然是没有在eventLoop中
            if (inEventLoop) {
                addTask(task);
            } else {
                所以这里会开启一个死循环的线程,处理selectorkey以及任务队列中的任务
                startThread();
                将注册任务添加到队列中
                addTask(task);
                if (isShutdown() && removeTask(task)) {
                    reject();
                }
            }
    
            if (!addTaskWakesUp && wakesUpForTask(task)) {
                wakeup(inEventLoop);
            }
        }
    }
    

    最后说一下注册的执行

    注册方法的执行

    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
    
        protected abstract class AbstractUnsafe implements Unsafe {
               private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                    注册的核心逻辑。点进去一看就是java提供的nio通道本身的注册实现。。。。。。。。。。。。
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    pipeline.invokeHandlerAddedIfNeeded();
                    
                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // This channel was registered before and autoRead() is set. This means we need to begin read
                            // again so that we process inbound data.
                            //
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            } 
       }
    
        注册实现
        @Override
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    注册到selector,同时将selectionKey设置到成员变量中。而selector则是eventLoop自带的selector
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    }
    

    至此,通道的注册就完成了。

    相关文章

      网友评论

          本文标题:(二)bootStrap是如何去注册通道的

          本文链接:https://www.haomeiwen.com/subject/qyvafltx.html