Reading the Netty Source: Channel and Unsafe

Author: 良辰美景TT | Published: 2018-07-20 13:49

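      Channel is Netty's abstraction for network operations: reading, writing, closing a link, initiating a connection, and so on. We take NioServerSocketChannel as the example to analyze; its class diagram is shown below: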


    [Figure: class diagram of NioServerSocketChannel]

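    Netty implements a channel's functionality through aggregation (composition). Let's first look at what AbstractChannel aggregates; the source is below, with comments added on the key code: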

    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        // The parent channel: a channel that handles reads and writes has, as its parent, the server channel that accepted its connection
        private final Channel parent;
        // id is the unique identifier of the channel
        private final ChannelId id;
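        // The Unsafe object implements the actual connect and write logic. It is named "unsafe" to discourage use from outside Netty, not because it is unsafe in itself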
        private final Unsafe unsafe;
        // The pipeline that manages the ChannelHandlers
        private final DefaultChannelPipeline pipeline;
    
        private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
        private final CloseFuture closeFuture = new CloseFuture(this);
        // Address information
        private volatile SocketAddress localAddress;
        private volatile SocketAddress remoteAddress;
        // The eventLoop is the reactor thread object
        private volatile EventLoop eventLoop;
        private volatile boolean registered;
        private boolean closeInitiated;
    
        /** Cache for the string representation of this channel */
        private boolean strValActive;
        private String strVal;
    
        // As the methods below show, each of them triggers a call through the pipeline chain
        @Override
        public ChannelFuture bind(SocketAddress localAddress) {
            return pipeline.bind(localAddress);
        }
    
        @Override
        public ChannelFuture connect(SocketAddress remoteAddress) {
            return pipeline.connect(remoteAddress);
        }
    
        @Override
        public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
            return pipeline.connect(remoteAddress, localAddress);
        }
    
        @Override
        public ChannelFuture disconnect() {
            return pipeline.disconnect();
        }
    
        @Override
        public ChannelFuture close() {
            return pipeline.close();
        }
    
        @Override
        public Channel read() {
            pipeline.read();
            return this;
        }
    
        @Override
        public ChannelFuture write(Object msg) {
            return pipeline.write(msg);
        }
    
        @Override
        public ChannelFuture write(Object msg, ChannelPromise promise) {
            return pipeline.write(msg, promise);
        }
    
    }
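
The delegation shown above can be sketched without Netty at all. What follows is a minimal, hypothetical model, not Netty code: the names `DelegationSketch`, `MiniChannel`, `MiniPipeline` and `MiniUnsafe` are invented for illustration. The channel owns a pipeline and an unsafe object, every public operation is forwarded to the pipeline, and the pipeline's last step hands the call to the unsafe object.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Netty-free model of the aggregation described above.
public class DelegationSketch {

    /** Stands in for Unsafe: the only place that would touch the transport. */
    static final class MiniUnsafe {
        final List<String> log = new ArrayList<>();
        void bind(String address) { log.add("unsafe.bind(" + address + ")"); }
        void write(Object msg)    { log.add("unsafe.write(" + msg + ")"); }
    }

    /** Stands in for the pipeline: records the call, then reaches the unsafe object. */
    static final class MiniPipeline {
        private final MiniUnsafe unsafe;
        MiniPipeline(MiniUnsafe unsafe) { this.unsafe = unsafe; }
        void bind(String address) { unsafe.log.add("pipeline.bind"); unsafe.bind(address); }
        void write(Object msg)    { unsafe.log.add("pipeline.write"); unsafe.write(msg); }
    }

    /** Stands in for AbstractChannel: its public methods only delegate to the pipeline. */
    static final class MiniChannel {
        final MiniUnsafe unsafe = new MiniUnsafe();
        private final MiniPipeline pipeline = new MiniPipeline(unsafe);
        void bind(String address) { pipeline.bind(address); }
        void write(Object msg)    { pipeline.write(msg); }
    }

    /** Runs bind + write and returns the recorded call order. */
    static List<String> demo() {
        MiniChannel ch = new MiniChannel();
        ch.bind("0.0.0.0:8080");
        ch.write("hello");
        return ch.unsafe.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the caller never sees `MiniUnsafe` methods directly, which mirrors why the real class is named to discourage outside use.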
    

    AbstractNioChannel holds the JDK channel and its SelectionKey, and implements registering the Channel with the corresponding Selector. The source is as follows:

    public abstract class AbstractNioChannel extends AbstractChannel {
        // The underlying JDK channel object
        private final SelectableChannel ch;
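        // The interest-op bit for reads (for example OP_READ for NioSocketChannel, OP_ACCEPT for NioServerSocketChannel)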
        protected final int readInterestOp;
        // The SelectionKey returned when this channel registers with a Selector; it carries the interest set and the ready events
        volatile SelectionKey selectionKey;
        // The pending connect promise and its timeout task
        private ChannelPromise connectPromise;
        private ScheduledFuture<?> connectTimeoutFuture;
        // The remote address requested by connect
        private SocketAddress requestedRemoteAddress;
    
        // Constructor; takes the following parameters
        protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
            super(parent);
            this.ch = ch;
            this.readInterestOp = readInterestOp;
            try {
                ch.configureBlocking(false);
            } catch (IOException e) {
                try {
                    ch.close();
                } catch (IOException e2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Failed to close a partially initialized socket.", e2);
                    }
                }
    
                throw new ChannelException("Failed to enter non-blocking mode.", e);
            }
        }
    
        // Register interest in read events through the SelectionKey
        @Override
        protected void doBeginRead() throws Exception {
            // Channel.read() or ChannelHandlerContext.read() was called
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }
    
            readPending = true;
    
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }
    
        // Register the channel with the Selector (interest set 0 for now; doBeginRead() adds the read interest later)
        @Override
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    
    }
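
The two-step pattern in `doRegister()` and `doBeginRead()` can be tried with the plain JDK API. The sketch below is an illustration under assumptions, not Netty code: the class name `RegisterThenReadSketch` and the `"owner"` attachment are made up, and `OP_ACCEPT` is used as the read interest because the channel is a server socket channel. It registers with an interest set of 0 and only afterwards turns on the interest bit.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Plain-JDK sketch of "register with interest 0, enable the real interest later".
public class RegisterThenReadSketch {

    /** Returns {interestOps right after register, interestOps after the doBeginRead-like step}. */
    static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel ch = ServerSocketChannel.open()) {
            // A channel must be in non-blocking mode before it can be registered.
            ch.configureBlocking(false);

            // Like doRegister(): interest set 0, plus an attachment (Netty attaches the channel itself).
            SelectionKey key = ch.register(selector, 0, "owner");
            int afterRegister = key.interestOps();

            // Like doBeginRead(): add the read interest only if it is not set yet.
            int readInterestOp = SelectionKey.OP_ACCEPT;
            int interestOps = key.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                key.interestOps(interestOps | readInterestOp);
            }
            return new int[] { afterRegister, key.interestOps() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("after register: " + r[0] + ", after begin-read: " + r[1]);
    }
}
```

Registering with 0 first means the selector reports nothing for the channel until the rest of the setup is done and reading is explicitly started.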
    

    NioServerSocketChannel's main job is to bind to a port and accept connections from clients. The key source is as follows:

    public class NioServerSocketChannel extends AbstractNioMessageChannel
                                 implements io.netty.channel.socket.ServerSocketChannel {
    
        // A static method of NioServerSocketChannel: asks the provider to create the underlying JDK ServerSocketChannel
        private static ServerSocketChannel newSocket(SelectorProvider provider) {
            try {
                /**
                 *  Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
                 *  {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
                 *
                 *  See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
                 */
                return provider.openServerSocketChannel();
            } catch (IOException e) {
                throw new ChannelException(
                        "Failed to open a server socket.", e);
            }
        }
    
        // Performs the bind
        @Override
        protected void doBind(SocketAddress localAddress) throws Exception {
            if (PlatformDependent.javaVersion() >= 7) {
                // javaChannel() returns the ServerSocketChannel created by newSocket() above
                javaChannel().bind(localAddress, config.getBacklog());
            } else {
                javaChannel().socket().bind(localAddress, config.getBacklog());
            }
        }
    
        // Called when a client connection is ready to be accepted
        @Override
        protected int doReadMessages(List<Object> buf) throws Exception {
            // Calls the JDK's accept()
            SocketChannel ch = SocketUtils.accept(javaChannel());
    
            try {
                if (ch != null) {
                    // A new NioSocketChannel is created here, with this channel passed in as its parent
                    buf.add(new NioSocketChannel(this, ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    
        // Registration with the Selector is not overridden here; it is inherited from AbstractNioChannel.doRegister(), shown above
    
    
    }
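
The bind and accept steps map onto JDK calls that can be exercised directly. The following is a rough sketch with plain `java.nio`, not Netty code: the class name `AcceptSketch`, the loopback address, port 0 (an ephemeral port) and the backlog of 128 are all arbitrary choices for the demo. It shows why `doReadMessages()` has to handle a `null` result: a non-blocking `accept()` returns `null` when no connection is pending.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Plain-JDK sketch of bind + non-blocking accept.
public class AcceptSketch {

    /** Returns {accepted before any client connected, accepted after a client connected}. */
    static boolean[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            // Same call shape as doBind(): local address plus backlog.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 128);
            server.register(selector, SelectionKey.OP_ACCEPT);

            // No client yet: a non-blocking accept() returns null instead of waiting.
            SocketChannel none = server.accept();
            boolean acceptedEarly = none != null;
            if (none != null) {
                none.close();
            }

            // Connect a client, wait (up to 5 s) until the selector reports OP_ACCEPT, then accept.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(5000);
                SocketChannel accepted = server.accept();
                boolean acceptedLate = accepted != null;
                if (accepted != null) {
                    accepted.close();
                }
                return new boolean[] { acceptedEarly, acceptedLate };
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("before client: " + r[0] + ", after client: " + r[1]);
    }
}
```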
    

    From the source above, Netty's Channel design mainly does the following:

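    • It wraps the JDK Channel and holds a reference to it internally.
    • The Channel object acts as an intermediate layer: read and write operations go through the pipeline call chain, and the actual I/O is carried out internally by the unsafe object.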

    The Unsafe object

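      Unsafe is implemented as an inner class of the channel and carries out the channel's network operations, such as the actual reads and writes. AbstractUnsafe is an inner class of AbstractChannel; part of its source is as follows: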

     protected abstract class AbstractUnsafe implements Unsafe {
    
            // The bind logic; takes a SocketAddress
            @Override
            public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
                // Asserts that, once the channel is registered, this call runs on its event-loop thread
                assertEventLoop();
    
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                // Warn when SO_BROADCAST is set but a non-root user binds to a non-wildcard address on *nix
                // See: https://github.com/netty/netty/issues/576
                if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                    localAddress instanceof InetSocketAddress &&
                    !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                    !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                    // Warn a user about the fact that a non-root user can't receive a
                    // broadcast packet on *nix if the socket is bound on non-wildcard address.
                    logger.warn(
                            "A non-root user can't receive a broadcast packet if the socket " +
                            "is not bound to a wildcard address; binding to a non-wildcard " +
                            "address (" + localAddress + ") anyway as requested.");
                }
    
                boolean wasActive = isActive();
                try {
                    // The actual bind happens in doBind(), a method of the channel: the NioServerSocketChannel logic analyzed above
                    doBind(localAddress);
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    closeIfClosed();
                    return;
                }
                // Fire the channelActive event and propagate it through the pipeline
                if (!wasActive && isActive()) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireChannelActive();
                        }
                    });
                }
    
                safeSetSuccess(promise);
            }
    
            // The write operation (this only adds the message to the outbound buffer)
            @Override
            public final void write(Object msg, ChannelPromise promise) {
                // Asserts that, once the channel is registered, this call runs on its event-loop thread
                assertEventLoop();
                // ChannelOutboundBuffer is the buffer holding the data waiting to be written out
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    // If the outboundBuffer is null we know the channel was closed and so
                    // need to fail the future right away. If it is not null the handling of the rest
                    // will be done in flush0()
                    // See https://github.com/netty/netty/issues/2362
                    safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                    // release message now to prevent resource-leak
                    ReferenceCountUtil.release(msg);
                    return;
                }
    
                int size;
                try {
                    // Filter the outbound message (subclasses can reject unsupported types or convert the message)
                    msg = filterOutboundMessage(msg);
                    // Estimate the size of the message to be written
                    size = pipeline.estimatorHandle().size(msg);
                    if (size < 0) {
                        size = 0;
                    }
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    ReferenceCountUtil.release(msg);
                    return;
                }
                // Add the message to the outbound buffer
                outboundBuffer.addMessage(msg, size, promise);
            }
    
            // flush() writes the buffered data out to the network
            @Override
            public final void flush() {
                assertEventLoop();
                // A null outbound buffer means the channel is closed, so there is nothing to flush
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    return;
                }
    
                outboundBuffer.addFlush();
                flush0();
            }
    
            // flush() ends up in this method
            @SuppressWarnings("deprecation")
            protected void flush0() {
                if (inFlush0) {
                    // Avoid re-entrance
                    return;
                }
    
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                    return;
                }
    
                inFlush0 = true;
                // Check the state of the channel
                // Mark all pending write requests as failure if the channel is inactive.
                if (!isActive()) {
                    try {
                        if (isOpen()) {
                            outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                        } else {
                            // Do not trigger channelWritabilityChanged because the channel is closed already.
                            outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                        }
                    } finally {
                        inFlush0 = false;
                    }
                    return;
                }
    
                try {
                    // Calls the channel's doWrite(); see NioSocketChannel for a concrete implementation
                    doWrite(outboundBuffer);
                } catch (Throwable t) {
                    if (t instanceof IOException && config().isAutoClose()) {
                        /**
                         * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                         * failing all flushed messages and also ensure the actual close of the underlying transport
                         * will happen before the promises are notified.
                         *
                         * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                         * may still return {@code true} even if the channel should be closed as result of the exception.
                         */
                        close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                    } else {
                        try {
                            shutdownOutput(voidPromise(), t);
                        } catch (Throwable t2) {
                            close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                        }
                    }
                } finally {
                    inFlush0 = false;
                }
            }
    
            // The logic for starting to read
            @Override
            public final void beginRead() {
                assertEventLoop();
    
                if (!isActive()) {
                    return;
                }
    
                try {
                    // Calls the channel's doBeginRead(); see the comments in AbstractNioChannel above
                    doBeginRead();
                } catch (final Exception e) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireExceptionCaught(e);
                        }
                    });
                    close(voidPromise());
                }
            }
    
            // register(): binds the channel to an EventLoop and registers it with that loop's Selector
            @Override
            public final void register(EventLoop eventLoop, final ChannelPromise promise) {
                // Validate the arguments and the current state
                if (eventLoop == null) {
                    throw new NullPointerException("eventLoop");
                }
                if (isRegistered()) {
                    promise.setFailure(new IllegalStateException("registered to an event loop already"));
                    return;
                }
                if (!isCompatible(eventLoop)) {
                    promise.setFailure(
                            new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                    return;
                }
    
                AbstractChannel.this.eventLoop = eventLoop;
    
                if (eventLoop.inEventLoop()) {
                    // Already on the event-loop thread: call register0() directly
                    register0(promise);
                } else {
                    try {
                        eventLoop.execute(new Runnable() {
                            @Override
                            public void run() {
                                register0(promise);
                            }
                        });
                    } catch (Throwable t) {
                        logger.warn(
                                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                                AbstractChannel.this, t);
                        closeForcibly();
                        closeFuture.setClosed();
                        safeSetFailure(promise, t);
                    }
                }
            }
    
            private void register0(ChannelPromise promise) {
                try {
                    // check if the channel is still open as it could be closed in the mean time when the register
                    // call was outside of the eventLoop
                    if (!promise.setUncancellable() || !ensureOpen(promise)) {
                        return;
                    }
                    boolean firstRegistration = neverRegistered;
                    // Calls the channel's doRegister(); see AbstractNioChannel above
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    safeSetSuccess(promise);
                    pipeline.fireChannelRegistered();
                    // Only fire a channelActive if the channel has never been registered. This prevents firing
                    // multiple channel actives if the channel is deregistered and re-registered.
                    if (isActive()) {
                        if (firstRegistration) {
                            pipeline.fireChannelActive();
                        } else if (config().isAutoRead()) {
                            // This channel was registered before and autoRead() is set. This means we need to begin read
                            // again so that we process inbound data.
                            //
                            // See https://github.com/netty/netty/issues/4805
                            beginRead();
                        }
                    }
                } catch (Throwable t) {
                    // Close the channel directly to avoid FD leak.
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
    
    }
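
The split between `write()` and `flush()` is easier to see in a reduced model. The sketch below is hypothetical and much simpler than Netty's `ChannelOutboundBuffer`: the class `WriteFlushSketch`, its two queues and the `wire` list (a stand-in for the socket) are invented for illustration. `write()` only queues a message, `flush()` marks the queued messages as flushed, and `flush0()` drains them with the same re-entrance guard idea as `inFlush0`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of the write()/flush() split; not Netty's ChannelOutboundBuffer.
public class WriteFlushSketch {
    private final Deque<String> unflushed = new ArrayDeque<>(); // filled by write()
    private final Deque<String> flushed = new ArrayDeque<>();   // marked by flush()
    private final List<String> wire = new ArrayList<>();        // stands in for the socket
    private boolean inFlush0;

    /** Like Unsafe.write(): only queues the message, no I/O happens. */
    void write(String msg) {
        unflushed.addLast(msg);
    }

    /** Like Unsafe.flush(): mark everything written so far as flushed, then drain. */
    void flush() {
        flushed.addAll(unflushed); // plays the role of addFlush()
        unflushed.clear();
        flush0();
    }

    private void flush0() {
        if (inFlush0 || flushed.isEmpty()) {
            return; // avoid re-entrance, or nothing to do
        }
        inFlush0 = true;
        try {
            String msg;
            while ((msg = flushed.pollFirst()) != null) {
                wire.add(msg); // plays the role of doWrite()
            }
        } finally {
            inFlush0 = false;
        }
    }

    List<String> wire() {
        return wire;
    }

    /** Returns {messages on the wire before flush, messages on the wire after flush}. */
    static int[] demo() {
        WriteFlushSketch ch = new WriteFlushSketch();
        ch.write("a");
        ch.write("b");
        int beforeFlush = ch.wire().size(); // nothing has reached the "wire" yet
        ch.flush();
        int afterFlush = ch.wire().size();
        return new int[] { beforeFlush, afterFlush };
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("before flush: " + r[0] + ", after flush: " + r[1]);
    }
}
```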
    

    Call flow for handling client data on a Netty server

    Read call flow

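      NioEventLoop's run() method monitors the SelectionKeys. When a read event is ready, it calls read() on the unsafe object, and the logic in read() triggers the pipeline's handler chain, which finally reaches the configured ChannelHandlers.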
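
The read flow just described can be approximated with plain JDK NIO. This is a simplified sketch under assumptions, not Netty's event loop: the class `ReadLoopSketch` is invented, a `java.nio.channels.Pipe` replaces the client socket so the example needs no network, and a list of `UnaryOperator<String>` stands in for the inbound ChannelHandlers. One pass of the loop selects, finds the readable key, reads the bytes, and passes the message through the handlers in order.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.UnaryOperator;

// Plain-JDK sketch: select -> read -> run the message through a handler chain.
public class ReadLoopSketch {

    static String demo() {
        // Hypothetical "inbound handlers": each transforms the message and passes it on.
        List<UnaryOperator<String>> handlers = new ArrayList<>();
        handlers.add(String::trim);
        handlers.add(String::toUpperCase);

        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ);

                // Simulate a client sending data.
                sink.write(ByteBuffer.wrap(" ping \n".getBytes(StandardCharsets.UTF_8)));

                String result = null;
                // One iteration of the "event loop": wait up to 5 s for a ready key.
                if (selector.select(5000) > 0) {
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        it.remove();
                        if (key.isReadable()) {
                            // Plays the role of unsafe.read(): pull bytes from the channel...
                            ByteBuffer buf = ByteBuffer.allocate(64);
                            ((ReadableByteChannel) key.channel()).read(buf);
                            buf.flip();
                            String msg = StandardCharsets.UTF_8.decode(buf).toString();
                            // ...then hand the message to each handler in turn.
                            for (UnaryOperator<String> handler : handlers) {
                                msg = handler.apply(msg);
                            }
                            result = msg;
                        }
                    }
                }
                return result;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```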

    Write call flow

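      Channel.writeAndFlush() calls the pipeline's writeAndFlush(). The call passes through the ChannelHandlers in the pipeline (which can transform the format of the outgoing data) and finally, through HeadContext's write(), reaches the write logic in unsafe. At that point the data has only been added to the ChannelOutboundBuffer; it is the call to unsafe's flush() that actually writes the data to the network, as analyzed above.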

        Article link: https://www.haomeiwen.com/subject/sywzpftx.html