美文网首页java 成神之路
Netty 之 AbstractChannel 和 Abstra

Netty 之 AbstractChannel 和 Abstra

作者: jijs | 来源:发表于2019-01-27 22:51 被阅读50次

    概述

    channel 是 netty 网络 IO 操作抽象出来的一个接口,主要功能有:网络IO的读写,客户端发起连接、主动关闭连接,链路关闭,获取通信双方的网络地址等。

    这里我们只简单分析下 AbstractChannel 和 AbstractNioChannel 这两个Channel源码。

    AbstractChannel 源码

    public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
        // 父 Channel(NioServerSocketChannel 是没有父channel的)
        private final Channel parent;
        // Channel 唯一ID
        private final ChannelId id;
        // Unsafe 对象,封装 ByteBuf 的读写操作
        private final Unsafe unsafe;
        // 关联的 Pipeline 对象
        private final DefaultChannelPipeline pipeline;
        
        private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
        private final CloseFuture closeFuture = new CloseFuture(this);
        // 本地地址和远端地址
        private volatile SocketAddress localAddress;
        private volatile SocketAddress remoteAddress;
        // EventLoop 封装的 Selector
        private volatile EventLoop eventLoop;
        // 是否注册
        private volatile boolean registered;
        private boolean closeInitiated;
    
        /** Cache for the string representation of this channel */
        private boolean strValActive;
        private String strVal;
    

    AbstractChannel 构造

    protected AbstractChannel(Channel parent, ChannelId id) {
        this.parent = parent;
        this.id = id;
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
    
    // Unsafe 实现交给子类实现
    protected abstract AbstractUnsafe newUnsafe();
    
    // 创建 DefaultChannelPipeline 对象
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    

    Unsafe类里实现了具体的连接与写数据。比如:网络的读,写,链路关闭,发起连接等。之所以命名为unsafe是不希望外部使用,并非是不安全的。

    DefaultChannelPipeline 只是一个 Handler 的容器,也可以理解为一个Handler链,具体的逻辑由Handler处理,而每个Handler都会分配一个EventLoop,最终的请求还是要EventLoop来执行,而EventLoop中又调用Channel中的内部类Unsafe对应的方法。
    新建一个channel会自动创建一个ChannelPipeline。

    这里创建 DefaultChannelPipeline,构造中传入当前的 Channel,而读写数据都是在 ChannelPipeline 中进行的,ChannelPipeline 进行读写数据又委托给 Channel 中的 Unsafe 进行操作。

    AbstractUnsafe 类结构


    从类结构中可以看出 Unsafe 实现了 网络的读,写,连接关闭,发起连接等操作。

    AbstractNioChannel 源码

    public abstract class AbstractNioChannel extends AbstractChannel {
    
        // 抽象了 SocketChannel 和 ServerSocketChannel 的公共的父类
        private final SelectableChannel ch;
        // SelectionKey.OP_READ 读事件
        protected final int readInterestOp;
        // 注册到 selector 上返回的 selectorKey
        volatile SelectionKey selectionKey;
        // 是否还有未读的数据
        boolean readPending;
        private final Runnable clearReadPendingRunnable = new Runnable() {
            @Override
            public void run() {
                clearReadPending0();
            }
        };
    
        /**
         * The future of the current connection attempt.  If not null, subsequent
         * connection attempts will fail.
         */
        // 连接操作的结果
        private ChannelPromise connectPromise;
        // 连接超时定时任务
        private ScheduledFuture<?> connectTimeoutFuture;
        // 客户端地址
        private SocketAddress requestedRemoteAddress;
       
        ....
    

    SelectableChannel 抽象了 java.nio.SocketChannel 和 java.nio.ServerSocketChannel 的公共方法。netty 封装了 NIO 的channel。

    注册 Channel

    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 这里注册的事件为0
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }
    

    javaChannel() :是 Nio 中的 channel。
    eventLoop().unwrappedSelector(): 是 Nio 中的 selector。

    在java.nio.channels.SelectionKey 类中定义了 4种事件。

    // 读操作事件
    public static final int OP_READ = 1 << 0;
    // 写操作事件
    public static final int OP_WRITE = 1 << 2;
    // 客户端链接服务的操作事件
    public static final int OP_CONNECT = 1 << 3;
    // 服务端接受客户端连接事件
    public static final int OP_ACCEPT = 1 << 4;
    

    这里注册的是0,说明对任何事件都不感兴趣,仅仅完成注册操作。把当前的 Channel当做附件进行注册。如果注册成功则返回 selectionKey,通过 selectionKey 可用从 Selector 中获取 当前注册的 Channel。

    什么情况下才抛出 CancelledKeyException 异常呢?

    由于尚未调用select.select(..)操作,因此可能仍在缓存而未删除“已取消”selectionkey,因此强制调用 selector.selectNow() 方法将已经取消的 selectionKey 从 selector 上删除。

    只有第一次抛出此异常,才调用 selector.selectNow() 进行取消。 如果调用 selector.selectNow() 还有取消的缓存,可能是jdk的一个bug。

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
    
        readPending = true;
    
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    

    该方法首先判断 Channel 是否可用,不可用直接返回。
    获取该 selectionKey 注册到 selector 的事件。
    如果注册的事件 位运算与 读事件 等于0,则说明该 Channel 没有在 selelctor 上注册读事件,在这里注册读事件。

    什么情况下会调用 doBeginRead() 方法?

    当 Channel 处于 channelActive 状态后,就会在DefaultChannelPipeline.channelActive 方法中调用 doBeginRead() 方法,在 selector 上注册读事件。

    相关文章

      网友评论

        本文标题:Netty 之 AbstractChannel 和 Abstra

        本文链接:https://www.haomeiwen.com/subject/fvmbjqtx.html