美文网首页
(三)NioEventLoopGroup的工作原理,以及如何完成

(三)NioEventLoopGroup的工作原理,以及如何完成

作者: guessguess | 来源:发表于2021-03-19 21:14 被阅读0次

之前在bootStrap去注册通道的时候,顺便粗略的过了一下NioEventLoop的工作过程。只能大概看清楚工作流程,但是其原理,还是没有很细致的说到,所以这次还是详细看看,NioEventLoop是如何工作的,其实本质上,只是看Netty如何去维护这些channel的读写以及注册操作。

入口还是之前的demo,代码如下

public class TimeClient {
    public static void main(String args[]) {
        connect();
    }
    private static void connect() {        
        //用于客户端处通道的读写
        EventLoopGroup work = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(work).option(ChannelOption.TCP_NODELAY, true).channel(NioSocketChannel.class)
                .handler(new TimeClientHandler());
        ChannelFuture cf = null;
        try {
            //一直阻塞,直到连接上服务端
            cf = b.connect(ConnectConfig.getHost(), ConnectConfig.getPort()).sync();
            //一直阻塞,直到该通道关闭
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //避免线程没有杀死
            work.shutdownGracefully();
        }
    }
}

NioEventLoopGroup

为什么说这个类呢?可以先解释一下,为什么说这个类,因为在实际的代码运行中,netty是通过这个类,去维护channel的读写以及注册功能的。然后在此之前,要先讲一下NioEventLoopGroup这个类,NioEventLoopGroup.这个链接中有基本的结构,就不过多说了。

基本的接口

1.EventExecutorGroup

其实从接口结构上看是比较简单的
具备的功能(管理EventExecutor)
1.说明具备了线程池的基本功能,继承了ScheduledExecutorService接口
2.新加了几个方法,主要是用于管理EventExecutor的。
    boolean isShuttingDown();
    Future<?> shutdownGracefully();
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
    Future<?> terminationFuture();
    EventExecutor next();

2.EventExecutor

其实从接口结构上看是比较简单的, 为EventExecutorGroup的子类
具备的功能,因为有一些地方还木有理解,就说比较关键的点。
1.获取工作组,具体体现在以下方法中
    EventExecutorGroup parent();
2.判断某个线程是否与EventExecutor处于同一个线程。
       boolean inEventLoop();

3.EventLoopGroup

为EventExecutor的子类,另外新增了几个方法。
新增的功能:
1.将获取的类型改为EventLoop(为EventLoopGroup的子类)
    @Override
    EventLoop next();
2.注册通道
    ChannelFuture register(Channel channel);

4.EventLoop

为EventLoopGroup的子类,没有太特殊的地方
有以下的方法:
    @Override
    EventLoopGroup parent();

结构上来看:EventExecutorGroup -》EventExecutor -》EventLoopGroup -》EventLoop
最后可以看出EventLoop大概是这么一个东西,具备判断线程池的功能,注册通道,还有获取自身对应的parent(Group),以及当前线程与EventLoop所绑定的线程是否一致(这里其实是为了减少资源竞争,一个channel的相关维护,只会被一个线程所处理)。

EventLoop是如何维护channel的读写以及注册的?

NioEventLoop的结构,以及生成

首先要从NioEventLoopGroup的构造方法入手

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        传入线程数,SelectorProvider,还有一个选择策略工厂,以及拒绝策略(当任务队列过长的时候)
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
}

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
    }
    默认的eventLoop线程数,为核心线程数*2
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
}


//到最后定位到此处。。。将无关代码省略了。为了方便阅读
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    //该group管理的EventExecutor
    private final EventExecutor[] children;
    //EventExecutor的选择器
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;
    //暴露给子类,用于实例化EventExecutor
    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        //executor的类型为ThreadPerTaskExecutor,其实就是该executor每执行一个任务,都会创建一个线程去执行
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }
        //初始化数组
        children = new EventExecutor[nThreads];
        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                关键点在这里,这里涉及到EventExecutor的实例化。
                newChild是一个抽象方法,在子类中被实现。
                那么看看是如何实例化的,具体实现位于子类NioEventLoop中..........................................................................................................................................
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                throw new IllegalStateException("failed to create a child event loop", e);
            } 
        设置EventLoop的选择器,用于获取EventLoop。
        chooser = chooserFactory.newChooser(children);
    }
}

