概述
channel 是 netty 网络 IO 操作抽象出来的一个接口,主要功能有:网络IO的读写,客户端发起连接、主动关闭连接,链路关闭,获取通信双方的网络地址等。
这里我们只简单分析下 AbstractChannel 和 AbstractNioChannel 这两个Channel源码。
AbstractChannel 源码
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
// 父 Channel(NioServerSocketChannel 是没有父channel的)
private final Channel parent;
// Channel 唯一ID
private final ChannelId id;
// Unsafe 对象,封装 ByteBuf 的读写操作
private final Unsafe unsafe;
// 关联的 Pipeline 对象
private final DefaultChannelPipeline pipeline;
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
private final CloseFuture closeFuture = new CloseFuture(this);
// 本地地址和远端地址
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
// EventLoop 封装的 Selector
private volatile EventLoop eventLoop;
// 是否注册
private volatile boolean registered;
private boolean closeInitiated;
/** Cache for the string representation of this channel */
private boolean strValActive;
private String strVal;
AbstractChannel 构造
protected AbstractChannel(Channel parent, ChannelId id) {
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
// Unsafe 实现交给子类实现
protected abstract AbstractUnsafe newUnsafe();
// 创建 DefaultChannelPipeline 对象
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
Unsafe类里实现了具体的连接与写数据。比如:网络的读,写,链路关闭,发起连接等。之所以命名为unsafe是不希望外部使用,并非是不安全的。
DefaultChannelPipeline 只是一个 Handler 的容器,也可以理解为一个Handler链,具体的逻辑由Handler处理,而每个Handler都会分配一个EventLoop,最终的请求还是要EventLoop来执行,而EventLoop中又调用Channel中的内部类Unsafe对应的方法。
新建一个channel会自动创建一个ChannelPipeline。
这里创建 DefaultChannelPipeline,构造中传入当前的 Channel,而读写数据都是在 ChannelPipeline 中进行的,ChannelPipeline 进行读写数据又委托给 Channel 中的 Unsafe 进行操作。
AbstractUnsafe 类结构
从类结构中可以看出 Unsafe 实现了 网络的读,写,连接关闭,发起连接等操作。
AbstractNioChannel 源码
public abstract class AbstractNioChannel extends AbstractChannel {
// 抽象了 SocketChannel 和 ServerSocketChannel 的公共的父类
private final SelectableChannel ch;
// SelectionKey.OP_READ 读事件
protected final int readInterestOp;
// 注册到 selector 上返回的 selectorKey
volatile SelectionKey selectionKey;
// 是否还有未读的数据
boolean readPending;
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
clearReadPending0();
}
};
/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
// 连接操作的结果
private ChannelPromise connectPromise;
// 连接超时定时任务
private ScheduledFuture<?> connectTimeoutFuture;
// 客户端地址
private SocketAddress requestedRemoteAddress;
....
SelectableChannel 抽象了 java.nio.SocketChannel 和 java.nio.ServerSocketChannel 的公共方法。netty 封装了 NIO 的channel。
注册 Channel
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 这里注册的事件为0
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
javaChannel() :是 Nio 中的 channel。
eventLoop().unwrappedSelector(): 是 Nio 中的 selector。
在java.nio.channels.SelectionKey 类中定义了 4种事件。
// 读操作事件
public static final int OP_READ = 1 << 0;
// 写操作事件
public static final int OP_WRITE = 1 << 2;
// 客户端链接服务的操作事件
public static final int OP_CONNECT = 1 << 3;
// 服务端接受客户端连接事件
public static final int OP_ACCEPT = 1 << 4;
这里注册的是0,说明对任何事件都不感兴趣,仅仅完成注册操作。把当前的 Channel当做附件进行注册。如果注册成功则返回 selectionKey,通过 selectionKey 可用从 Selector 中获取 当前注册的 Channel。
什么情况下才抛出 CancelledKeyException 异常呢?
由于尚未调用select.select(..)操作,因此可能仍在缓存而未删除“已取消”selectionkey,因此强制调用 selector.selectNow() 方法将已经取消的 selectionKey 从 selector 上删除。
只有第一次抛出此异常,才调用 selector.selectNow() 进行取消。 如果调用 selector.selectNow() 还有取消的缓存,可能是jdk的一个bug。
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
该方法首先判断 Channel 是否可用,不可用直接返回。
获取该 selectionKey 注册到 selector 的事件。
如果注册的事件 位运算与 读事件 等于0,则说明该 Channel 没有在 selelctor 上注册读事件,在这里注册读事件。
什么情况下会调用 doBeginRead() 方法?
当 Channel 处于 channelActive 状态后,就会在DefaultChannelPipeline.channelActive 方法中调用 doBeginRead() 方法,在 selector 上注册读事件。
网友评论