美文网首页程序员我爱编程
高性能网络通信框架Netty-Netty客户端底层与Java N

高性能网络通信框架Netty-Netty客户端底层与Java N

作者: 阿里加多 | 来源:发表于2018-06-04 12:22 被阅读341次

    5.1 Netty客户端底层与Java NIO对应关系

    在讲解Netty客户端程序时候我们提到指定NioSocketChannel用于创建客户端NIO套接字通道的实例,下面我们来看NioSocketChannel是如何创建一个Java NIO里面的SocketChannel的。

    首先我们来看NioSocketChannel的构造函数:

        public NioSocketChannel() {
            this(DEFAULT_SELECTOR_PROVIDER);
        }
    

    其中DEFAULT_SELECTOR_PROVIDER定义如下:

        private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
    

    然后继续看

    //这里的provider为DEFAULT_SELECTOR_PROVIDER
        public NioSocketChannel(SelectorProvider provider) {
            this(newSocket(provider));
        }
    

    其中newSocket代码如下:

        private static SocketChannel newSocket(SelectorProvider provider) {
            try {
                return provider.openSocketChannel();
            } catch (IOException e) {
                throw new ChannelException("Failed to open a socket.", e);
            }
        }
    

    所以NioSocketChannel内部是管理一个客户端的SocketChannel的,这个SocketChannel就是讲Java NIO时候的SocketChannel,也就是创建NioSocketChannel实例对象时候相当于执行了Java NIO中:

    SocketChannel socketChannel = SocketChannel.open();
    

    另外在NioSocketChannel的父类AbstractNioChannel的构造函数里面默认会记录队op_read事件感兴趣,这个后面当链接完成后会使用到:

    
        protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
            super(parent, ch, SelectionKey.OP_READ);
        }
    

    另外在NioSocketChannel的父类AbstractNioChannel的构造函数里面设置了该套接字为非阻塞的

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
             ...
            }
        }
    

    下面我们看Netty里面是哪里创建的NioSocketChannel实例,哪里注册到选择器的。
    下面我们看下Bootstrap的connect操作代码:

        public ChannelFuture connect(InetAddress inetHost, int inetPort) {
            return connect(new InetSocketAddress(inetHost, inetPort));
        }
    

    类似Java NIO传递了一个InetSocketAddress对象用来记录服务端ip和端口:

        public ChannelFuture connect(SocketAddress remoteAddress) {
           ...
            return doResolveAndConnect(remoteAddress, config.localAddress());
        }
    

    下面我们看下doResolveAndConnect的代码:

    private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
            //(1)
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
    
            if (regFuture.isDone()) {
                if (!regFuture.isSuccess()) {
                    return regFuture;
                }
                 //(2)
                return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
            } 
                ...
           }
    }
    

    首先我们来看代码(1)initAndRegister:

         final ChannelFuture initAndRegister() {
            Channel channel = null;
            try {
                //(1.1)
                channel = channelFactory.newChannel();
                //(1.2)
                init(channel);
            } catch (Throwable t) {
                ...
            }
            //(1.3)
            ChannelFuture regFuture = config().group().register(channel);
            if (regFuture.cause() != null) {
                if (channel.isRegistered()) {
                    channel.close();
                } else {
                    channel.unsafe().closeForcibly();
                }
            }
    }
    

    其中(1.1)作用就是创建一个NioSocketChannel的实例,代码(1.2)是具体设置内部套接字的选项的。

    代码(1.3)则是具体注册客户端套接字到选择器的,其首先会调用NioEventLoop的register方法,最后调用NioSocketChannelUnsafe的register方法:

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                ...
                AbstractChannel.this.eventLoop = eventLoop;
    
                if (eventLoop.inEventLoop()) {
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                       ...
                    }
                }
     }
    

    其中 register0内部调用doRegister,其代码如下:

     protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    //注册客户端socket到当前eventloop的selector上
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                   ...
                }
            }
     }
    

    到这里代码(1)initAndRegister的流程讲解完毕了,下面我们来看代码(2)的

    
      public final void connect(
                    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
              ...
                try {
                    ...
    
                    boolean wasActive = isActive();
                    if (doConnect(remoteAddress, localAddress)) {
                        fulfillConnectPromise(promise, wasActive);
                    } else {
                      。。。
                    }
                } catch (Throwable t) {
                   ...
                }
      }
    

    其中doConnect代码如下:

        protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
           ...
            boolean success = false;
            try {
                //2.1
                boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
                //2.2
                if (!connected) {
                    selectionKey().interestOps(SelectionKey.OP_CONNECT);
                }
                success = true;
                return connected;
            } finally {
                if (!success) {
                    doClose();
                }
            }
        }
    

    其中2.1具体调用客户端套接字的connect方法,等价于Java NIO里面的。
    代码2.2 由于connect 方法是异步的,所以类似JavaNIO调用connect方法进行判断,如果当前没有完成链接则设置对op_connect感兴趣。

    最后一个点就是何处进行的从选择器获取就绪的事件的,具体是在该客户端套接关联的NioEventLoop里面的做的,每个NioEventLoop里面有一个线程用来循环从选择器里面获取就绪的事件,然后进行处理:

     protected void run() {
            for (;;) {
                try {
                    ...
                   select(wakenUp.getAndSet(false));
                    ...
                    processSelectedKeys();
                    ...
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                ...
            }
        }
    

    其中select代码如下:

     private void select(boolean oldWakenUp) throws IOException {
            Selector selector = this.selector;
            try {
                ...
                for (;;) {
                    ...
                    int selectedKeys = selector.select(timeoutMillis);
                    selectCnt ++;
    
                  ...
            } catch (CancelledKeyException e) {
                ...
            }
     }
    

    可知会从选择器选取就绪的事件,其中processSelectedKeys代码如下:

     private void processSelectedKeys() {
            ...
         processSelectedKeysPlain(selector.selectedKeys());
            ...
        }
    

    可知会获取已经就绪的事件集合,然后交给processSelectedKeysPlain处理,后者循环调用processSelectedKey具体处理每个事件,代码如下:

     private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
           ...
            try {
                //(3)如果是op_connect事件
                int readyOps = k.readyOps();
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
                    //3.1
                    unsafe.finishConnect();
                }
                //4
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    ch.unsafe().forceFlush();
                }
                //5
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    

    代码(3)如果当前事件key为op_connect则去掉op_connect,然后调用NioSocketChannel的doFinishConnect:

        protected void doFinishConnect() throws Exception {
            if (!javaChannel().finishConnect()) {
                throw new Error();
            }
        }
    

    可知是调用了客户端套接字的finishConnect方法,最后会调用NioSocketChannel的doBeginRead方法设置对op_read事件感兴趣:

        protected void doBeginRead() throws Exception {
           ...
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }
    

    这里interestOps为op_read,上面在讲解NioSocketChannel的构造函数时候提到过。

    代码(5)如果当前是op_accept事件说明是服务器监听套接字获取到了一个链接套接字,如果是op_read,则说明可以读取客户端发来的数据了,如果是后者则会激活管线里面的所有handler的channelRead方法,这里会激活我们自定义的NettyClientHandler的channelRead读取客户端发来的数据,然后在向客户端写入数据。

    5.2 总结

    本节讲解了Netty客户端底层如何使用Java NIO进行实现的,可见与我们前面讲解的Java NIO设计的客户端代码步骤是一致的,只是netty对其进行了封装,方便了我们使用,了解了这些对深入研究netty源码提供了一个骨架指导。

    想了解JDK NIO和更多Netty基础的可以单击我
    更多关于分布式系统中服务降级策略的知识可以单击 单击我
    想系统学dubbo的单击我
    想学并发的童鞋可以 单击我

    image.png

    相关文章

      网友评论

        本文标题:高性能网络通信框架Netty-Netty客户端底层与Java N

        本文链接:https://www.haomeiwen.com/subject/eniesftx.html