EventExecutor的实例化,newChild方法的实现, 在EventLoopGroup中被覆写

public class NioEventLoopGroup extends MultithreadEventLoopGroup {
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
}

public final class NioEventLoop extends SingleThreadEventLoop {
    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        设置拒绝策略,以及任务队列的长度,以及线程池的类型,还是直接往里面看..............
        父类SingleThreadEventExecutor的构造方法
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        //这个provider是单例,所以可以确保这个工作组的selector是公用同一个
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }
}


public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                        boolean addTaskWakesUp, int maxPendingTasks,
                                        RejectedExecutionHandler rejectedHandler) {
        super(parent);
        this.addTaskWakesUp = addTaskWakesUp;
        this.maxPendingTasks = Math.max(16, maxPendingTasks);
        //执行器,其实说白就是通过executor来创建线程。netty没那么傻,肯定是有变量去判断这个eventLoop是否已经持有线程了。
        //因为executor的类型是ThreadPerTaskExecutor。感兴趣看看内部实现
        this.executor = ObjectUtil.checkNotNull(executor, "executor");
        //设置任务队列
        taskQueue = newTaskQueue(this.maxPendingTasks);
        //拒绝策略
        rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    }
}

ThreadPerTaskExecutor的内部实现,每执行一个任务创建一个线程。
public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

从上面的代码来看。可以得知,NioEventLoop具体有的功能如下:
1.创建线程(说白了就是每个NioEventLoop初始化的时候都是没对应的线程的,所以需要创建)
2.有任务队列(NioEventLoop去执行任务的时候,可以用于临时存储任务)
3.有拒绝策略(任务队列满了就拒绝)

NioEventLoop是如何去执行任务的

接下来,举个例子,看看是如何执行任务的。刚刚我们说到NioEventLoop的基本功能。
那么就从channel的注册,到连接,看看如何保证一个channel,只会被一个NioEventLoop处理。
下面还是先从注册开始吧

注册

注册通道的入口

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        注册是在这里执行的======================注册是使用NioEventLoop去注册的,所以重点看此方法。
        最后实现其实是在SingleThreadEventLoop,单线程的eventLoop,看看下面的代码片段。
        拿到注册的回调,因为注册是异步运行的。
        所以注册的回调有几种情况,注册完成了,注册没完成。
        针对注册没完成,还有一些操作,所以加了监听器,在注册完成的时候会去调用监听器的方法。
        而注册成功的,直接去连接就好了。
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        若注册成功
        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            连接服务端
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            若注册没成功,先添加监听器,等到注册完成的时候,再去进行连接服务端
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
}

如何注册通道

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        通过unsafe来注册,Unsafe是一个内部类,这里不关心它是啥,只看是怎么注册的。
        通过debug,看到是在内部类中做的,直接看下面的代码片段。
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
}

注册的实现。

public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            //eventLoop为空,则抛异常
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            //通道注册了没?不得二次注册,这里netty的channel有一个标记用于判断channel是否已经注册
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            //这里没细看 估计是eventLoop的类型与通道是不是匹配,不是那么重要,跳过。。。。
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
            //给这个channel设置对应的eventLoop,说明这个channel被这个eventLoop
            AbstractChannel.this.eventLoop = eventLoop;
            重点,判断当前线程是否与eventLoop中的线程是一个线程,是的话就同步执行好了。一开始当然不是
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    通过eventLoop的执行方法去执行注册。
                    同步的就没必要看了,因为我们不关心它怎么注册,而是线程如何去执行这个注册的task
                    ======================================直接debug,定位到位置
                   SingleThreadEventExecutor的execute方法。直接看下一个代码片段。
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    closeForcibly();
                    closeFuture.setClosed();
                    //给回调设置异常信息
                    safeSetFailure(promise, t);
                }
            }
        }
}

SingleThreadEventExecutor的execute方法

