美文网首页
5. Netty解析:connect/bind方法背后

5. Netty解析:connect/bind方法背后

作者: 饿了就下楼 | 来源:发表于2020-02-12 11:12 被阅读0次

    前言

       在之前的文章中,我们已经知道了netty中channel创建及注册:这个过程是connect方法(client端)或者bind方法(server端)所做的第一件事,体现在initAndRegister方法中,在这之后还需要完成一些操作以实现connect。我们先从client端开始。

        private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
            final ChannelFuture regFuture = initAndRegister();
            final Channel channel = regFuture.channel();
            if (regFuture.cause() != null) {
                return regFuture;
            }
    
            final ChannelPromise promise = channel.newPromise();
    
            if (regFuture.isDone()) {
                doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
            } else {
                regFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                    }
                });
            }
    
            return promise;
        }
    

    客户端connect

      initAndRegister会返回一个ChannelFuture对象,注册逻辑会提交给对应EventLoop来异步的执行,而通过这个ChannelFuture实例我们就可以判断异步任务的执行状态。由于是异步任务,所以它是否已经执行完毕不得知,所以通过ChannelFuture判断任务(注册任务)是否执行完毕,如果没有执行完毕就为其添加一个监听回调,回调时机发生在任务结束。当任务完成后,开始执行doConnect0方法。并返回一个新的ChannelFuture实例,顺便提一下通过这里的regFuture和promise,我们也可以看出netty中存在大量的异步处理方式。

        private static void doConnect0(
                final ChannelFuture regFuture, final Channel channel,
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    
            // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
            // the pipeline in its channelRegistered() implementation.
            channel.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    if (regFuture.isSuccess()) {
                        if (localAddress == null) {
                            channel.connect(remoteAddress, promise);
                        } else {
                            channel.connect(remoteAddress, localAddress, promise);
                        }
                        promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                    } else {
                        promise.setFailure(regFuture.cause());
                    }
                }
            });
        }
    
    

      通过代码,我们看到,通道的连接操作又是作为一个异步任务交于channel所注册的EventLoop来执行,前提条件是注册任务必须已经成功完成了。在客户端,一般没有执行localAddress,所以我们继续跟踪channel.connect(remoteAddress, promise),发现,channel的connect操作由pipeline来实现,这次与之前不同的是,它调用了connect操作,完成出站处理器在流水线上的执行,与入站从头开始不同,出站操作connect是从尾部开始的。与入站相似,会依次找到下一个出站处理器,回调其中的connect方法(这里大家可以调试看一下,不在赘述),最终pipeline的流程会到达头结点

        @Override
        public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
            return pipeline.connect(remoteAddress, promise);
        }
    
    
        @Override
        public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
            return tail.connect(remoteAddress, promise);
        }
    
    

    ↓头结点负责完成客户端连接的代码↓

        @Override
        public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) throws Exception {
            unsafe.connect(remoteAddress, localAddress, promise);
        }
    

      在头结点中,调用了一个unsafe实例的connect方法。重点关注doConnect方法。

        @Override
        public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
                /*忽略*/
    
                boolean wasActive = isActive();
                if (doConnect(remoteAddress, localAddress)) {
                    fulfillConnectPromise(promise, wasActive);
                } else {
                    /*忽略*/
                }
            } catch (Throwable t) {
                promise.tryFailure(annotateConnectException(t, remoteAddress));
                closeIfClosed();
            }
        }
    
    
        // NioSocketChannel类中
    
        @Override
        protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
            if (localAddress != null) {
                doBind0(localAddress);
            }
    
            boolean success = false;
            try {
                boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
                if (!connected) {
                    selectionKey().interestOps(SelectionKey.OP_CONNECT);
                }
                success = true;
                return connected;
            } finally {
                if (!success) {
                    doClose();
                }
            }
        }
    
    
        public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
                throws IOException {
            try {
                return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
                    @Override
                    public Boolean run() throws IOException {
                        return socketChannel.connect(remoteAddress);
                    }
                });
            } catch (PrivilegedActionException e) {
                throw (IOException) e.getCause();
            }
        }
    
    
    

      通过SocketUtils的connect方法,我们可以看到,底层借助NIO的SocketChannel进行连接。而由于连接不会立即成功,所以一般不会返回true,因此connected为false,则会执行下面这行代码,注册NIO连接事件

    selectionKey().interestOps(SelectionKey.OP_CONNECT);

      由于配置了连接事件,所以当底层连接建立好之后,后续的逻辑处理在哪里呢?还记得NioEventLoop里面的run方法吧。代码在这里再贴一下。

        @Override
        protected void run() {
            for (;;) {
                try {
                    switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.SELECT:
                            select(wakenUp.getAndSet(false));
                            if (wakenUp.get()) {
                                selector.wakeup();
                            }
                            // fall through
                        default:
                    }
    
                    cancelledKeys = 0;
                    needsToSelectAgain = false;
                    final int ioRatio = this.ioRatio;
                    if (ioRatio == 100) {
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            runAllTasks();
                        }
                    } else {
                        final long ioStartTime = System.nanoTime();
                        try {
                            processSelectedKeys();
                        } finally {
                            // Ensure we always run tasks.
                            final long ioTime = System.nanoTime() - ioStartTime;
                            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    

    当连接建立好后,会通过processSelectedKeys方法处理连接事件。最终会执行到这样一段在之前见到过的代码。

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final NioUnsafe unsafe = ch.unsafe();
            if (!k.isValid()) {
                final EventLoop eventLoop;
                try {
                    eventLoop = ch.eventLoop();
                } catch (Throwable ignored) {
                 
                    return;
                }
                if (eventLoop != this || eventLoop == null) {
                    return;
                }
                unsafe.close(unsafe.voidPromise());
                return;
            }
    
            try {
                int readyOps = k.readyOps();
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
    
                    unsafe.finishConnect();
                }
    
                // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                    ch.unsafe().forceFlush();
                }
               if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    
    

    当接收到连接事件时会取消掉连接事件的注册。随后调用了unsafe.finishConnect()完成连接后的处理,finishConnect中调用了fulfillConnectPromise(connectPromise, wasActive)方法。

        private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                // Closed via cancellation and the promise has been notified already.
                return;
            }
    
            // 当连接建立后,底层的socketChannl打开并建立好连接,active返回为true
            boolean active = isActive();
    
            // 修改异步执行状态
            boolean promiseSet = promise.trySuccess();
            if (!wasActive && active) {
                // 流水线从头逐个回调入站的channelActive方法。
                pipeline().fireChannelActive();
            }
    
            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
            if (!promiseSet) {
                close(voidPromise());
            }
        }
    
    

    随后,pipeline().fireChannelActive()就开始从流水线头部回调channelActive方法。

       // 头部节点HeadContext的channelActive方法。
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
    
            readIfIsAutoRead();
        }
    

    头部节点会首先让流水线上的channelActive回调继续下去(在Echo Server这个例子中,EchoClientHandler的channelActive方法也会执行),当所有的channelActive回调完成后,调用readIfIsAutoRead方法从流水线尾部开始逐个回调read方法(这里省略了一些步骤,大家可以自行查看)。最终read回调又会到达头结点。

        @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }
    
        @Override
        public final void beginRead() {
            assertEventLoop();
    
            if (!isActive()) {
                return;
            }
    
            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }
    
    
        @Override
        protected void doBeginRead() throws Exception {
            // Channel.read() or ChannelHandlerContext.read() was called
            if (inputShutdown) {
                return;
            }
    
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }
    
            readPending = true;
    
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }
    

    在头部节点调用了unsafe.beginRead(),随后又调用doBeginRead,可以发现,在doBeginRead中,注册了readInterestOp事件。而readInterestOp所代表的的事件就是在生成channel时传入的读事件。因此在这里是完成了读事件的注册

    服务端bind

      分析了客户端后,服务端也就比较好去分析了。服务端在bind执行后,会先去调用initAndRegister完成NioServerSocketChannel向父循环组中的时间循环的注册,但是再注册的时候并没有注册有效的事件。注册后依次经历下面几个方法:doBind0 --> channel.bind --> pipeline.bind。pipeline的bind方法又会从尾部依次调用流水线上的出站处理器bind回调方法,一直延续到头结点。头结点又调用unsafe.bind()。在unsafe.bind()中,doBind借助serverSocketChannel.bind方法完成绑定。绑定操作就此结束。随后如同客户端在借助SocketChannel完成connect后会发出pipeline.fireChannelActive()一样,server端在绑定结束后也会进行流水线上channelActive的回调。回调从头结点开始,这就跟client端很相似。但不同之处在于,客户端的头结点在fireChannelRead后的readIfIsAutoRead会将读事件注册,而在server端,由于在创建NioServerSocketChannel时传入的readInterestOp为accept事件,因此在通道激活active后,为NioServerSocketChannel中的ServerSocketChannel注册了接受连接Accept事件。

    总结

      我们综合前面的文章以及本文,来总结一下connect和bind方法背后的逻辑。两者首先都进行了通道(NioSocketChannel或NioServerSocketChannel)的创建和注册,注册的过程只是把其中封装的SocketChannel或者ServerSocketChannel注册到对应的NioEventLoop的selector中,并没有实际注册什么有效事件。当通道完成注册后,添加到流水线上的handler的handlerAdded方法才会被回调(而通道注册完成后,再向流水线添加handler时,其handlerAdded方法会立即回调)。随后流水线调用fireChannelRegistered。当具体通道的连接或者绑定操作完成后,流水线又会调用fireChannelActive方法,表明通道已经激活。通道激活并且channelActive回调都执行完成后,客户端注册了读事件而服务端注册了accept事件。
      

    *链接

    1. Netty解析:第一个demo——Echo Server
    2. Netty解析:NioEventLoopGroup事件循环组
    3. Netty解析:NioSocketChannel、NioServerSocketChannel的创建及注册
    4. Netty解析:Handler、Pipeline大动脉及其在注册过程中体现
    5. Netty解析:connect/bind方法背后
    6. Netty解析:服务端如何接受连接并后续处理读写事件

    相关文章

      网友评论

          本文标题:5. Netty解析:connect/bind方法背后

          本文链接:https://www.haomeiwen.com/subject/tbshnctx.html