美文网首页
2-netty源码分析之Client

2-netty源码分析之Client

作者: 致虑 | 来源:发表于2018-09-20 11:27 被阅读0次

    2-netty源码分析之Client

    其实记录netty客户端的启动过程基本跟server端相似,无非是借助Bootstrap组装组件,然后通过connect发起连接。前面的步骤不多记录,看server启动过程基本了解。

    那么这里主要记录下connect相关细节,以及client发起connect后,server是如何处理的。
    依然从demo出发:

    EventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap();
        b.group(group)
         .channel(NioSocketChannel.class)
         .option(ChannelOption.TCP_NODELAY, true)
         .handler(new ChannelInitializer<SocketChannel>() {
             @Override
             public void initChannel(SocketChannel ch) throws Exception {
                 ChannelPipeline p = ch.pipeline();
                 if (sslCtx != null) {
                     p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                 }
                 p.addLast(new EchoClientHandler());
             }
         });
    
        // Start the client.
        ChannelFuture f = b.connect(HOST, PORT).sync();
    
        // Wait until the connection is closed.
        f.channel().closeFuture().sync();
    } finally {
        // Shut down the event loop to terminate all threads.
        group.shutdownGracefully();
    }
    

    可以看出前半部分基本一致,那就直接从下面这行核心代码出发:

    ChannelFuture f = b.connect(HOST, PORT).sync();
    

    1.connect分析
    private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
    
        final ChannelPromise promise = channel.newPromise();
        if (regFuture.isDone()) {
            doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
        } else {
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
                }
            });
        }
    
        return promise;
    }
    

    其实一眼看去做了这几件事:

    • 1.初始化并且注册channel,跟服务端类似,也就SocketChannel不一样等差别
    • 2.设置异步回调
    • 3.channel注册OK之后,执行doConnect操作,也就是doConnect0方法,这一步通过注册监听回调完成。

    那么ChannelFuture何时出发operationComplete方法呢。从initAndRegister跟踪看看。


    image.png

    启动eventLoop线程执行register逻辑


    image.png

    跟着debug走到这里进去继续跟踪对于promise的相关设置


    image.png

    看到了promise.trySuccess()方法,继续走:


    image.png image.png

    看到了notify方法体,


    image.png

    OK,再走一步 触发回调的逻辑就出现了:

    private static void notifyListener0(Future future, GenericFutureListener l) {
        try {
            l.operationComplete(future);
        } catch (Throwable t) {
            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
        }
    }
    

    到这里紧接着就是执行开始设置的监听器里的doConnect逻辑了


    image.png

    ,OK回调详细调用链很简单,一步步debug下去自然就很清楚。
    无非是:

    • 1.将用户执行的线程转化为EventLoop线程
    • 2.执行channel register操作
    • 3.设置DefaultPromise属性,执行回调方法

    2.doConnect
    /** 发起连接就在这里啦 */
    private static void doConnect0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    
        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
    
        /** 注意:这里是eventLoop线程 */
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    if (localAddress == null) {
                        channel.connect(remoteAddress, promise);
                    } else {
                        channel.connect(remoteAddress, localAddress, promise);
                    }
                    promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }
    

    可以看到这里,一罐的做法,将真正执行发起连接的操作扔进任务队列,交由EventLoop去执行。此处没有设置localAdress,那么紧接着就进入到一下:


    image.png

    到这里,出现了比较熟悉的piepline,pipeline暂时不详细分析,后面一章会专门记录。那么这里就是交给管道去处理,其实可以猜想就是交给piepline中的handler处理:


    image.png

    果然如此,找到piepline链路的末节点tail处理执行,那么究竟这个connect具体是那个handler处理的呢?继续:

    public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    
        ...
        
        /**
         * 调用 findContextOutbound 方法, 从 DefaultChannelPipeline 内的双向链表的 tail 开始,
         * 不断向前寻找第一个 outbound 为 true 的 AbstractChannelHandlerContext, 然后调用它的 invokeConnect 方法
         *
         * 在 DefaultChannelPipeline 的构造器中, 会实例化两个对象: head 和 tail, 并形成了双向链表的头和尾.
         * head 是 HeadContext 的实例, 它实现了 ChannelOutboundHandler 接口, 并且它的 outbound 字段为 true.
         * 因此在 findContextOutbound 中, 找到的 AbstractChannelHandlerContext 对象其实就是 head.
         * 进而在 invokeConnect 方法中, 我们向上转换为 ChannelOutboundHandler --> invokeConnect 方法
         */
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null);
        }
        return promise;
    }
    

    其实可以看到tail节点只不过是connect任务流转的第一个节点,这里会通过findContextOutbound去寻找第一个outbound为true的handler,然后由其去执行invokerConnect方法。还记的之前分析server时讲过:数据从外部流入对应的是inbound,相反则是outbound么.那么这里发起连接找得处理器必然是outbound对应的处理节点,SO 继续跟踪。


    image.png

    可以看到这里找到的是head节点,看下head类的描述:

    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler
    

    果然继承了ChannelOutboundHandler,那么找到它就合理了。那么很自然最终执行connect就会进入HeadContext的connect方法了:

    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
    
        /**  AbstractNioByteChannel.NioByteUnsafe */
        unsafe.connect(remoteAddress, localAddress, promise);
    }
    

    最终还不是具体的执行connect,交给了UnSafe去操作,其实这里也很合理,还记得之前分析Server时说过,netty最终与jdk nio底层打交道的所有事件,都是交给了UnSafe去操作的,那么这里就很合理这个设计了。
    看看这里的UnSafe具体是哪个:


    image.png

    这里了是NioSocketChannelUnsafe,这里直接没有覆盖父类AbstractNioUnsafe的connect方法,那么下一步就直接进入了AbstractNioUnsafe的connect了,分析一下:

    public final void connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
    
        try {
            if (connectPromise != null) {
                // Already a connect in process.
                throw new ConnectionPendingException();
            }
    
            boolean wasActive = isActive();
    
            /** doConnect 子类实现 */
            if (doConnect(remoteAddress, localAddress)) {
                fulfillConnectPromise(promise, wasActive);
            } else {
                ...
            }
        } catch (Throwable t) {
            promise.tryFailure(annotateConnectException(t, remoteAddress));
            closeIfClosed();
        }
    }
    
    public boolean isActive() {
        SocketChannel ch = javaChannel();
        return ch.isOpen() && ch.isConnected();
    }
    

    我们这是刚启动clinet,那么这里的isActive根据代码就知道返回false,可以看到UnSafe调用了外部类AbstractNioChannel的doConnect方法,而这里doConnect的具体执行逻辑交由具体子类实现,很自然走到了NioSocketChannel(服务端对应的是NioServerSocketChannel).

    @Override
    protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }
    
        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }
    

    SocketUtils:

    /**
     * 从 NioSocketChannel.newSocket 返回的 SocketChannel 对象;
     * 然后是调用 SocketChannel.connect 方法完成 Java NIO 层面上的 Socket 的连接.
     */
    public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
            throws IOException {
        try {
            return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
                @Override
                public Boolean run() throws IOException {
                    return socketChannel.connect(remoteAddress);
                }
            });
        } catch (PrivilegedActionException e) {
            throw (IOException) e.getCause();
        }
    }
    

    完成了jdk socket 的connect触发动作,连接发起也就算OK了,发起连接之后,如果此时连接尚未真正建立,设置interestOps为OP_CONNECT,这样就可以监听连接动作了,完成最终操作。

    既然client已经发起了连接请求,那么接下来就要分析server如何处理连接请求了。


    3.连接接入处理

    其实这里的主要工作停留在server端,但是这个会直接决定整个client启动的成果,那么我们将server服务先跑起来,debug停留在reactor轮训那里,然后启动client发起请求。

    那么服务端是怎么处理新连接的呢?主要如下:

    • 1.轮训出新连接
    • 2.处理新连接,其实这里会交给workGroup处理
    • 3.注册读事件

    前面说把debug停留在轮训处,还记得服务端轮训的diamante段吗,直接里面有这样一个逻辑:

    processSelectedKeys();
    
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            /** 处理优化过的selectedKeys */
            processSelectedKeysOptimized();
        } else {
            /** 正常的处理 */
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    
    /**
     *  迭代 selectedKeys 获取就绪的 IO 事件, 然后为每个事件都调用 processSelectedKey 来处理它.
     *
     *  1.对于boss NioEventLoop来说,轮询到的是基本上就是连接事件,后续的事情就通过他的pipeline将连接扔给一个worker NioEventLoop处理
     *  2.对于worker NioEventLoop来说,轮询到的基本上都是io读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理
     *
     *
     *  netty的reactor线程第二步做的事情为处理IO事件,netty使用数组替换掉jdk原生的HashSet来保证IO事件的高效处理,
     *  每个SelectionKey上绑定了netty类AbstractChannel对象作为attachment,在处理每个SelectionKey的时候,
     *  就可以找到AbstractChannel,然后通过pipeline的方式将处理串行到ChannelHandler,回调到用户方法
     */
    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
    
            /** 1.取出IO事件以及对应的channel */
            final SelectionKey k = selectedKeys.keys[i];
            
            /** 取出后将数组置为空 */
            selectedKeys.keys[i] = null;
    
            final Object a = k.attachment();
    
            /** 处理该channel */
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
    
            /**
             * 判断是否该再来次轮询
             * 也就是说,对于每个NioEventLoop而言,每隔256个channel从selector上移除的时候,就标记 needsToSelectAgain 为true,我们还是跳回到上面这段代码
             */
            if (needsToSelectAgain) {
                // See https://github.com/netty/netty/issues/2363
                /** 将selectedKeys的内部数组全部清空 */
                selectedKeys.reset(i + 1);
    
                /** 重新调用selectAgain重新填装一下 selectionKey */
                selectAgain();
                i = -1;
            }
        }
    }
    
    /**
     * processSelectedKey 中处理了三个事件, 分别是:
    
     * 1.OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取.
     * 2.OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据.
     * 3.OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态.
     * 4.OP_ACCEPT,请求连接事件
     */
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                return;
            }
    
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }
    
        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            /**
             * 事件是 OP_CONNECT, 即 TCP 连接已建立事件.
             * 1.我们需要将 OP_CONNECT 从就绪事件集中清除, 不然会一直有 OP_CONNECT 事件.
             * 2.调用 unsafe.finishConnect() 通知上层连接已建立
             *  */
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);
    
                /**
                 * unsafe.finishConnect() 调用最后会调用到 pipeline().
                 * fireChannelActive(), 产生一个 inbound 事件, 通知 pipeline 中的各个 handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法会被调用)
                 */
                unsafe.finishConnect();
            }
    
            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
    
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            /** boos reactor处理新的连接   或者 worker reactor 处理 已存在的连接有数据可读 */
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    
                /** AbstractNioByteChannel中实现,重点 */
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    

    我们将上面各种事件处打上断点,启动client,看看第一次发起连接会以何种事件进入轮训处:


    image.png

    可以看到客户端第一次发情连接,会触发服务端Accept事件,紧接着会触发UnSafe的读处理,那么为什么是Accept,后面将解释;我们继续:
    服务端对应的UnSafe是NioMessageUnsafe,前面已经跟踪过,那么这里自然会进入下面逻辑:

    @Override
            public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();
                ...
                /** 拿到对应channel的pipeline */
                final ChannelPipeline pipeline = pipeline();
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        for (;;) {
    
                            /** 读取一个连接,委托到外部类NioSocketChannel */
                            int localRead = doReadMessages(readBuf);
                            ...
                        }
                    } catch (Throwable t) {
                        exception = t;
                    }
                    setReadPending(false);
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        /** 每条新连接丢给服务端的channel */
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
    
                    /** 清理资源 */
                    readBuf.clear();
                    pipeline.fireChannelReadComplete();
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    ...
                }
            }
        }
    

    上面贴出比较核心的逻辑,看到这里:

    int localRead = doReadMessages(readBuf);

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
    
        /** 获取到客户端新连接的 SocketChannel , jdk底层操作,返回jdk底层nio创建的一条channel */
        SocketChannel ch = SocketUtils.accept(javaChannel());
    
        try {
            if (ch != null) {
                /** 将jdk的 SocketChannel 封装成自定义的 NioSocketChannel ,加到list */
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);
    
            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }
        return 0;
    }
    

    读操作很简单,NioMessageUnsafe调用, 因为NioMessageUnsafe是与新连接相关, 因此就是调用jdk的accept()方法,新建立一条连接,同时将jdk的 SocketChannel 封装成自定义的 NioSocketChannel;同时跟踪到NioSocketChannel的构造器里会发现此时会对SelectionKey.OP_READ进行注册,即该channel监听事件是 SelectionKey.OP_READ

    这段代码翻译一下:

    • 1.当一个 client 连接到 server 时, Java 底层的 NIO ServerSocketChannel 会有一个 SelectionKey.OP_ACCEPT 就绪事件, 接着就会调用到 這裡

    • 2.接着就实例化一个 NioSocketChannel, 并且传入 NioServerSocketChannel 对象(即 this), 由此可知, 我们创建的这个 NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 实例 .

    • 3.接下来就经由 Netty 的 ChannelPipeline 机制, 将读取事件逐级发送到各个 handler 中, 于是就会触发前面我们提到的 ServerBootstrapAcceptor.channelRead 方法啦.

    其实总结下就是doReadMessages 方法不断地读取连接,封装成NioSocketChannel放入 readBuf 容器,然后调用 pipeline.fireChannelRead(),将每条新连接丢给服务端的channel

    OK,我们读取连接后,继续跟踪后面一个重要的逻辑

    pipeline.fireChannelRead(readBuf.get(i));

    image.png

    可以看到,这里会调用AbstractChannelHandlerContext的invokeChannelRead方法。
    这里对server启动做个回顾,方便介绍这里的调用链,我们在启动服务端的时候,piepiline里面的链路为:

    head-->ServerBootstrapAcceptor-->tail链路,

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
    
            /** 这里的handler返回的是主.handler(new LoggingHandler(LogLevel.INFO)) 中的handler*/
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
    
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    /** 这里的 childGroup.register 就是将 workerGroup 中的某个 EventLoop 和 NioSocketChannel 关联 */
                    pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
    

    就是上面这段代码。那么这里的invokeChannelRead会通过head往下流转,会进入ServerBootstrapAcceptor的channelRead方法,跟踪一下:


    image.png

    几经流转,确实到了这里:

    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        /** 将msg转换成对应的channel */
        final Channel child = (Channel) msg;
    
        /** 添加用户自定义的childHandler */
        child.pipeline().addLast(childHandler);
    
        /** 设置 NioSocketChannel 对应的 attr和option */
        setChannelOptions(child, childOptions, logger);
    
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    
        try {
    
            /** 将 workerGroup 中的某个 EventLoop 和 NioSocketChannel 关联 */
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
    

    那这个逻辑做了什么呢?

    • 1.将msg转换成对应的channel
    • 2.添加用户自定义的childHandler
    • 3.将 workerGroup 中的某个 EventLoop 和 NioSocketChannel 关联

    好像已经看到了workerGroup的身影,我们在server中强调过,server启动过程只包含bossGroup的启动,不涉及workerGroup的启动,那这里是不是就是workGroup的启动呢?
    我们继续:
    第二点说的childHandler就是server里的这段逻辑

    .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             if (sslCtx != null) {
                 p.addLast(sslCtx.newHandler(ch.alloc()));
             }
             p.addLast(new EchoServerHandler());
         }
     });
    

    记住,这里的channel是NioSocketChannel,不是我们启动服务端的那个NioServerSocketChannel了。
    那么到这里,NioSocketChannel绑定的piepiline就是:

    head-->ChannelInitializer-->tail

    继续划重点:

    childGroup.register(child)

    很相似的一段代码,注册channel到group,不就是将一个指定的NioSocketChannel绑定到具体的EvenyLoop么,不也就是在WorkGroup线程池中选着一个去维护NioSocketChannel的相关事件么,好像是很明了,咱们继续看看register做了什么:

    MultithreadEventLoopGroup#register方法:

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    public EventExecutor next() {
        return chooser.next();
    }
    

    首先next方法就是调用执行器选择一个具体的EventLoop,紧接着用这个选择的EventLoop执行register操作。继续跟踪:

    @Override
    public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
        if (channel == null) {
            throw new NullPointerException("channel");
        }
        if (promise == null) {
            throw new NullPointerException("promise");
        }
    
        /** 获取 channel 的 unsafe() 底层操作对象, 然后调用它的 register. */
        channel.unsafe().register(this, promise);
        return promise;
    }
    
    image.png

    可以看到这里的UnSafe是NioSocketChannelUnSafe类型,那么直接进入AbstractChannel#AbstractUnsafe的register方法:

    public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ...
        /** 将eventLoop与channel绑定 */
        AbstractChannel.this.eventLoop = eventLoop;
    
        if (eventLoop.inEventLoop()) {
            register0(promise);
        } else {
            try {
                eventLoop.execute(new Runnable() {
                    @Override
                    public void run() {
                        register0(promise);
                    }
                });
            } catch (Throwable t) {
                ...
            }
        }
    }
    

    嗯哼,核心点依然出现:

    • 1.将eventLoop与channel绑定
    • 2.通过eventLoop.execute方式启动线程
    • 3.以任务式完成注册
    @Override
        protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                /** 将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 并且将当前 Channel 作为 attachment. */
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                ...
            }
        }
    }
    

    可以看到channel与selector进行了绑定,即selector对该channel进行轮训事件,并且所有该channel的事件都交由绑定的EventLoop执行。继续:

    pipeline.invokeHandlerAddedIfNeeded();

    这一步很熟悉哈,最终会触发啥可想而知:


    image.png

    此时才将用户自定义的handler装车进入piepline,那么这个时候的piepline会是这样:

    head->EchoServerHandler->tail

    继续


    image.png

    可以看到已经建立了连接,那么后面会干什么呢?看到下面就知道又是从head开始传递事件:

    /** 为啥pipeline.fireChannelActive();最终会调用到AbstractNioChannel.doBeginRead(),了解pipeline中的事件传播机制 */
    @Override
    public final ChannelPipeline fireChannelActive() {
        /** 三次握手成功之后,pipeline.fireChannelActive();被调用,然后以head节点为参数,直接一个静态调用 */
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
    

    active传递之后就会进入readIfIsAutoRead,又是一轮piepline传递,继续

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    
        readIfIsAutoRead();
    }
    
    image.png

    进入到了AbstractChannel#AbstractUnsafe的beginRead

    public final void beginRead() {
        assertEventLoop();
        if (!isActive()) {
            return;
        }
    
        try {
            doBeginRead();
        } catch (final Exception e) {
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    pipeline.fireExceptionCaught(e);
                }
            });
            close(voidPromise());
        }
    }
    

    可以看到注册了readInterestOp即SelectionKey.OP_READ事件,将 SelectionKey.OP_READ事件注册到selector中去,表示这条通道已经可以开始处理read事件了,也代表着客户端整个接入过程完毕。


    4.总结
    • 1.客户端发起连接
    • 2.服务端reactor主线程轮训accept事件
    • 3.建立jdk底层的socketChannel 并封装成自己的NioSocketChannel,NioSocketChannel一旦建立,代表连接建立
    • 4.将NioSocketChannel绑定到指定的work EventLoop,并注册到指定的selector上
    • 5.注册读事件,开启处理
    • 6.完毕

    相关文章

      网友评论

          本文标题:2-netty源码分析之Client

          本文链接:https://www.haomeiwen.com/subject/vpbgnftx.html