其实下面这个方法,看上去已经简单明了了。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements 
OrderedEventExecutor {
    默认eventLoop没分配线程,所以是not_started状态,避免分配多个线程。
    private volatile int state = ST_NOT_STARTED;
    eventLoop对应的所分配的线程变量
    private volatile Thread thread;
    12345 分别对应几个状态
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final int ST_SHUTTING_DOWN = 3;
    private static final int ST_SHUTDOWN = 4;
    private static final int ST_TERMINATED = 5;
    任务队列
    private final Queue<Runnable> taskQueue;

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        判断当前线程,与eventLoop分配的线程是不是同一个。
        boolean inEventLoop = inEventLoop();
        如果是同一个线程,则直接加任务即可。一开始eventLoop没分配线程,所以必然需要开启线程。
        if (inEventLoop) {
            addTask(task);
        } else {
            开启线程=============================这是很重要的方法
            下面的方法会提到
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

    
    private void startThread() {
        //比较简单的写法,双重锁
        if (state == ST_NOT_STARTED) {
            //cas 无锁
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                这是重点了===============================================
                doStartThread();
            }
        }
    }


    private void doStartThread() {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                boolean success = false;
                updateLastExecutionTime();
                try {
                    这里我们可以猜一下,由于线程起来了,要一直运行,所以里面必然是一个死循环。
                    只有EventLoop状态为关闭的时候,才会跳出这个循环。
                    所以接下来直接看看这个run方法。
                    ============================================================
                    这个run方法子类已经实现了,在NioEventLoop中,所以直接定位到这个位置。看下面的代码片段。
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                       下面省略了一部分,其实是EventLoop关闭的时候,怎么去修改状态的内容。相对来说比较简单,所以直接忽略了。
                }
            }
        });
    }
}

线程是如何去维护channel的读写以及各种任务的

NioEventLoop中的run方法就实现了。
在注册的时候,channel会注册到eventLoop中的selector中。所以eventloop只需要通过分配到的线程,去做一个死循环,轮询selector中就绪的事件即可。这样子就可以维护多个channel了。一个channel只能被一个eventLoop维护,而一个eventLoop可以维护多个channel。

public final class NioEventLoop extends SingleThreadEventLoop {
    @Override
    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        处理就绪的事件,被该eventLoop所维护的channel。因为注册的时候会将相关的selectkey都存到对应的selectkey集合中,位于父类的成员变量中。
                        processSelectedKeys();
                    } finally {
                        执行队列里面的任务
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        处理就绪的事件
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        执行队列里面的任务,其实就是成员变量里面的taskQueue,在父类中SingleThreadEventExecutor
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            对EventLoop关闭的处理,如果正则关闭,则该死循环结束。
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
}

注册完成后,如何连接呢?

先回到注册通道的入口

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {
    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        注册是在这里执行的======================注册是使用NioEventLoop去注册的,所以重点看此方法。
        最后实现其实是在SingleThreadEventLoop,单线程的eventLoop,看看下面的代码片段。
        拿到注册的回调,因为注册是异步运行的。
        所以注册的回调有几种情况,注册完成了,注册没完成。
        针对注册没完成,还有一些操作,所以加了监听器,在注册完成的时候会去调用监听器的方法。
        而注册成功的,直接去连接就好了。
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        若注册成功
        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            连接服务端,看看服务端的代码
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            若注册没成功,先添加监听器,等到注册完成的时候,再去进行连接服务端
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }


    如何连接?最后注册完,一路debug,到这里。
    从这里可以看到,最后还是通过通道对应的eventLoop来。所以从通道的注册,到连接,以及处理通道中就绪的事件,一切都是在某个eventLoop中来维护的。
    这种做法就避免了一个channel的事件被多个线程执行,不存在资源竞争的关系。
    private static void doConnect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
        final Channel channel = connectPromise.channel();
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (localAddress == null) {
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        });
    }
}

概括一下:
注册操作:
channel的注册,会被某一个eventLoop所执行,同时注册到该eventLoop的selector中。同时channel会设置自己的eventLoop,即该channel只会被该eventLoop所维护。
连接操作:
channel获取自己的eventLoop,然后将连接任务丢到任务队列中,由eventLoop分配的线程去处理。
读写操作:
eventLoop分配的线程,轮询selector中的key,进行处理。

相关文章

网友评论

      本文标题:(三)NioEventLoopGroup的工作原理,以及如何完成

      本文链接:https://www.haomeiwen.com/subject/rkxecltx.html