美文网首页
2_netty_NioEventLoopGroup

2_netty_NioEventLoopGroup

作者: loading_17 | 来源:发表于2018-06-16 23:37 被阅读0次

    NioEventLoopGroup

    直接拿官方的EchoServer类做分析

    public final class EchoServer {
    
        static final boolean SSL = System.getProperty("ssl") != null;
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    
        public static void main(String[] args) throws Exception {
            // Configure SSL.
            final SslContext sslCtx;
            if (SSL) {
                SelfSignedCertificate ssc = new SelfSignedCertificate();
                sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
            } else {
                sslCtx = null;
            }
    
            // Configure the server.
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            // workGroup没有传入线程数,则默认CPU*2线程
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            final EchoServerHandler serverHandler = new EchoServerHandler();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 100)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                if (sslCtx != null) {
                                    p.addLast(sslCtx.newHandler(ch.alloc()));
                                }
                                //p.addLast(new LoggingHandler(LogLevel.INFO));
                                p.addLast(serverHandler);
                            }
                        });
    
                // Start the server.
                ChannelFuture f = b.bind(PORT).sync();
    
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down all event loops to terminate all threads.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    

    直接看

    EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    

    EventLoopGroup负责管理一组EventLoop。
    boss用来accept客户端连接,worker用来处理客户端数据的读写操作。


    那么直接看NioEventLoopGroup的构造方法,它有多个构造方法,最终都会调用如下:

        public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                                 final SelectStrategyFactory selectStrategyFactory) {
            super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
        }
    
    • nThreads:线程数
    • executor:线程执行器,默认是null
    • selectorProvider:默认是KQueueSelectorProvider
    • selectStrategyFactory:默认DefaultSelectStrategyFactory
    • 还有个默认的RejectedExecutionHandlers:拒绝策略

    接着调用父类的构造,抽象类MultithreadEventLoopGroup,这个类继承了MutithreadEventExecutorGroup并实现了EventLoopGroup。用于多线程处理任务。

    //这里的DEFAULT_EVENT_LOOP_THREADS在类初始化的时候,静态代码块
    //中生成Runtime.getRuntime().availableProcessors() * 2
        protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
            super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
        }
    

    DEFAULT_EVENT_LOOP_THREADS默认为处理器数量的两倍。

    上述的构造会调用其父类MultithreadEventExecutorGroup的构造方法,下面会设置其两个属性值,children和chooser

    • children(private final EventExecutor[] children):保存EventLoop
    • chooser(EventExecutorChooserFactory.EventExecutorChooser chooser):选择EventLoop的策略类
    //这里又多了个参数
    //这里出现一个EventExecutor工厂选择类,默认是
    //DefaultEventExecutorChooserFactory,这个工厂使用轮询来选择
    //EventExecutor
        protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
            this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
        }
    

    接着调用该类的构造

        protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
    
            if (executor == null) {
    //默认的ThreadPerTaskExecutor,每次都新建一个FastThreadLocalThread
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                //通过newChild方法来创建EventExecutor,可以通过next()方法访问到。
                //每个将会为MultithreadEventExecutorGroup服务的线程都会调用这个方法
                //这里调用的是NioEventLoopGroup的newChild方法,方法里面创建了NioEventLoop
                //这里 EventLoop的构造,接收了NioEventLoopGroup、Executor、
                //SelectorProvider、SelectStrategy和RejectedExecutionHandler
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    // TODO: Think about if this is a good exception type
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                  //如果不成功就释放资源
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
    //这里使用的是DefaultEventExecutorChooserFactory,根据executor的长度选择Chooser
    //长度是2的幂次方,则选择PowerOfTwoEventExecutorChooser
    //如果不是选择GenericEventExecutorChooser,这里长度为1
    //选择前者。Chooser里面提供了next方法
            chooser = chooserFactory.newChooser(children);
    
    //创建终止监听器
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
    
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
    
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    

    再来看看这幅图,看下Netty中的Reactor模式。就是由一个不断等待和循环的进程(线程)来响应IO请求,IO请求就绪后,指派调用指定的handler做处理。



    上面定义的boss就是mainReactor,worker就是subReactor。代码后面再做分析。

    总结:

    1. NioEventLoopGroup管理了一组NioEventLoop
    2. 它指定了线程数量 核心数*2
    3. 它创建了executor,默认为ThreadPerTaskExecutor
    4. 它通过newChild方法创建NioEventLoop,NioEventLoop包含了上面创建的executor

    相关文章

      网友评论

          本文标题:2_netty_NioEventLoopGroup

          本文链接:https://www.haomeiwen.com/subject/rrubeftx.html