美文网首页
Netty源码2--server启动初始化过程

Netty源码2--server启动初始化过程

作者: xian_cheng | 来源:发表于2018-10-30 21:59 被阅读0次

上文分析了NioEventLoopGroup和NioEventLoop的关系,本文分析一下Netty在启动server时的初始化流程,仍然从netty server启动的demo例子入手。

public void start() throws InterruptedException {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        try {
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            System.out.println(NettyServer.class.getName() +
                    " started and listen on " + channelFuture.channel().localAddress());
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully().sync();
        }
    }

整个启动过程,首先初始化NioEventLoopGroup线程池,然后初始化引导类,初始化内容稍后分析时讲到时再说,这里主要看serverBootstrap.bind()这个方法,通过进入源码,bind操作主要的代码实现流程在AbstractBootstrap类中实现。

   private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
         ....
        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
          ....
    }

这个doBind方法中最重要的是initAndRegister方法和doBind0方法,下面我们详细分析下这两个方法的主要实现过程。

  1. initAndRegister方法
    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
        }
           ....
        ChannelFuture regFuture = config().group().register(channel);
       ....
        return regFuture;
    }

这个方法首先通过channelFactory创建一个Channel,然后对这个Channel进行初始化操作,最后为这个Channel分配一个NioEventLoop完成这个Channel上的读写事件。
1.1 Channel的创建
我们首先看下channelFactory是在哪初始化的,通过追溯代码发现是在Demo程序中调用 .channel(NioServerSocketChannel.class)这句代码时完成初始化的。

      public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

channelFactory的具体实现类是ReflectiveChannelFactory,继续看下这个实现类。

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    @Override
    public T newChannel() {
        try {
            return clazz.getConstructor().newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

从这个实现类中的newChannel方法可以看到是通过反射创建了一个Channel对象,因为我们demo程序中传入的时NioServerSocketChannel类,所以在initAndRegister方法中创建的Channel就是一个NioServerSocketChannel对象。
1.2 init方法

    @Override
    void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        ChannelPipeline p = channel.pipeline();
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;   
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

这一堆代码中,首先对刚才第一步新建的NioServerSocketChannel完成参数赋值。
然后为这个channel对应的ChannelPipeline中添加了一个ServerBootstrapAcceptor处理器,这个处理器的作用主要是用来处理新建连接的,后面一节会详细讲。
1.3 register方法
这个方法主要是为新建的channel分配一个NioEventLoop,处理这个channel通道上的事件,看下下面这句代码。

    final ChannelFuture initAndRegister() {
        ChannelFuture regFuture = config().group().register(channel);
    }

config().group()返回的是一个bossGroup,bossGroup就是我们在demo程序中第一句创建的NioLoopGroup,这个group主要用来为新来的连接分配NioEventLoop。
register方法的具体实现在MultithreadEventLoopGroup类中,如下:

       @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    @Override
    public EventExecutor next() {
        return chooser.next();
    }

分配NioEventLoop的具体实现在next方法,这个分配策略上节内容已经讲过,这里不再描述。
register方法的具体实现在AbstractChannel这个类中:

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                
                }
            }
        }

(1)通过AbstractChannel.this.eventLoop = eventLoop这句将分配到的NioEventLoop绑定到NioServersocketChannel上。
(2)判断NioEventLoop的线程是否已经启动,如果已经启动,调用register0方法;否则调用 eventLoop.execute方法启动线程,看一下这个方法的具体实现。

       @Override
    public void execute(Runnable task) {
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            startThread();
        }
    }

    private void startThread() {
        if (state == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                try {
                    doStartThread();
                } catch (Throwable cause) {
                 
                }
            }
        }
    }

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {        
    }

首先向LinkedBlockingQueue阻塞队列中添加需要执行的任务,然后这里会启动一个线程,这个线程的主要功能是由SingleThreadEventExecutor.this.run()这行代码中完成的,这个方法的具体实现实在NioEventLoop中实现的。

    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                }

                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
           
        }
    }

这个方法里有一个for循环,最重要的两个方法是processSelectedKeys和runAllTasks,processSelectedKeys主要是监听io读写事件,runAllTasks主要是执行队列中存放的任务的,这两个方法的具体实现先不去研究,只要知道register方法会启动NioEventLoop的线程,此后这个线程会一直监听io事件和执行队列中的任务就可以了。
(3)从上面可以发现真正的注册是在register0这个方法中实现的,下面我们就看下这个方法的具体功能。

   @Override
   protected void doRegister() throws Exception {
       boolean selected = false;
       for (;;) {
           try {
               selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
               return;
           } catch (CancelledKeyException e) {
             
               }
           }
       }
   }

这个方法的主要功能就是将1.1 中新建的Channel注册到NioEventLoop的Selector上,这样NioEventLoop的Selector就可以监听到此channel的io事件了。
initAndRegister方法的主要过程就分析完了,其实主要有三点:
(1)创建了一个NioServerSocketChannel对象
(2)为NioServerSocketChannel对应的ChannelPipeLine增加了一个ServerBootstrapAcceptor处理器,用来处理新的连接。
(3) 从NioEventLoopGroup中分配了一个NioEventLoop,用于监听NioServerSocketChannel通道上的io事件。
这里要注意的是NioEventLoopGroup对应的是demo程序中的bossGroup,这个NioEventLoopGroup的NioEventLoop主要就是处理NioServerSocketChannel连接事件,下面一节就讲下客户端连接服务端时,服务端是如何处理这个连接事件的。

相关文章

网友评论

      本文标题:Netty源码2--server启动初始化过程

      本文链接:https://www.haomeiwen.com/subject/gtnktqtx.html