美文网首页
4.Netty源码-服务器启动

4.Netty源码-服务器启动

作者: 砂糖z | 来源:发表于2020-04-15 16:21 被阅读0次

    我们带着问题去看源码,下面是我的问题:

    • serverSocketChannel是什么时候与selector绑定的,流程是什么
    • 启动的线程数有多少,什么时候new的,什么时候start的
    • 有那些地方需要用锁

    前提概要:netty代码非常深,尽量写详细,贴出代码中可以看todo的注解。2.1章节为重点,提到了线程的创建与start
    可以先看最后的总结再看。

    服务端代码

    主要是分析

    • NioEventLoopGroup bossGroup = new NioEventLoopGroup();
    • ChannelFuture cf = bootstrap.bind(6666).sync();
     public static void main(String[] args)  {
        //1.创建Boss NioEventLoopGroup , Worker NioEventLoopGroup
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
          //2.创建服务器端的启动对象,配置参数
          ServerBootstrap bootstrap = new ServerBootstrap();
          bootstrap.group(bossGroup, workerGroup)
              .channel(NioServerSocketChannel.class)
              //线程队列得到连接个数
              .option(ChannelOption.SO_BACKLOG, 128)
              //设置保持活动链接状态
              .childOption(ChannelOption.SO_KEEPALIVE, true)
              //给workGroup的EventLoop对应的管道设置handler
              .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                  socketChannel.pipeline().addLast(new NettyServerHandler());
                }
              });
          //3.绑定端口并且同步
          ChannelFuture cf = bootstrap.bind(6666).sync();
    
          //4.对关闭通道进行监听
          cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
          e.printStackTrace();
        }finally {
          bossGroup.shutdownGracefully();
          workerGroup.shutdownGracefully();
        }
      }
    

    猜想

    NioEventLoopGroup bossGroup = new NioEventLoopGroup();

    • 创建NioEventLoop
    • 创建Selector
      ChannelFuture cf = bootstrap.bind(6666).sync();
    • 创建ServerSocketChannel
    • 选出一个Selector于ServerSocketChannel绑定

    实际

    1.NioEventLoopGroup bossGroup = new NioEventLoopGroup();

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                                EventExecutorChooserFactory chooserFactory, Object... args) {
          // TODO: 不指定的时候,线程数默认cpu核数 *2
            if (nThreads <= 0) {
                throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
            }
         //TODO:executor.execute(Runnable command) 使用ThreadFactory创建一个新线程并且start();
            if (executor == null) {
                executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
            }
    
            children = new EventExecutor[nThreads];
    
            for (int i = 0; i < nThreads; i ++) {
                boolean success = false;
                try {
                    // TODO: 创建NioEventLoop
                    children[i] = newChild(executor, args);
                    success = true;
                } catch (Exception e) {
                    throw new IllegalStateException("failed to create a child event loop", e);
                } finally {
                    if (!success) {
                        for (int j = 0; j < i; j ++) {
                            children[j].shutdownGracefully();
                        }
    
                        for (int j = 0; j < i; j ++) {
                            EventExecutor e = children[j];
                            try {
                                while (!e.isTerminated()) {
                                    e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                                }
                            } catch (InterruptedException interrupted) {
                                // Let the caller handle the interruption.
                                Thread.currentThread().interrupt();
                                break;
                            }
                        }
                    }
                }
            }
           // TODO: 根据nThreads线程数来选出选择选择器
            chooser = chooserFactory.newChooser(children);
    
            final FutureListener<Object> terminationListener = new FutureListener<Object>() {
                @Override
                public void operationComplete(Future<Object> future) throws Exception {
                    if (terminatedChildren.incrementAndGet() == children.length) {
                        terminationFuture.setSuccess(null);
                    }
                }
            };
        // TODO:加一个监听器
            for (EventExecutor e: children) {
                e.terminationFuture().addListener(terminationListener);
            }
        //TODO: 做出一个只读的集合
            Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
            Collections.addAll(childrenSet, children);
            readonlyChildren = Collections.unmodifiableSet(childrenSet);
        }
    

    我觉得需要着重注意一下executor这个属性类型ThreadPerTaskExecutor,因为代码中充斥了大量的使用这个,下面我们去看NioEventLoop的创建,时刻记得我们的问题什么时候创建的线程,什么时候启动的线程?

    例子

    1.2 children[i] = newChild(executor, args);

    这里调用的逻辑为NioEventLoopGroup类中的


    newChild

    创建好的NioEventLoop


    NioEventLoop
    • 所以这个时候并没有创建NioEventLoop里的线程
    • 创建了Selector
    • 有一个队列底层使用:Mpsc.newMpscQueue();,底层使用MpscUnboundedArrayQueue.class,传值1024
    • 有一个threadLock
    • exector较NioEventLoopGroup的exector又包装了一层 ,exector类型为ThreadExecutorMap.class 。很显然eventLoop.exector()的时候在开始的时候会来调用这个NioEventLoopGroup的exector中的ThreadFactory去newThread并且start()。
      • ThreadExecutorMap里面存储:每个线程一个NioEventLoop

    1.3 chooser = chooserFactory.newChooser(children);

    public EventExecutorChooser newChooser(EventExecutor[] executors) {
            if (isPowerOfTwo(executors.length)) {
                return new PowerOfTwoEventExecutorChooser(executors);
            } else {
                return new GenericEventExecutorChooser(executors);
            }
        }
    

    根据线程数初始化选择器,如果是偶数,则选PowerOfTwoEventExecutorChooser(count &(len -1)),这样的效率比取余数效率更高。

    1 小总结

    • NioEventLoopGroup除了有NioEventLoop的数组,还要一个只读的集合。ServerBootstrap.class里面也有一个多有数据只读的属性ServerBootstrapConfig.class
    • 这一步只是创建了Selector和choose

    2.ChannelFuture cf = bootstrap.bind(6666).sync();

    代码太多了,这里打算不粘贴全部源码了,只写出我觉得是重点的部分,可以打开源码看文档,搭配食用
    调用io.netty.bootstrap.AbstractBootstrap#doBind

          //TODO: 根据反射创建ServerSocketChannel,完成注册
            final ChannelFuture regFuture = initAndRegister();
          
           //TODO: ServerSocketChannel地址绑定
            doBind0(regFuture, channel, localAddress, promise);
    

    2.1 final ChannelFuture regFuture = initAndRegister();

    final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                //TODO: 反射初始化Channel,
                channel = channelFactory.newChannel();
              //TODO: 小重点,初始化channel
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                    // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                    return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
            }
           //TODO: 没有异常一般都会执行到这部分逻辑
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
            return regFuture;
        }
    

    channel = channelFactory.newChannel();初始化后的channel

    NioServerSocketChannel
    在初始化的时候创建java的ServerSocketChannel,并且设置为非阻塞
    io.netty.bootstrap.ServerBootstrap#init
    @Override
        void init(Channel channel) {
            setChannelOptions(channel, options0().entrySet().toArray(EMPTY_OPTION_ARRAY), logger);
            setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
    
            ChannelPipeline p = channel.pipeline();
            final EventLoopGroup currentChildGroup = childGroup;
            final ChannelHandler currentChildHandler = childHandler;
            final Entry<ChannelOption<?>, Object>[] currentChildOptions =
                    childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY);
            final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
          
          
            p.addLast(new ChannelInitializer<Channel>() {
               //TODO: 需要关注这个方法执行的时间
               @Override
                public void initChannel(final Channel ch) {
                    final ChannelPipeline pipeline = ch.pipeline();
                    //TODO: 在pipline上添加配置的handler
                    ChannelHandler handler = config.handler();
                    if (handler != null) {
                        pipeline.addLast(handler);
                    }
                    //TODO: 给pipLines中添加一个ServerBootstrapAcceptor的handler
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    
    channel初始化后

    别想错了,这个时候还没有执行那个ChannelInitializer#initChannel方法。所以目前都还没有执行eventLoop.executor(..)

    ChannelFuture regFuture = config().group().register(channel);

    io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
     @Override
        public ChannelFuture register(Channel channel) {
           //TODO: 调用choose选出一个NioEventLoop注册上去
            return next().register(channel);
        }
    register方法最后会调用的逻辑在,io.netty.channel.AbstractChannel.AbstractUnsafe#register
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                ObjectUtil.checkNotNull(eventLoop, "eventLoop");
                if (isRegistered()) {
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
    
                AbstractChannel.this.eventLoop = eventLoop;
                //TODO:当前线程是main线程,返回false
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                       //TODO:第一次调用调用  eventLoop.execute,这次会在NioEventLoop创建线程并且start
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }
            }
    io.netty.channel.AbstractChannel.AbstractUnsafe#register0
    private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                    //TODO:重点1
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    //TODO:重点2
                    pipeline.invokeHandlerAddedIfNeeded();
                    //TODO:重点3,在doBind0()绑定地址哪里有讲
                    safeSetSuccess(promise);
                     
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // This channel was registered before and autoRead() is set. This means we need to begin read
                            // again so that we process inbound data.
                            //
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    重点1:io.netty.channel.nio.AbstractNioChannel#doRegister
    @Override
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                  //TODO:给这个channel注册了0事件,注意不是OP_ACCEPT
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    重点2:最后逻辑调用io.netty.channel.DefaultChannelPipeline#callHandlerAddedForAllHandlers
    private void callHandlerAddedForAllHandlers() {
            final PendingHandlerCallback pendingHandlerCallbackHead;
            synchronized (this) {
                assert !registered;
    
                // This Channel itself was registered.
                registered = true;
    
                pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
                // Null out so it can be GC'ed.
                this.pendingHandlerCallbackHead = null;
            }
    
            // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
            // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
            // the EventLoop.
            PendingHandlerCallback task = pendingHandlerCallbackHead;
            while (task != null) {
               //todo:这个方法最后其实就是调用了pipLine.addLast(ChannelInitializer<Channel>#initChannel)
              //完成最后NioServerSocketChannel的pipline中handler的初始化
                task.execute();
                task = task.next;
            }
        }
    
    

    强势总结:

    • 代码执行到现在NioServerSocketChannel初始化了。regregister0()里面的逻辑是channel与selector绑定,并且完成最后的pipline的初始化,但是异步执行,所以需要看那个地方执行了regregister0()

    2.1 重点中的重点第一次调用eventLoop.executor(),所以分析逻辑

    会调用io.netty.util.concurrent.SingleThreadEventExecutor#execute(java.lang.Runnable)
     @Override
        public void execute(Runnable task) {
            ObjectUtil.checkNotNull(task, "task");
            execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
        }
    execute:
    private void execute(Runnable task, boolean immediate) {
            boolean inEventLoop = inEventLoop();
            // TODO: 加入NioEventLoop的taskQueue中,什么时候执行等我揭晓。
            addTask(task);
           //TODO:当前线程仍然是main,则当我们在NioEventLoop线程中exeutor(Runable)会直接加入队列中
            if (!inEventLoop) {
               //TODO:重点
                startThread();
                if (isShutdown()) {
                    boolean reject = false;
                    try {
                        if (removeTask(task)) {
                            reject = true;
                        }
                    } catch (UnsupportedOperationException e) {
                        // The task queue does not support removal so the best thing we can do is to just move on and
                        // hope we will be able to pick-up the task before its completely terminated.
                        // In worst case we will log on termination.
                    }
                    if (reject) {
                        reject();
                    }
                }
            }
    
            if (!addTaskWakesUp && immediate) {
                wakeup(inEventLoop);
            }
        }
    
    startThread:
     private void startThread() {
            if (state == ST_NOT_STARTED) {
                if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                    boolean success = false;
                    try {
                        doStartThread();
                        success = true;
                    } finally {
                        if (!success) {
                            STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                        }
                    }
                }
            }
        }
    
    

    有点绕,仔细看箭头:
    结论:doStartThread()里面的这个Runable的方法在NioEventLoop中的线程.start()的时候调用的。则在threadFactory.newThread(这里的runable是doStartThread中new出来的).start()


    第一次eventLoop.executor(..)

    看到现在强势总结:

    • 线程也创建了但是仍然没有看到要执行register0()所以现在可以看doStartThread()中SingleThreadEventExecutor.this.run(),执行这行代码的是在NioEventLoop线程中的,所以这个this没有问题吧。SingleThreadEventExecutor里面的代码大致是轮训遍历执行Selector里准备就绪的事件和taskQueue里面的事件,在下一章会有介绍。

    2.2 doBind0(regFuture, channel, localAddress, promise);

    什么时候被调用的?

    • 因为它是在 regFuture.addListener(new ChannelFutureListener() {})中的回调方法中的执行的,所以他在 io.netty.channel.AbstractChannel.AbstractUnsafe#register0 的 safeSetSuccess(promise);行代码被调用的。
    • 代码非常深,逻辑处理:io.netty.channel.socket.nio.NioServerSocketChannel#doBind
    @SuppressJava6Requirement(reason = "Usage guarded by java version check")
        @Override
        protected void doBind(SocketAddress localAddress) throws Exception {
            if (PlatformDependent.javaVersion() >= 7) {
                javaChannel().bind(localAddress, config.getBacklog());
            } else {
                javaChannel().socket().bind(localAddress, config.getBacklog());
            }
        }
      最后还会执行这一段代码:
           invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelActive();
                        }
                    });
     pipeline.fireChannelActive();代码非常深:会执行这段逻辑
    io.netty.channel.nio.AbstractNioChannel#doBeginRead
    @Override
        protected void doBeginRead() throws Exception {
            // Channel.read() or ChannelHandlerContext.read() was called
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }
    
            readPending = true;
    
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                //TODO:注册16,即OP_ACCEPT事件
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }
    

    总结

    那么Netty的服务端的启动流程:

    • NioEventLoopGroup bossGroup = new NioEventLoopGroup();
      • 创建NioEventLoop,即创建Selector,Selector是NioEventLoop的一个属性变量
      • 创建choose : 轮训让Channel绑定NioEventLoop的Selector上的算法
    • ChannelFuture cf = bootstrap.bind(6666).sync();
      • 通过反射创建NioServerSocketChannel
      • NioServerSocketChannel初始化,添加一个回调函数为pipLine上加一个ServerBootstrapAcceptor的handler
      • 添加一个回调函数绑定地址
      • (在NioEventLoop线程中)NioServerSocketChannel注册上NioEventLoop的Selector上,绑定的是0
      • (在NioEventLoop线程中)调用之前的回调函数NioServerSocketChannel绑定地址
      • (在NioEventLoop线程中) 为NioServerSocektChannel注册OP_ACCEPT事件。

    相关文章

      网友评论

          本文标题:4.Netty源码-服务器启动

          本文链接:https://www.haomeiwen.com/subject/sndqvhtx.html