美文网首页Netty 权威指南笔记专题收藏
Netty 权威指南笔记(六):Channel 解读

Netty 权威指南笔记(六):Channel 解读

作者: albon | 来源:发表于2017-11-06 09:49 被阅读308次

    Netty 权威指南笔记(六):Channel 解读

    《Netty 权威指南》书上使用的源码是 Netty5 的,但是 Netty5 已经被废弃了,所以本文是参照 Netty4.1 的源码解读的。

    JDK 的 NIO 类库中,提供了 SocketChannel 和 ServerSocketChannel 用于非阻塞 I/O 操作。类似于 NIO 的 Channel,Netty 提供了自己的 Channel 和其子类实现。

    功能说明

    io.netty.channel.Channel 是 Netty 的网络操作抽象类,聚合了一组功能,包括但不限于网络读写、客户端发起连接、主动关闭连接,同时也包含了 Netty 框架相关的一些功能,包括获取 Channel 的 EventLoop,获取缓冲区分配器 ByteBufAllocator 和 pipeline 等。

    为了 Netty 不使用 NIO 的原生 Channel,而是要另起炉灶呢?主要原因如下:

    1. JDK 的 SocketChannel 和 ServerSocketChannel 没有统一的 Channel 接口供业务开发者使用。对用户而言,没有统一的操作视图,使用起来不方便。
    2. JDK 的 SocketChannel 和 ServerSocketChannel 是 SPI 类接口,通过继承来扩展很不方便,不如开发一个新的。
    3. Netty 的 Channel 需要能跟 Netty 架构融合在一起。
    4. 自定义 Channel 功能实现会更灵活。

    基于以上原因,Netty 重新设计了 Channel,其主要设计理念如下:

    1. 在 Channel 接口层,采用 Facade 模式统一封装,将网络 I/O 操作、网络 I/O 相关联的其他操作封装起来,统一对外提供。
    2. Channel 接口定义尽量大而全,为 SocketChannel 和 ServerSocketChannel 提供统一的视图,由不同子类实现不同的功能,公共功能在抽象父类中实现,最大程度上实现功能和接口的重用。
    3. 具体实现采用聚合而非包含的方式,Channel 负责统一分配和调度,更加灵活。

    Netty 的 Channel 都有哪些功能呢?

    1. 常见的网络 IO 操作:读、写、绑定端口、连接、关闭连接等。
    2. 获取 EventLoop。
    3. 获取 parent Channel,对于服务端 SocketChannel 来说,parent 就是创建它的 ServerSocketChannel。
    4. 唯一标志 id。
    5. 元数据 metadata,获取 TCP 参数配置等。

    源码分析

    继承关系类图

    Netty4 Channel 类图

    NioServerSocketChannel、NioSocketChannel 两者都继承了 Channel、AbstractChannel、AbstractNioChannel。

    AbstractChannel

    主要成员变量如下所示:

    1. 父 Channel。
    2. 全局唯一 id。
    3. Unsafe 实例。
    4. 当前 Channel 对应的 DefaultChannelPipeline。
    5. EventLoop。
    6. 本地和远程地址。
        private final Channel parent;
        private final ChannelId id;
        private final Unsafe unsafe;
        private final DefaultChannelPipeline pipeline;
        private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
        private final CloseFuture closeFuture = new CloseFuture(this);
    
        private volatile SocketAddress localAddress;
        private volatile SocketAddress remoteAddress;
        private volatile EventLoop eventLoop;
        private volatile boolean registered;
        private boolean closeInitiated;
    
        /** Cache for the string representation of this channel */
        private boolean strValActive;
        private String strVal;
    

    AbstractChannel 中的网络 I/O 操作都是调用 pipeline 中的对应方法,继而由 pipeline 调用 ChannelHandler 进行处理。

        @Override
        public ChannelFuture bind(SocketAddress localAddress) {
            return pipeline.bind(localAddress);
        }
    
        @Override
        public ChannelFuture connect(SocketAddress remoteAddress) {
            return pipeline.connect(remoteAddress);
        }
    
        @Override
        public ChannelFuture write(Object msg) {
            return pipeline.write(msg);
        }
    

    AbstractNioChannel

    主要成员变量有:

    1. SelectableChannel:这是一个 Java NIO SocketChannel 和 ServerSocketChannel 的公共父类,放在这里是因为 AbstractNioChannel 也是 NioSocketChannel 和 NioServerSocketChannel 的公共父类。
    2. readInterestOp:代表 JDK SelectionKey 的 OP_READ。
    3. SelectionKey:Channel 注册到 EventLoop(Selector)时返回的 key,修改它可以改变感兴趣的事件。
    4. connectPromise:代表连接操作结果。
    5. connectTimeoutFuture:连接超时定时器。
    6. requestedRemoteAddress:connect 时的远程地址。
        private final SelectableChannel ch;
        protected final int readInterestOp;
        volatile SelectionKey selectionKey;
        boolean readPending;
    
        /**
         * The future of the current connection attempt.  If not null, subsequent
         * connection attempts will fail.
         */
        private ChannelPromise connectPromise;
        private ScheduledFuture<?> connectTimeoutFuture;
        private SocketAddress requestedRemoteAddress;
    

    AbstractNioChannel 类里比较重要的方法是 doRegister,该方法负责将 Channel 注册到多路复用器 Selector。

        @Override
        protected void doRegister() throws Exception {
            boolean selected = false;
            for (;;) {
                try {
                    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                    return;
                } catch (CancelledKeyException e) {
                    if (!selected) {
                        // Force the Selector to select now as the "canceled" SelectionKey may still be
                        // cached and not removed because no Select.select(..) operation was called yet.
                        eventLoop().selectNow();
                        selected = true;
                    } else {
                        // We forced a select operation on the selector before but the SelectionKey is still cached
                        // for whatever reason. JDK bug ?
                        throw e;
                    }
                }
            }
        }
    

    在 doRegister 方法中,对 ops 字段设置为 0,也就是对任何事件都不感兴趣。真正的设置读操作位是在 doBeginRead 方法中,那么写操作位在何时设置呢?当然是有数据要写,而缓冲区满(或其他不能立即写)的情况。

        @Override
        protected void doBeginRead() throws Exception {
            // Channel.read() or ChannelHandlerContext.read() was called
            final SelectionKey selectionKey = this.selectionKey;
            if (!selectionKey.isValid()) {
                return;
            }
    
            readPending = true;
    
            final int interestOps = selectionKey.interestOps();
            if ((interestOps & readInterestOp) == 0) {
                selectionKey.interestOps(interestOps | readInterestOp);
            }
        }
    

    AbstractNioByteChannel

    AbstractNioByteChannel 是 NioSocketChannel 的父类,只有一个成员变量 flushTask,负责写半包消息。

        private Runnable flushTask;
    

    最主要的方法是 doWrite:

        @Override
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            int writeSpinCount = -1;
    
            boolean setOpWrite = false;
            for (;;) {
                Object msg = in.current();
                // 如果没有要写的数据,就清除写标志位,并返回
                if (msg == null) {
                    // Wrote all messages.
                    clearOpWrite();
                    // Directly return here so incompleteWrite(...) is not called.
                    return;
                }
    
                // 对于 ByteBuf 类型、FileRegion 类型分开处理,其他未知类型抛异常
                if (msg instanceof ByteBuf) {
                    ByteBuf buf = (ByteBuf) msg;
                    int readableBytes = buf.readableBytes();
                    if (readableBytes == 0) {
                        in.remove();
                        continue;
                    }
    
                    boolean done = false;
                    long flushedAmount = 0;
                    if (writeSpinCount == -1) {
                        writeSpinCount = config().getWriteSpinCount();
                    }
                    // 只循环写 writeSpinCount 次,为了避免写大块儿数据时,阻塞其他线程过长时间
                    for (int i = writeSpinCount - 1; i >= 0; i --) {
                        int localFlushedAmount = doWriteBytes(buf);
                        // 返回 0 表示写缓冲区满,setOpWrite 为 true 会设置 SelectionKey 的写标志位,在可写时会得到通知。
                        if (localFlushedAmount == 0) {
                            setOpWrite = true;
                            break;
                        }
    
                        flushedAmount += localFlushedAmount;
                        if (!buf.isReadable()) {
                            done = true;
                            break;
                        }
                    }
    
                    in.progress(flushedAmount);
    
                    if (done) {
                        in.remove();
                    } else {
                        // Break the loop and so incompleteWrite(...) is called.
                        break;
                    }
                } else if (msg instanceof FileRegion) {
                    FileRegion region = (FileRegion) msg;
                    boolean done = region.transferred() >= region.count();
    
                    if (!done) {
                        long flushedAmount = 0;
                        if (writeSpinCount == -1) {
                            writeSpinCount = config().getWriteSpinCount();
                        }
    
                        for (int i = writeSpinCount - 1; i >= 0; i--) {
                            long localFlushedAmount = doWriteFileRegion(region);
                            if (localFlushedAmount == 0) {
                                setOpWrite = true;
                                break;
                            }
    
                            flushedAmount += localFlushedAmount;
                            if (region.transferred() >= region.count()) {
                                done = true;
                                break;
                            }
                        }
    
                        in.progress(flushedAmount);
                    }
    
                    if (done) {
                        in.remove();
                    } else {
                        // Break the loop and so incompleteWrite(...) is called.
                        break;
                    }
                } else {
                    // Should not reach here.
                    throw new Error();
                }
            }
            incompleteWrite(setOpWrite);
        }
        // 走到这里,说明还有数据没有发送完毕,需要进一步处理
        protected final void incompleteWrite(boolean setOpWrite) {
            // setOpWrite 为 true,设置 SelectionKey 写标志位
            if (setOpWrite) {
                setOpWrite();
            } else {
                // 否则,启动 flushTask 继续写半包消息
                Runnable flushTask = this.flushTask;
                if (flushTask == null) {
                    flushTask = this.flushTask = new Runnable() {
                        @Override
                        public void run() {
                            flush();
                        }
                    };
                }
                eventLoop().execute(flushTask);
            }
        }
    

    AbstractNioMessageChannel

    AbstractNioMessageChannel 是 NioServerSocketChannel、NioDatagramChannel 的父类。其主要方法也是 doWrite,功能和 AbstractNioByteChannel 的 doWrite 也类似,区别只是后者只处理 ByteBuf 和 FileRegion,前者无此限制,处理所有 Object。

        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            final SelectionKey key = selectionKey();
            final int interestOps = key.interestOps();
    
            for (;;) {
                Object msg = in.current();
                if (msg == null) {
                    // Wrote all messages.
                    if ((interestOps & SelectionKey.OP_WRITE) != 0) {
                        key.interestOps(interestOps & ~SelectionKey.OP_WRITE);
                    }
                    break;
                }
                try {
                    boolean done = false;
                    for (int i = config().getWriteSpinCount() - 1; i >= 0; i--) {
                        if (doWriteMessage(msg, in)) {
                            done = true;
                            break;
                        }
                    }
    
                    if (done) {
                        in.remove();
                    } else {
                        // Did not write all messages.
                        if ((interestOps & SelectionKey.OP_WRITE) == 0) {
                            key.interestOps(interestOps | SelectionKey.OP_WRITE);
                        }
                        break;
                    }
                } catch (Exception e) {
                    if (continueOnWriteError()) {
                        in.remove(e);
                    } else {
                        throw e;
                    }
                }
            }
        }
        // 处理 msg,由子类实现
        protected abstract boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception;
    

    doWriteMessage 方法在 NioServerSocketChannel 中实现如下所示,是因为 NioServerSocketChannel 只是用来监听端口,接收客户端请求,不负责传输实际数据。

        protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
            throw new UnsupportedOperationException();
        }
    

    doWriteMessage 方法在 NioSctpChannel 中是由具体实现的,从代码中可以看出来,它处理的只是 SctpMessage 类型的数据。

        protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
            SctpMessage packet = (SctpMessage) msg;
            ByteBuf data = packet.content();
            int dataLen = data.readableBytes();
            if (dataLen == 0) {
                return true;
            }
    
            ByteBufAllocator alloc = alloc();
            boolean needsCopy = data.nioBufferCount() != 1;
            if (!needsCopy) {
                if (!data.isDirect() && alloc.isDirectBufferPooled()) {
                    needsCopy = true;
                }
            }
            ByteBuffer nioData;
            if (!needsCopy) {
                nioData = data.nioBuffer();
            } else {
                data = alloc.directBuffer(dataLen).writeBytes(data);
                nioData = data.nioBuffer();
            }
            final MessageInfo mi = MessageInfo.createOutgoing(association(), null, packet.streamIdentifier());
            mi.payloadProtocolID(packet.protocolIdentifier());
            mi.streamNumber(packet.streamIdentifier());
            mi.unordered(packet.isUnordered());
    
            // 写数据
            final int writtenBytes = javaChannel().send(nioData, mi);
            return writtenBytes > 0;
        }
    

    NioServerSocketChannel

    NioServerSocketChannel 是服务端 Channel 的实现类,有一个用于配置 TCP 参数的 ServerSocketChannelConfig。

        private final ServerSocketChannelConfig config;
    

    作为服务端 Channel,其核心方法是端口绑定 doBind 方法、创建 SocketChannel 的 doReadMessages 方法。

        protected void doBind(SocketAddress localAddress) throws Exception {
            if (PlatformDependent.javaVersion() >= 7) {
                javaChannel().bind(localAddress, config.getBacklog());
            } else {
                javaChannel().socket().bind(localAddress, config.getBacklog());
            }
        }
    
        protected int doReadMessages(List<Object> buf) throws Exception {
            SocketChannel ch = SocketUtils.accept(javaChannel());
    
            try {
                if (ch != null) {
                    buf.add(new NioSocketChannel(this, ch));
                    return 1;
                }
            } catch (Throwable t) {
                logger.warn("Failed to create a new channel from an accepted socket.", t);
    
                try {
                    ch.close();
                } catch (Throwable t2) {
                    logger.warn("Failed to close a socket.", t2);
                }
            }
    
            return 0;
        }
    

    对于和服务端 Channel 无关的方法,要果断抛出 UnsupportedOperationException 异常。

       @Override
        protected void doDisconnect() throws Exception {
            throw new UnsupportedOperationException();
        }
    
        @Override
        protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {
            throw new UnsupportedOperationException();
        }
    
        @Override
        protected final Object filterOutboundMessage(Object msg) throws Exception {
            throw new UnsupportedOperationException();
        }
    

    NioSocketChannel

    NioSocketChannel 是客户端 Channel 的实现类,也是只有一个用于配置参数的变量 SocketChannelConfig。

        private final SocketChannelConfig config;
    

    客户端 Channel 的核心方法有连接 doConnect、写半包 doWrite、读操作 doReadBytes,下面我们挨个分析。

    连接操作 doConnect 具体实现如下:

    1. 如果 localAddress 为空,则进行绑定操作。
    2. 调用 socketChannel.connect 进行连接。
    3. 如果连接尚未完成,则注册 OP_CONNECT 事件。
    4. 如果连接失败抛出异常,也要调用 doClose 关闭连接。
        protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
            if (localAddress != null) {
                doBind0(localAddress);
            }
    
            boolean success = false;
            try {
                // 实际上是调用了 socketChannel.connect 方法。
                boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
                if (!connected) {
                    selectionKey().interestOps(SelectionKey.OP_CONNECT);
                }
                success = true;
                return connected;
            } finally {
                if (!success) {
                    doClose();
                }
            }
        }
    

    写操作 doWrite 具体实现如下:

    1. 判断待写数据大小,若为 0 则清除写标志位,并返回。
    2. 从 ChannelOutboundBuffer 里获取待写 ByteBuffer 数组,和待写 ByteBuffer 数量 nioBufferCnt。
    3. 针对 nioBufferCnt 的不同大小进行了区别处理。
    4. 如果 nioBufferCnt 为 0,则调用父类的方法处理,以防有除了 ByteBuffer 之外的数据需要写。
    5. nioBufferCnt 为 1 和大于 1 的处理类似,都是循环写 getWriteSpinCount 次,若写完则结束,未写完则设置后续写半包的方式。这一点和父类 AbstractNioByteChannel 中的处理方法类似。
        protected void doWrite(ChannelOutboundBuffer in) throws Exception {
            for (;;) {
                int size = in.size();
                if (size == 0) {
                    // All written so clear OP_WRITE
                    clearOpWrite();
                    break;
                }
                long writtenBytes = 0;
                boolean done = false;
                boolean setOpWrite = false;
    
                // Ensure the pending writes are made of ByteBufs only.
                ByteBuffer[] nioBuffers = in.nioBuffers();
                int nioBufferCnt = in.nioBufferCount();
                long expectedWrittenBytes = in.nioBufferSize();
                SocketChannel ch = javaChannel();
    
                // Always us nioBuffers() to workaround data-corruption.
                switch (nioBufferCnt) {
                    case 0:
                        // We have something else beside ByteBuffers to write so fallback to normal writes.
                        super.doWrite(in);
                        return;
                    case 1:
                        // 和 default 的区别只是传给 ch.write 的是数组还是单个 ByteBuffer
                        ByteBuffer nioBuffer = nioBuffers[0];
                        for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                            final int localWrittenBytes = ch.write(nioBuffer);
                            if (localWrittenBytes == 0) {
                                setOpWrite = true;
                                break;
                            }
                            expectedWrittenBytes -= localWrittenBytes;
                            writtenBytes += localWrittenBytes;
                            if (expectedWrittenBytes == 0) {
                                done = true;
                                break;
                            }
                        }
                        break;
                    default:
                        for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {
                            final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                            if (localWrittenBytes == 0) {
                                setOpWrite = true;
                                break;
                            }
                            expectedWrittenBytes -= localWrittenBytes;
                            writtenBytes += localWrittenBytes;
                            // expectedWrittenBytes 为 0 表示数据发送完毕
                            if (expectedWrittenBytes == 0) {
                                done = true;
                                break;
                            }
                        }
                        break;
                }
    
                // Release the fully written buffers, and update the indexes of the partially written buffer.
                in.removeBytes(writtenBytes);
    
                if (!done) {
                    // 设置后续写半包方式
                    incompleteWrite(setOpWrite);
                    break;
                }
            }
        }
    

    读操作比较简单,主要是通过 ByteBuf 来从 Channel 中读取数据。

        protected int doReadBytes(ByteBuf byteBuf) throws Exception {
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.attemptedBytesRead(byteBuf.writableBytes());
            return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
        }
    

    总结

    Channel 类体系的设计与其实现功能密不可分,父类中实现的是子类共同的功能。在多层次的抽象类中,每一个层次的抽象类负责实现一种功能。

    当父类提供大而全的接口时,父类可以根据需要去实现,不需要的可以抛出 UnsupportedOperationException 异常。

    相关文章

      网友评论

        本文标题:Netty 权威指南笔记(六):Channel 解读

        本文链接:https://www.haomeiwen.com/subject/xxikmxtx.html