美文网首页
1_Netty源码分析之Netty服务端启动

1_Netty源码分析之Netty服务端启动

作者: 小安的大情调 | 来源:发表于2019-03-10 19:06 被阅读0次

    本文均为原创,如需转载请注明出处。

    [TOC]

    Netty服务端创建流程分析

    ​ Netty为了向使用者屏蔽NIO通信的底层细节,在和用户交互的边界做了封装,母的就是为了减少用户开发工作量,降低开发难度。BootstrapSocket客户端创建工具类,用户听过Bootstrap可以方柏霓地创建Netty地客户端并发起异步TCP连接操作。

    Netty服务端时序图

    Netty服务端--Channel的创建

    首先基于NIO的学习,思考两个问题

    • 服务端的Socket在哪里初始化?

    • 在哪里accept连接?

    研究服务端是如何创建的,查看源码,首先应该从源头出发,直接调用的方法开始层层深入。

    Netty服务端的入口bind()方法

    ​ 服务端创建的入口bind()方法

    // 绑定端口,同步等待成功
    ChannelFuture f = b.bind(port).sync();
    

    进入bind()方法发现会执行一个dobind()方法

        private ChannelFuture doBind(final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    

    该方法调用了一个initAndRegister()

     final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    

    很明显可以看出。这里会创建一个底层的channel,调用Netty维护的一个工厂类,下面的init()方法是一个抽象类,可以看看那些方法继承并实现了它呢?😊

    AbstractBootstrap下的两个子类.png

    所以可想而知!在调用bind()方法后,Netty会调用JDK底层初始化一个Channel,回过头来看看channelFactory.newChannel()的实现。

    ChannelFactory下有一个利用反射的实现子类ReflectiveChannelFactory(),从名字就可以看得出来,是一个利用反射来初始化channel的工厂类。该类下的newChannel()方法:

        @Override
        public T newChannel() {
            try {
                return clazz.newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }
    

    确实是利用了返回来进行实例化的。那么它是如何知道实话化那个对象呢?

    ​ 让我们回到上一层开始出现channelFactory.newChannel(),在这里可以很清楚的看出来,当前类AbstractBootstrap自身维护着该工厂对象,并且在构造函数中给该工厂类所需要的对象进行了赋值。

        AbstractBootstrap() {
            // Disallow extending from a different package.
        }
    
        AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
            group = bootstrap.group;
            // 那么这里的bootstrap对象又是从哪里得到的呢?
            channelFactory = bootstrap.channelFactory;
            handler = bootstrap.handler;
            localAddress = bootstrap.localAddress;
            synchronized (bootstrap.options) {
                options.putAll(bootstrap.options);
            }
            synchronized (bootstrap.attrs) {
                attrs.putAll(bootstrap.attrs);
            }
        }
    

    这里的AbstractBootstrap<B, C> bootstrap很直接的就可以想到我们在最外层定义的AbstractBootstrap的子类Bootstrap/ServerBootstrap中设置了

              ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChildChannelHandler());
    

    其中设置的channel属性就是告诉底层的channelFactory来实例化该对象。

    这里我们验证一下,直接从我们的代码中.channel()方法中去,可以很清楚的看出,上面说的调用的是ChannelFactory下的子类ReflectiveChannelFactory

        public B channel(Class<? extends C> channelClass) {
            if (channelClass == null) {
                throw new NullPointerException("channelClass");
            }
            // 调用工厂类,利用反射创建对象
            return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
        }
    

    该类的实现非常的简单:

    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
    
        private final Class<? extends T> clazz;
    
        // 直接返回 需要初始化类的 class 对象
        public ReflectiveChannelFactory(Class<? extends T> clazz) {
            if (clazz == null) {
                throw new NullPointerException("clazz");
            }
            this.clazz = clazz;
        }
    
        // 利用反射 初始化返回实例对象
        @Override
        public T newChannel() {
            try {
                return clazz.newInstance();
            } catch (Throwable t) {
                throw new ChannelException("Unable to create Channel from class " + clazz, t);
            }
        }
    
        @Override
        public String toString() {
            return StringUtil.simpleClassName(clazz) + ".class";
        }
    }
    

    现在明白了channel是由谁创建的,那么到底是怎么创建出来的呢?现在进入NioServerSocketChannel.查看该对象的构造函数,做了那些事情。

    通过NioServerSocketChannel加密Channel的创建

        /**
         * Create a new instance
         */
        public NioServerSocketChannel() {
            this(newSocket(DEFAULT_SELECTOR_PROVIDER));
        }
    // 该方法 返回的对象为 java.nio.channels;下的。调用的方法也是JDK底层的实现
        private static ServerSocketChannel newSocket(SelectorProvider provider) {
            try {
                return provider.openServerSocketChannel();
            } catch (IOException e) {
                throw new ChannelException(
                        "Failed to open a server socket.", e);
            }
        }
    // 原来NioServerSocketChannel 是直接调用的JDK底层的newSocket来来创建Channel 通道
    

    其中还有一个构造

        /**
         * Create a new instance using the given {@link ServerSocketChannel}.
         */
        public NioServerSocketChannel(ServerSocketChannel channel) {
            super(null, channel, SelectionKey.OP_ACCEPT);
            // 这里是可以生成一个NiO channel配置信息
            config = new NioServerSocketChannelConfig(this, javaChannel().socket());
        }
    
    

    一次点进该方法的父类可以找到在NIO编程中不可避免地一项异步配置:ch.configureBlocking(false);,配置为异步非阻塞。

        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);// 配置为异步 非阻塞 (重点)
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                    }
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }
    
    Channel UML图.png

    在上面继承关系中AbstractChannel维护着channel通道地内部属性

    
        /**
         * Creates a new instance.
         *
         * @param parent
         *        the parent of this channel. {@code null} if there's no parent.
         */
        protected AbstractChannel(Channel parent) {
            this.parent = parent;
            id = newId(); // Channel的 唯一标识
            unsafe = newUnsafe();// 对应 底层TCP读写的相关操作
            pipeline = newChannelPipeline();// 后续研究😊
        }
    

    在服务端channel初始化完成之后,下一步就需要将该channel注册到selector上面。

    注册selector

    ​ 在上述代码channel初始化完成的地方为:调用了init()方法。因此应该从该地方出发查看如何将channel注册到selector上。initAndRegister初始并注册。

    
        final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                // 初始化操作
                channel = channelFactory.newChannel();
                init(channel);
            } catch (Throwable t) {
                if (channel != null) {
                    // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                    channel.unsafe().closeForcibly();
                }
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
    
            // 注册channel
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    

    其中register的实现在AbstractChannel下。

     @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                if (eventLoop == null) {
                    throw new NullPointerException("eventLoop");
                }
                if (isRegistered()) {
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
    
                // 绑定线程,简单的赋值操作
                AbstractChannel.this.eventLoop = eventLoop;
    
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                // 实现注册
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }
            }
    
    

    register0(promise);

    private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                    // 调用Jdk底层注册
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    // 触发事件
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    safeSetSuccess(promise);
                    // 传播时间
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // This channel was registered before and autoRead() is set. This means we need to begin read
                            // again so that we process inbound data.
                            //
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    
    
        @Override
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    // 调用JDK 方法实现注册
                    selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                    return;
    

    注册成功后,考虑端口绑定。

    端口绑定

    还是根据dobind()方法可以看到里面有个bind0()方法。channel对象调用bind()方法,在AbstractChannel()下有具体实现

    @Override
            public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
                assertEventLoop();
    
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
    
                // See: https://github.com/netty/netty/issues/576
                if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                    localAddress instanceof InetSocketAddress &&
                    !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                    !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
                    // Warn a user about the fact that a non-root user can't receive a
                    // broadcast packet on *nix if the socket is bound on non-wildcard address.
                    logger.warn(
                            "A non-root user can't receive a broadcast packet if the socket " +
                            "is not bound to a wildcard address; binding to a non-wildcard " +
                            "address (" + localAddress + ") anyway as requested.");
                }
    
                boolean wasActive = isActive();
                try {
                    // 调用JDK底层 绑定
                    doBind(localAddress);
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    closeIfClosed();
                    return;
                }
    
                if (!wasActive && isActive()) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            // 触发事件
                            pipeline.fireChannelActive();
                        }
                    });
                }
    
                safeSetSuccess(promise);
            }
    
    

    调用底层的绑定方法

        @Override
        protected void doBind(SocketAddress localAddress) throws Exception {
            if (PlatformDependent.javaVersion() >= 7) {
                // 调用底层JDK的绑定方法
                javaChannel().bind(localAddress, config.getBacklog());
            } else {
                javaChannel().socket().bind(localAddress, config.getBacklog());
            }
        }
    

    总结

    ​ 服务端启动核心路径总结:

    服务端启动核心路径.png

    首先调用服务端的newChannel()创建服务端channel,这个过程实际上就是调用JDK底层的API来创建一个JDK channel,然后Netty将其包装成自己的服务端的channel,同时会创建一些基本的组件绑定在此channel上(比喻:pipeline)。然后调用init()来初始化服务端channel,这个过程最重要的就是为服务端的channel添加一个连接处理器。随后调用register()方法注册selector,这个过程NettyJDK底层生成的channel注册到selector上,最后调用bind()方法通过jdk底层的API将端口号绑定。来实现,绑定之后,nettyselector绑定一个OP_ACCEPT事件,然后selector就可以接收绑定其他channel了。

    相关文章

      网友评论

          本文标题:1_Netty源码分析之Netty服务端启动

          本文链接:https://www.haomeiwen.com/subject/evjppqtx.html