2-netty源码分析之Client
其实记录netty客户端的启动过程基本跟server端相似,无非是借助Bootstrap组装组件,然后通过connect发起连接。前面的步骤不多记录,看server启动过程基本了解。
那么这里主要记录下connect相关细节,以及client发起connect后,server是如何处理的。
依然从demo出发:
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
可以看出前半部分基本一致,那就直接从下面这行核心代码出发:
ChannelFuture f = b.connect(HOST, PORT).sync();
1.connect分析
private ChannelFuture doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise = channel.newPromise();
if (regFuture.isDone()) {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
return promise;
}
其实一眼看去做了这几件事:
- 1.初始化并且注册channel,跟服务端类似,也就SocketChannel不一样等差别
- 2.设置异步回调
- 3.channel注册OK之后,执行doConnect操作,也就是doConnect0方法,这一步通过注册监听回调完成。
那么ChannelFuture何时出发operationComplete方法呢。从initAndRegister跟踪看看。
image.png
启动eventLoop线程执行register逻辑
image.png
跟着debug走到这里进去继续跟踪对于promise的相关设置
image.png
看到了promise.trySuccess()方法,继续走:
image.png image.png
看到了notify方法体,
image.png
OK,再走一步 触发回调的逻辑就出现了:
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
到这里紧接着就是执行开始设置的监听器里的doConnect逻辑了
image.png
,OK回调详细调用链很简单,一步步debug下去自然就很清楚。
无非是:
- 1.将用户执行的线程转化为EventLoop线程
- 2.执行channel register操作
- 3.设置DefaultPromise属性,执行回调方法
2.doConnect
/** 发起连接就在这里啦 */
private static void doConnect0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
/** 注意:这里是eventLoop线程 */
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
if (localAddress == null) {
channel.connect(remoteAddress, promise);
} else {
channel.connect(remoteAddress, localAddress, promise);
}
promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
可以看到这里,一罐的做法,将真正执行发起连接的操作扔进任务队列,交由EventLoop去执行。此处没有设置localAdress,那么紧接着就进入到一下:
image.png
到这里,出现了比较熟悉的piepline,pipeline暂时不详细分析,后面一章会专门记录。那么这里就是交给管道去处理,其实可以猜想就是交给piepline中的handler处理:
image.png
果然如此,找到piepline链路的末节点tail处理执行,那么究竟这个connect具体是那个handler处理的呢?继续:
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
...
/**
* 调用 findContextOutbound 方法, 从 DefaultChannelPipeline 内的双向链表的 tail 开始,
* 不断向前寻找第一个 outbound 为 true 的 AbstractChannelHandlerContext, 然后调用它的 invokeConnect 方法
*
* 在 DefaultChannelPipeline 的构造器中, 会实例化两个对象: head 和 tail, 并形成了双向链表的头和尾.
* head 是 HeadContext 的实例, 它实现了 ChannelOutboundHandler 接口, 并且它的 outbound 字段为 true.
* 因此在 findContextOutbound 中, 找到的 AbstractChannelHandlerContext 对象其实就是 head.
* 进而在 invokeConnect 方法中, 我们向上转换为 ChannelOutboundHandler --> invokeConnect 方法
*/
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
其实可以看到tail节点只不过是connect任务流转的第一个节点,这里会通过findContextOutbound去寻找第一个outbound为true的handler,然后由其去执行invokerConnect方法。还记的之前分析server时讲过:数据从外部流入对应的是inbound,相反则是outbound么.那么这里发起连接找得处理器必然是outbound对应的处理节点,SO 继续跟踪。
image.png
可以看到这里找到的是head节点,看下head类的描述:
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler
果然继承了ChannelOutboundHandler,那么找到它就合理了。那么很自然最终执行connect就会进入HeadContext的connect方法了:
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
/** AbstractNioByteChannel.NioByteUnsafe */
unsafe.connect(remoteAddress, localAddress, promise);
}
最终还不是具体的执行connect,交给了UnSafe去操作,其实这里也很合理,还记得之前分析Server时说过,netty最终与jdk nio底层打交道的所有事件,都是交给了UnSafe去操作的,那么这里就很合理这个设计了。
看看这里的UnSafe具体是哪个:
image.png
这里了是NioSocketChannelUnsafe,这里直接没有覆盖父类AbstractNioUnsafe的connect方法,那么下一步就直接进入了AbstractNioUnsafe的connect了,分析一下:
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
/** doConnect 子类实现 */
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
...
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
我们这是刚启动clinet,那么这里的isActive根据代码就知道返回false,可以看到UnSafe调用了外部类AbstractNioChannel的doConnect方法,而这里doConnect的具体执行逻辑交由具体子类实现,很自然走到了NioSocketChannel(服务端对应的是NioServerSocketChannel).
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
SocketUtils:
/**
* 从 NioSocketChannel.newSocket 返回的 SocketChannel 对象;
* 然后是调用 SocketChannel.connect 方法完成 Java NIO 层面上的 Socket 的连接.
*/
public static boolean connect(final SocketChannel socketChannel, final SocketAddress remoteAddress)
throws IOException {
try {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws IOException {
return socketChannel.connect(remoteAddress);
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getCause();
}
}
完成了jdk socket 的connect触发动作,连接发起也就算OK了,发起连接之后,如果此时连接尚未真正建立,设置interestOps为OP_CONNECT,这样就可以监听连接动作了,完成最终操作。
既然client已经发起了连接请求,那么接下来就要分析server如何处理连接请求了。
3.连接接入处理
其实这里的主要工作停留在server端,但是这个会直接决定整个client启动的成果,那么我们将server服务先跑起来,debug停留在reactor轮训那里,然后启动client发起请求。
那么服务端是怎么处理新连接的呢?主要如下:
- 1.轮训出新连接
- 2.处理新连接,其实这里会交给workGroup处理
- 3.注册读事件
前面说把debug停留在轮训处,还记得服务端轮训的diamante段吗,直接里面有这样一个逻辑:
processSelectedKeys();
private void processSelectedKeys() {
if (selectedKeys != null) {
/** 处理优化过的selectedKeys */
processSelectedKeysOptimized();
} else {
/** 正常的处理 */
processSelectedKeysPlain(selector.selectedKeys());
}
}
/**
* 迭代 selectedKeys 获取就绪的 IO 事件, 然后为每个事件都调用 processSelectedKey 来处理它.
*
* 1.对于boss NioEventLoop来说,轮询到的是基本上就是连接事件,后续的事情就通过他的pipeline将连接扔给一个worker NioEventLoop处理
* 2.对于worker NioEventLoop来说,轮询到的基本上都是io读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理
*
*
* netty的reactor线程第二步做的事情为处理IO事件,netty使用数组替换掉jdk原生的HashSet来保证IO事件的高效处理,
* 每个SelectionKey上绑定了netty类AbstractChannel对象作为attachment,在处理每个SelectionKey的时候,
* 就可以找到AbstractChannel,然后通过pipeline的方式将处理串行到ChannelHandler,回调到用户方法
*/
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
/** 1.取出IO事件以及对应的channel */
final SelectionKey k = selectedKeys.keys[i];
/** 取出后将数组置为空 */
selectedKeys.keys[i] = null;
final Object a = k.attachment();
/** 处理该channel */
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
/**
* 判断是否该再来次轮询
* 也就是说,对于每个NioEventLoop而言,每隔256个channel从selector上移除的时候,就标记 needsToSelectAgain 为true,我们还是跳回到上面这段代码
*/
if (needsToSelectAgain) {
// See https://github.com/netty/netty/issues/2363
/** 将selectedKeys的内部数组全部清空 */
selectedKeys.reset(i + 1);
/** 重新调用selectAgain重新填装一下 selectionKey */
selectAgain();
i = -1;
}
}
}
/**
* processSelectedKey 中处理了三个事件, 分别是:
* 1.OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取.
* 2.OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据.
* 3.OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态.
* 4.OP_ACCEPT,请求连接事件
*/
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
/**
* 事件是 OP_CONNECT, 即 TCP 连接已建立事件.
* 1.我们需要将 OP_CONNECT 从就绪事件集中清除, 不然会一直有 OP_CONNECT 事件.
* 2.调用 unsafe.finishConnect() 通知上层连接已建立
* */
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
/**
* unsafe.finishConnect() 调用最后会调用到 pipeline().
* fireChannelActive(), 产生一个 inbound 事件, 通知 pipeline 中的各个 handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法会被调用)
*/
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
/** boos reactor处理新的连接 或者 worker reactor 处理 已存在的连接有数据可读 */
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
/** AbstractNioByteChannel中实现,重点 */
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
我们将上面各种事件处打上断点,启动client,看看第一次发起连接会以何种事件进入轮训处:
image.png
可以看到客户端第一次发情连接,会触发服务端Accept事件,紧接着会触发UnSafe的读处理,那么为什么是Accept,后面将解释;我们继续:
服务端对应的UnSafe是NioMessageUnsafe,前面已经跟踪过,那么这里自然会进入下面逻辑:
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
...
/** 拿到对应channel的pipeline */
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
try {
for (;;) {
/** 读取一个连接,委托到外部类NioSocketChannel */
int localRead = doReadMessages(readBuf);
...
}
} catch (Throwable t) {
exception = t;
}
setReadPending(false);
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
/** 每条新连接丢给服务端的channel */
pipeline.fireChannelRead(readBuf.get(i));
}
/** 清理资源 */
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
if (isOpen()) {
close(voidPromise());
}
}
} finally {
...
}
}
}
上面贴出比较核心的逻辑,看到这里:
int localRead = doReadMessages(readBuf);
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
/** 获取到客户端新连接的 SocketChannel , jdk底层操作,返回jdk底层nio创建的一条channel */
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
/** 将jdk的 SocketChannel 封装成自定义的 NioSocketChannel ,加到list */
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
读操作很简单,NioMessageUnsafe调用, 因为NioMessageUnsafe是与新连接相关, 因此就是调用jdk的accept()方法,新建立一条连接,同时将jdk的 SocketChannel 封装成自定义的 NioSocketChannel;同时跟踪到NioSocketChannel的构造器里会发现此时会对SelectionKey.OP_READ进行注册,即该channel监听事件是 SelectionKey.OP_READ
这段代码翻译一下:
-
1.当一个 client 连接到 server 时, Java 底层的 NIO ServerSocketChannel 会有一个 SelectionKey.OP_ACCEPT 就绪事件, 接着就会调用到 這裡
-
2.接着就实例化一个 NioSocketChannel, 并且传入 NioServerSocketChannel 对象(即 this), 由此可知, 我们创建的这个 NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 实例 .
-
3.接下来就经由 Netty 的 ChannelPipeline 机制, 将读取事件逐级发送到各个 handler 中, 于是就会触发前面我们提到的 ServerBootstrapAcceptor.channelRead 方法啦.
其实总结下就是doReadMessages 方法不断地读取连接,封装成NioSocketChannel放入 readBuf 容器,然后调用 pipeline.fireChannelRead(),将每条新连接丢给服务端的channel
OK,我们读取连接后,继续跟踪后面一个重要的逻辑
image.pngpipeline.fireChannelRead(readBuf.get(i));
可以看到,这里会调用AbstractChannelHandlerContext的invokeChannelRead方法。
这里对server启动做个回顾,方便介绍这里的调用链,我们在启动服务端的时候,piepiline里面的链路为:
head-->ServerBootstrapAcceptor-->tail链路,
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
/** 这里的handler返回的是主.handler(new LoggingHandler(LogLevel.INFO)) 中的handler*/
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
/** 这里的 childGroup.register 就是将 workerGroup 中的某个 EventLoop 和 NioSocketChannel 关联 */
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
就是上面这段代码。那么这里的invokeChannelRead会通过head往下流转,会进入ServerBootstrapAcceptor的channelRead方法,跟踪一下:
image.png
几经流转,确实到了这里:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
/** 将msg转换成对应的channel */
final Channel child = (Channel) msg;
/** 添加用户自定义的childHandler */
child.pipeline().addLast(childHandler);
/** 设置 NioSocketChannel 对应的 attr和option */
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
/** 将 workerGroup 中的某个 EventLoop 和 NioSocketChannel 关联 */
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
那这个逻辑做了什么呢?
- 1.将msg转换成对应的channel
- 2.添加用户自定义的childHandler
- 3.将 workerGroup 中的某个 EventLoop 和 NioSocketChannel 关联
好像已经看到了workerGroup的身影,我们在server中强调过,server启动过程只包含bossGroup的启动,不涉及workerGroup的启动,那这里是不是就是workGroup的启动呢?
我们继续:
第二点说的childHandler就是server里的这段逻辑
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new EchoServerHandler());
}
});
记住,这里的channel是NioSocketChannel,不是我们启动服务端的那个NioServerSocketChannel了。
那么到这里,NioSocketChannel绑定的piepiline就是:
head-->ChannelInitializer-->tail
继续划重点:
childGroup.register(child)
很相似的一段代码,注册channel到group,不就是将一个指定的NioSocketChannel绑定到具体的EvenyLoop么,不也就是在WorkGroup线程池中选着一个去维护NioSocketChannel的相关事件么,好像是很明了,咱们继续看看register做了什么:
MultithreadEventLoopGroup#register方法:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
public EventExecutor next() {
return chooser.next();
}
首先next方法就是调用执行器选择一个具体的EventLoop,紧接着用这个选择的EventLoop执行register操作。继续跟踪:
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
/** 获取 channel 的 unsafe() 底层操作对象, 然后调用它的 register. */
channel.unsafe().register(this, promise);
return promise;
}
image.png
可以看到这里的UnSafe是NioSocketChannelUnSafe类型,那么直接进入AbstractChannel#AbstractUnsafe的register方法:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
/** 将eventLoop与channel绑定 */
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
嗯哼,核心点依然出现:
- 1.将eventLoop与channel绑定
- 2.通过eventLoop.execute方式启动线程
- 3.以任务式完成注册
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
/** 将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 并且将当前 Channel 作为 attachment. */
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
可以看到channel与selector进行了绑定,即selector对该channel进行轮训事件,并且所有该channel的事件都交由绑定的EventLoop执行。继续:
pipeline.invokeHandlerAddedIfNeeded();
这一步很熟悉哈,最终会触发啥可想而知:
image.png
此时才将用户自定义的handler装车进入piepline,那么这个时候的piepline会是这样:
head->EchoServerHandler->tail
继续
image.png
可以看到已经建立了连接,那么后面会干什么呢?看到下面就知道又是从head开始传递事件:
/** 为啥pipeline.fireChannelActive();最终会调用到AbstractNioChannel.doBeginRead(),了解pipeline中的事件传播机制 */
@Override
public final ChannelPipeline fireChannelActive() {
/** 三次握手成功之后,pipeline.fireChannelActive();被调用,然后以head节点为参数,直接一个静态调用 */
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
active传递之后就会进入readIfIsAutoRead,又是一轮piepline传递,继续
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
image.png
进入到了AbstractChannel#AbstractUnsafe的beginRead
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
可以看到注册了readInterestOp即SelectionKey.OP_READ事件,将 SelectionKey.OP_READ事件注册到selector中去,表示这条通道已经可以开始处理read事件了,也代表着客户端整个接入过程完毕。
4.总结
- 1.客户端发起连接
- 2.服务端reactor主线程轮训accept事件
- 3.建立jdk底层的socketChannel 并封装成自己的NioSocketChannel,NioSocketChannel一旦建立,代表连接建立
- 4.将NioSocketChannel绑定到指定的work EventLoop,并注册到指定的selector上
- 5.注册读事件,开启处理
- 6.完毕
网友评论