Reading the Netty Source: The Server Startup Process

Author: 曾泽浩 | Source: published 2018-12-27 16:55

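    This is my own understanding; if you spot any mistakes, please point them out.

    Here is a simple example. It cannot run as shown, because a few classes it depends on (for example EchoServerHandler) are not included here.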

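    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;

    /**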
     * Echoes back any received data from a client.
     */
    public final class EchoServer {
    
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    
        public static void main(String[] args) throws Exception {
            // Configure the server.
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            final EchoServerHandler serverHandler = new EchoServerHandler();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 100)
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ChannelPipeline p = ch.pipeline();
                         p.addLast(serverHandler);
                     }
                 });
    
                // Start the server.
                ChannelFuture f = b.bind(PORT).sync();
    
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down all event loops to terminate all threads.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    

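    Before diving into the source, it helps to pose a few questions. Reading with questions in mind gives the reading a purpose, and makes the moment when things finally click all the more satisfying.

    1. First, Netty is a layer on top of the JDK's native NIO. How does Netty wrap it?
    2. Second, how does the server keep handling client requests?
    3. Third, how do bossGroup and workerGroup divide the work?

    The entry point is ServerBootstrap's bind() method.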

    AbstractBootstrap#bind()

    public ChannelFuture bind(SocketAddress localAddress) {
        validate();
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }
    

    AbstractBootstrap#doBind()

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // Initialize and register the channel.
        // initAndRegister() is the method to focus on.
        final ChannelFuture regFuture = initAndRegister();
        // Get the Channel.
        final Channel channel = regFuture.channel();
        // A non-null cause() means registration failed.
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // Registration has already completed successfully.
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
    
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
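
    The two branches of doBind() are easier to see in isolation. Below is a minimal sketch of the same "bind now if registration is done, otherwise bind from a listener" pattern, written against the JDK's CompletableFuture rather than Netty's ChannelFuture. The class BindAfterRegistration and the helper bind0 are hypothetical names for illustration only; they are not Netty code.

```java
import java.util.concurrent.CompletableFuture;

final class BindAfterRegistration {

    // Hypothetical stand-in for doBind0(): completes the bind promise.
    static void bind0(CompletableFuture<String> bindPromise, int port) {
        bindPromise.complete("bound:" + port);
    }

    static CompletableFuture<String> doBind(CompletableFuture<Void> regFuture, int port) {
        CompletableFuture<String> bindPromise = new CompletableFuture<>();
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            // Registration already finished successfully: bind right away.
            bind0(bindPromise, port);
        } else {
            // Registration still pending (or failed): decide in a callback.
            regFuture.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    bindPromise.completeExceptionally(cause);
                } else {
                    bind0(bindPromise, port);
                }
            });
        }
        return bindPromise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> promise = doBind(pending, 8007);
        System.out.println("done before registration? " + promise.isDone());
        pending.complete(null); // registration finishes later
        System.out.println(promise.join());
    }
}
```

    Either way the caller gets a promise back immediately, which is why bind(PORT).sync() in the example has to wait on the returned future.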
    

    AbstractBootstrap#initAndRegister()

    final ChannelFuture initAndRegister() {
        // For ServerBootstrap this is a NioServerSocketChannel.
        Channel channel = null;
        try {
            // Instantiate the Channel.
            channel = channelFactory.newChannel();
            // Initialize it.
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
    
        // Register the Channel.
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
    
        return regFuture;
    }
    

    One step in the code above is crucial:

    // Instantiate the Channel.
    channel = channelFactory.newChannel();
    

    Start with channelFactory. It is a factory for Channels, but when is it assigned?

    AbstractBootstrap#channel()

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }
    

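    This method is called when we specify the Channel type; at server startup the argument is NioServerSocketChannel.class.

    ReflectiveChannelFactory is a reflection-based Channel factory: it creates the Channel from the Class passed in, through that class's public no-argument constructor. It is fairly easy to follow.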

    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    
        private final Class<? extends T> clazz;
    
        public ReflectiveChannelFactory(Class<? extends T> clazz) {
            if (clazz == null) {
                throw new NullPointerException("clazz");
            }
            this.clazz = clazz;
        }
    
        @Override
        public T newChannel() {
            try {
                return clazz.getConstructor().newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }
    
        @Override
        public String toString() {
            return StringUtil.simpleClassName(clazz) + ".class";
        }
    }
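
    The same idea works for any class with a public no-argument constructor. Here is a minimal, JDK-only sketch of a reflective factory; the names ReflectiveFactoryDemo, Factory and reflective are made up for illustration and only loosely mirror ChannelFactory and ReflectiveChannelFactory.

```java
final class ReflectiveFactoryDemo {

    /** Hypothetical factory interface, loosely analogous to ChannelFactory. */
    interface Factory<T> {
        T newInstance();
    }

    /** Builds a factory that instantiates clazz through its public no-arg constructor. */
    static <T> Factory<T> reflective(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        return () -> {
            try {
                return clazz.getConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create instance of " + clazz, e);
            }
        };
    }

    public static void main(String[] args) {
        // The factory is configured once with a Class and creates instances on demand.
        Factory<CharSequence> factory = reflective(StringBuilder.class);
        CharSequence created = factory.newInstance();
        System.out.println(created.getClass().getSimpleName());
    }
}
```

    One consequence of this design is that the class handed to channel() must expose a public no-argument constructor, otherwise creation fails at startup rather than at compile time.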
    

    Now back to the channelFactory() method:

    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
        return channelFactory((ChannelFactory<C>) channelFactory);
    }
    
    public B channelFactory(ChannelFactory<? extends C> channelFactory) {
        if (channelFactory == null) {
            throw new NullPointerException("channelFactory");
        }
        if (this.channelFactory != null) {
            throw new IllegalStateException("channelFactory set already");
        }
        // Assign the ChannelFactory.
        this.channelFactory = channelFactory;
        return self();
    }
    

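    That completes the assignment of channelFactory. In this example the factory is used to create the NioServerSocketChannel instance.

    Knowing how channelFactory is assigned and what it is for, we can go back to:

    // Instantiate the Channel.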
    channel = channelFactory.newChannel();
    

    This line creates a NioServerSocketChannel instance, which naturally invokes its constructor, so the NioServerSocketChannel constructors come next.

    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    
    /**
     * Creates a new ServerSocketChannel.
     * @param provider the SelectorProvider used to open the channel
     * @return the newly opened ServerSocketChannel
     */
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            /**
             *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
             *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
             *
             *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
             */
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
    

    As with plain JDK NIO, this step has to create a ServerSocketChannel.

    public NioServerSocketChannel(ServerSocketChannel channel) {
        // Interested in client connections (OP_ACCEPT).
        super(null, channel, SelectionKey.OP_ACCEPT);
        // Channel configuration.
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    

    Next, the parent class constructors.

    The AbstractNioMessageChannel constructor:

    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent, ch, readInterestOp);
    }
    

    And one level further up:

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
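        // parent is null here.
        super(parent);
        // ch is the ServerSocketChannel.
        this.ch = ch;
        // readInterestOp is SelectionKey.OP_ACCEPT, passed in by the subclass,
        // meaning this channel is interested in client connection events.
        this.readInterestOp = readInterestOp;
        try {
            // Switch to non-blocking mode.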
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }
    
            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
    

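    To summarize: channelFactory.newChannel() creates a ServerSocketChannel (the very same JDK NIO ServerSocketChannel) and records SelectionKey.OP_ACCEPT as the event it will listen for. Judging from the constructors, though, nothing has been registered with a Selector yet.

    Now back to AbstractBootstrap#initAndRegister(), at this call: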
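
    The state of the channel right after construction can be reproduced with the JDK alone. The sketch below opens a ServerSocketChannel through the SelectorProvider, switches it to non-blocking mode, and then checks that it is neither registered with a Selector nor bound to a port. OpenServerChannelDemo is a hypothetical name used only for this illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

final class OpenServerChannelDemo {

    /** Opens a server channel the way the constructors above do: open, then go non-blocking. */
    static ServerSocketChannel open() {
        try {
            ServerSocketChannel ch = SelectorProvider.provider().openServerSocketChannel();
            ch.configureBlocking(false);
            return ch;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        try (ServerSocketChannel ch = open()) {
            System.out.println("blocking   = " + ch.isBlocking());
            System.out.println("registered = " + ch.isRegistered());
            System.out.println("bound      = " + ch.socket().isBound());
        }
    }
}
```

    All three flags are false at this point, which matches the observation that construction only prepares the channel; registration and binding happen in later steps.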

    init(channel);
    

    This initializes the channel. init() is an abstract method implemented by subclasses.

    The subclass here is ServerBootstrap.

    @Override
    void init(Channel channel) throws Exception {
        // All configured options.
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }
    
        // All configured attributes.
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
    
        // The pipeline.
        ChannelPipeline p = channel.pipeline();
    
        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }
    
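        // Add a handler to the pipeline.
        // initChannel() below does not run right away:
        // addLast() checks whether the Channel has been registered, and if it has not,
        // the handlerAdded callback (which calls initChannel) is deferred until registration.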
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                // The handler configured via handler().
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
    
                // Add a ServerBootstrapAcceptor,
                // which handles newly accepted client connections.
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }
    

    The method above introduces the ServerBootstrapAcceptor handler; we will come back to it later.

    Back to initAndRegister() once more:
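
    The deferred callback described in the comments of init() can be modeled in a few lines. The following is a toy sketch, not Netty's DefaultChannelPipeline: MiniPipeline, its fields and the string log are all hypothetical. It only shows the ordering, namely that addLast() returns before the initializer runs and that the initializer fires once register() is called.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class DeferredInitDemo {

    /** Hypothetical miniature pipeline that postpones callbacks until registration. */
    static final class MiniPipeline {
        private final List<String> handlers = new ArrayList<>();
        private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();
        private boolean registered;
        final List<String> log = new ArrayList<>();

        void addLast(String name, Runnable handlerAdded) {
            handlers.add(name); // the handler itself is added immediately
            if (registered) {
                handlerAdded.run();
            } else {
                pendingCallbacks.add(handlerAdded); // only the callback is postponed
            }
        }

        void register() {
            registered = true;
            Runnable callback;
            while ((callback = pendingCallbacks.poll()) != null) {
                callback.run();
            }
        }
    }

    /** Returns the order in which things happened. */
    static List<String> run() {
        MiniPipeline pipeline = new MiniPipeline();
        pipeline.addLast("initializer", () -> pipeline.log.add("initChannel"));
        pipeline.log.add("after addLast");
        pipeline.register();
        return pipeline.log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```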

    final ChannelFuture initAndRegister() {
        // For ServerBootstrap this is a NioServerSocketChannel.
        Channel channel = null;
        try {
            // Instantiate the Channel.
            channel = channelFactory.newChannel();
            // Initialize it.
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
    
        // Register the Channel.
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }
    

    Execution now reaches this step:

    // Register the Channel.
    ChannelFuture regFuture = config().group().register(channel);
    

    config().group() is simply the bossGroup passed in at the start. Now look at register().

    The call eventually reaches SingleThreadEventLoop's register() method:

    @Override
    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
    
    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
    

    which in turn ends up in the AbstractUnsafe class:

    // Register.
    @Override
    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        // The EventLoop must not be null.
        if (eventLoop == null) {
            throw new NullPointerException("eventLoop");
        }
        // Check whether the channel is already registered.
        if (isRegistered()) {
            promise.setFailure(new IllegalStateException("registered to an event loop already"));
            return;
        }
        if (!isCompatible(eventLoop)) {
            promise.setFailure(
                    new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
            return;
        }
        // Assign the EventLoop.
        // For NioServerSocketChannel this is the point where eventLoop is set,
        // tying the channel to its EventLoop.
        AbstractChannel.this.eventLoop = eventLoop;
    
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                // Submit the task; the first execute() also starts the EventLoop's thread.
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        // Continue reading here.
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                logger.warn(
                        "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                        AbstractChannel.this, t);
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }
    }
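
    The inEventLoop() check is worth a small experiment. Below is a minimal sketch, using only java.util.concurrent, of a single-threaded loop that runs a task directly when the caller is already on the loop thread and submits it otherwise. MiniLoop and runInLoop are hypothetical names; Netty's SingleThreadEventExecutor is considerably more involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class InEventLoopDemo {

    /** Hypothetical single-threaded loop; its thread is created lazily on the first execute(). */
    static final class MiniLoop {
        private volatile Thread loopThread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-loop");
            t.setDaemon(true);
            loopThread = t;
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == loopThread;
        }

        void runInLoop(Runnable task) {
            if (inEventLoop()) {
                task.run();             // already on the loop thread: run directly
            } else {
                executor.execute(task); // otherwise hand the task to the loop thread
            }
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Calls runInLoop from an outside thread and reports which thread ran the task. */
    static String registerFromOutside() {
        MiniLoop loop = new MiniLoop();
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        // The outer call is submitted; the nested call runs directly on the loop thread.
        loop.runInLoop(() -> loop.runInLoop(() -> ranOn.complete(Thread.currentThread().getName())));
        String name = ranOn.join();
        loop.shutdown();
        return name;
    }

    public static void main(String[] args) {
        System.out.println(registerFromOutside());
    }
}
```

    During server startup the caller is the main thread, so the else branch is the one taken and register0() runs on the boss EventLoop's thread.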
    

    register0()

    // Register.
    private void register0(ChannelPromise promise) {
        try {
            // check if the channel is still open as it could be closed in the mean time when the register
            // call was outside of the eventLoop
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }
            // Is this the first registration?
            boolean firstRegistration = neverRegistered;
            doRegister();
            neverRegistered = false;
            // Mark as registered.
            registered = true;
    
            // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
            // user may already fire events through the pipeline in the ChannelFutureListener.
            
            // Invoke the handlerAdded callbacks that were deferred because addLast() ran before registration.
            pipeline.invokeHandlerAddedIfNeeded();
    
            safeSetSuccess(promise);
    
            // Fire channelRegistered: the Channel is now registered.
            pipeline.fireChannelRegistered();
            // Only fire a channelActive if the channel has never been registered. This prevents firing
            // multiple channel actives if the channel is deregistered and re-registered.
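            // Not entered during server startup: the channel is not bound yet, so isActive() is false.
            if (isActive()) {
                // If this is the first registration
                if (firstRegistration) {
                    // Fire channelActive: the channel is active.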
                    pipeline.fireChannelActive();
                } else if (config().isAutoRead()) {
                    // This channel was registered before and autoRead() is set. This means we need to begin read
                    // again so that we process inbound data.
                    //
                    // See https://github.com/netty/netty/issues/4805
                    beginRead();
                }
            }
        } catch (Throwable t) {
            // Close the channel directly to avoid FD leak.
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    

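    The code above registers the Channel. The eventLoop here is a NioEventLoop taken from the bossGroup (a NioEventLoopGroup). Its thread has not been started yet, so the first call to execute() starts that thread, and the thread then runs NioEventLoop's run() method.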

    /**
     * Executed when the EventLoop thread starts.
     */
    @Override
    protected void run() {
        for (;;) {
            try {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
    
                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO
    
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
    
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    handleLoopException(e);
                    continue;
                }
    
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        // Process the selected keys.
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        // Run all pending tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        // Process the selected keys.
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            // Always handle shutdown even if the loop processing threw an exception.
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    

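    We will not go through run() in detail here; that is for another time.

    run() mainly does three things:

    1. Polls the I/O events of all channels registered with the selector

      select();
      
    2. Processes the channels on which network I/O events occurred

      processSelectedKeys();
      
    3. Runs all queued tasks

      runAllTasks();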
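
    These three steps can be sketched with a bare JDK Selector and a task queue. The class below is a simplified illustration, not NioEventLoop: MiniEventLoop, runOnce and taskBudgetNanos are hypothetical names, and key processing is reduced to counting. The budget formula is the one that appears in run() above, ioTime * (100 - ioRatio) / ioRatio.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class MiniEventLoop {
    private final Selector selector;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();

    MiniEventLoop() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Queues a task and wakes the selector so a blocked select() returns. */
    void execute(Runnable task) {
        tasks.add(task);
        selector.wakeup();
    }

    /** One pass of the loop; returns how many selected keys were processed. */
    int runOnce() {
        int processed = 0;
        try {
            // 1. Poll: do not block if tasks are waiting, otherwise wait briefly.
            if (tasks.isEmpty()) {
                selector.select(50);
            } else {
                selector.selectNow();
            }
            // 2. Process the keys that became ready (here they are only counted).
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                it.next();
                it.remove();
                processed++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // 3. Run all queued tasks.
        Runnable task;
        while ((task = tasks.poll()) != null) {
            task.run();
        }
        return processed;
    }

    /** Time allowed for tasks, given the time just spent on I/O and the ioRatio setting. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    For instance, with ioRatio set to 80, a pass that spent 1000 ns on I/O leaves 1000 * 20 / 80 = 250 ns for tasks. This also hints at the second question from the beginning: the server keeps handling requests simply because this loop never ends until shutdown.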
    

    That completes the analysis of initAndRegister(). Now back to the doBind() method.

    private ChannelFuture doBind(final SocketAddress localAddress) {
        // Initialize and register the channel.
        final ChannelFuture regFuture = initAndRegister();
        // Get the Channel.
        final Channel channel = regFuture.channel();
        // A non-null cause() means registration failed.
        if (regFuture.cause() != null) {
            return regFuture;
        }
        // Registration has already completed successfully.
        if (regFuture.isDone()) {
            // At this point we know that the registration was complete and successful.
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else {
            // Registration future is almost always fulfilled already, but just in case it's not.
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                        // IllegalStateException once we try to access the EventLoop of the Channel.
                        promise.setFailure(cause);
                    } else {
                        // Registration was successful, so set the correct executor to use.
                        // See https://github.com/netty/netty/issues/2586
                        promise.registered();
    
                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
    

    Next, the doBind0() method:

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
    
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        //
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                // Bind only if registration succeeded.
                if (regFuture.isSuccess()) {
                    // Bind, and close the channel if binding fails.
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    

    The call eventually reaches AbstractUnsafe's bind() method. In fact, all network I/O operations are ultimately carried out by the Unsafe classes.

    // Bind.
    @Override
    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    
        assertEventLoop();
    
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
    
        // See: https://github.com/netty/netty/issues/576
        if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
            localAddress instanceof InetSocketAddress &&
            !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
            !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
            // Warn a user about the fact that a non-root user can't receive a
            // broadcast packet on *nix if the socket is bound on non-wildcard address.
            logger.warn(
                    "A non-root user can't receive a broadcast packet if the socket " +
                    "is not bound to a wildcard address; binding to a non-wildcard " +
                    "address (" + localAddress + ") anyway as requested.");
        }
    
        // Whether the channel was already active before binding.
        boolean wasActive = isActive();
        try {
            // Perform the actual bind.
            doBind(localAddress);
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            closeIfClosed();
            return;
        }
    
        if (!wasActive && isActive()) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireChannelActive();
                }
            });
        }
    
        safeSetSuccess(promise);
    }
    

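    Next comes NioServerSocketChannel's doBind() method, which performs the bind on the underlying JDK channel.

    The bind() code above has one more step, pipeline.fireChannelActive(), which notifies the pipeline that the channel is now active. Let's trace into it.

    DefaultChannelPipeline's fireChannelActive(), and the AbstractChannelHandlerContext methods it delegates to: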
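
    Since doBind() is not shown here, the JDK-level effect can be sketched instead. The example assumes, without the source in front of us, that the Netty method boils down to binding the JDK ServerSocketChannel with the configured backlog and that isActive() for a server channel reflects whether the socket is bound; please verify both against the Netty version you are reading. BindDemo and bindAndReport are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

final class BindDemo {

    /** Returns {boundBefore, boundAfter} around a bind to an ephemeral loopback port. */
    static boolean[] bindAndReport(int backlog) {
        try (ServerSocketChannel ch = ServerSocketChannel.open()) {
            boolean before = ch.socket().isBound();
            // Port 0 lets the operating system pick a free port.
            ch.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), backlog);
            boolean after = ch.socket().isBound();
            return new boolean[] {before, after};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] state = bindAndReport(100); // 100 mirrors SO_BACKLOG in the example server
        System.out.println("bound before = " + state[0] + ", bound after = " + state[1]);
    }
}
```

    Under that assumption, the transition from unbound to bound is exactly what the `!wasActive && isActive()` check in bind() detects.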

    @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
    
    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }
    
    private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }
    

    channelActive() of HeadContext, an inner class of DefaultChannelPipeline:

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    
        readIfIsAutoRead();
    }
    

    After tracing this far, we finally find something useful:

    readIfIsAutoRead();
    

    So let's step into it:

    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }
    

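    Auto-read is enabled by default. Here the channel is invoking a network I/O operation again, so in the end the work is handed to the Unsafe class.

    Sure enough, the call goes through DefaultChannelPipeline's read() method

    and then reaches AbstractUnsafe's beginRead() method: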

    @Override
    public final void beginRead() {
        assertEventLoop();
    
        if (!isActive()) {
            return;
        }
    
        try {
            doBeginRead();
        } catch (final Exception e) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    // Propagate the exception through the pipeline.
                    pipeline.fireExceptionCaught(e);
                }
            });
            close(voidPromise());
        }
    }
    

    which then calls AbstractNioChannel's doBeginRead() method:

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
    
        readPending = true;
    
        final int interestOps = selectionKey.interestOps();
        // A result of 0 means the read interest op has not been registered yet.
        if ((interestOps & readInterestOp) == 0) {
            // Start listening for the event.
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    

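    Remember readInterestOp? At the very beginning, when the NioServerSocketChannel instance was created, SelectionKey.OP_ACCEPT was passed in, so from this point on the channel listens for client connection events.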

    selectionKey.interestOps(interestOps | readInterestOp);
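
    The two-phase pattern (register first, declare interest later) can be tried directly on JDK NIO. The sketch assumes that the channel was registered with an interest set of 0 during doRegister(), which is not shown in this article, so treat that part as an assumption. InterestOpsDemo and registerThenBeginRead are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

final class InterestOpsDemo {

    /** Returns {interestOps after registration, interestOps after "beginRead"}. */
    static int[] registerThenBeginRead() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            ch.configureBlocking(false);

            // Phase 1 (assumed doRegister equivalent): register without any interest.
            SelectionKey key = ch.register(selector, 0);
            int before = key.interestOps();

            // Phase 2 (doBeginRead equivalent): OR in the read interest op if it is missing.
            int readInterestOp = SelectionKey.OP_ACCEPT;
            if ((before & readInterestOp) == 0) {
                key.interestOps(before | readInterestOp);
            }
            return new int[] {before, key.interestOps()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = registerThenBeginRead();
        System.out.println("after register: " + ops[0] + ", after beginRead: " + ops[1]);
    }
}
```

    Using a bitwise OR keeps any interest bits that are already set, which is why the same doBeginRead() code can serve channels with different readInterestOp values.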
    

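    To summarize, the analysis above covers everything one does with native NIO: creating the ServerSocketChannel, binding the port, registering interest in events, polling for I/O events, handling I/O events, and so on.

    Remember the ServerBootstrapAcceptor class mentioned earlier? Here is its channelRead() method: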

    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // msg here is the newly accepted NioSocketChannel.
        final Channel child = (Channel) msg;
    
        child.pipeline().addLast(childHandler);
    
        setChannelOptions(child, childOptions, logger);
    
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    
        try {
            // childGroup is the workerGroup;
            // the accepted channel is handed over to the workerGroup here.
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
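
    This is where the third question gets its answer: the boss side only accepts, and every accepted channel is handed to a worker. The hand-off can be imitated with plain executors. The sketch below is a toy model and not Netty code: WorkerGroup, register and acceptAll are hypothetical names, connections are just strings, and the round-robin choice of the next worker is an assumption about how a group picks its EventLoop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class BossWorkerDemo {

    /** Hypothetical worker group: N single-threaded loops chosen in round-robin order. */
    static final class WorkerGroup {
        private final ExecutorService[] workers;
        private final AtomicInteger next = new AtomicInteger();

        WorkerGroup(int size) {
            workers = new ExecutorService[size];
            for (int i = 0; i < size; i++) {
                final String name = "worker-" + i;
                workers[i] = Executors.newSingleThreadExecutor(r -> {
                    Thread t = new Thread(r, name);
                    t.setDaemon(true);
                    return t;
                });
            }
        }

        /** "Registers" a connection: from now on the chosen worker thread owns it. */
        Future<String> register(String connection) {
            ExecutorService worker = workers[Math.floorMod(next.getAndIncrement(), workers.length)];
            return worker.submit(() -> connection + "@" + Thread.currentThread().getName());
        }

        void shutdown() {
            for (ExecutorService worker : workers) {
                worker.shutdown();
            }
        }
    }

    /** Plays the boss role: hands each accepted connection to the worker group. */
    static List<String> acceptAll(List<String> connections, int workerCount) {
        WorkerGroup group = new WorkerGroup(workerCount);
        List<String> owners = new ArrayList<>();
        try {
            for (String connection : connections) {
                owners.add(group.register(connection).get());
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            group.shutdown();
        }
        return owners;
    }

    public static void main(String[] args) {
        System.out.println(acceptAll(List.of("c1", "c2", "c3"), 2));
    }
}
```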
    

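    Hmm, I feel this was not explained very well, and some details are still left uncovered.

    Sometimes I understand the code but cannot explain it clearly. I need to keep working at it.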

    Article link: https://www.haomeiwen.com/subject/gzcdlqtx.html