Some initialization work has already been done in the previous part.
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
// Validation only: it just checks that the required fields have been initialized; we can skip over it.
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
private ChannelFuture doBind(final SocketAddress localAddress) {
// Initialize and register. This method does a lot of work; see below.
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
final ChannelFuture initAndRegister() {
/**
Earlier, serverBootstrap's channel() method was given NioServerSocketChannel.class, which set up the channel factory.
Here that factory creates a NioServerSocketChannel instance, which invokes NioServerSocketChannel's constructor; see below.
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
*/
final Channel channel = channelFactory.newChannel();
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
Creating the NioServerSocketChannel instance.
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
// NIO's ServerSocketChannel.open() creates its ServerSocketChannel the same way,
// so the provider is the low-level NIO class used to create channels
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
// once the JDK ServerSocketChannel is created, build the NioServerSocketChannel around it
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
// NioServerSocketChannel's ch field holds the underlying NIO ServerSocketChannel,
// which is configured as non-blocking
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
// unsafe is the channel's internal Unsafe implementation (NioMessageUnsafe for the server channel);
// its methods wrap the low-level NIO operations such as doRead and doWrite
unsafe = newUnsafe();
// the pipeline; the handlers we define ourselves are added to it
pipeline = newChannelPipeline();
}
When the pipeline is initialized, head and tail are already set up, forming a doubly linked list: head <=> tail.
HeadContext implements both ChannelOutboundHandler and ChannelInboundHandler, so it is both an inbound and an outbound handler;
TailContext implements only ChannelInboundHandler, so it is an inbound handler only.
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
// the pipeline maintains a linked list of handler contexts
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
// tail implements ChannelInboundHandler, i.e. it is an inbound handler
tail = new TailContext(this);
// head implements both ChannelOutboundHandler and ChannelInboundHandler,
// i.e. it is both an inbound and an outbound handler
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler
Summary of creating NioServerSocketChannel: internally it opens a JDK ServerSocketChannel and sets it to non-blocking, maintains a pipeline (with head and tail), and holds an unsafe object that wraps the low-level NIO read/write operations.
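For reference, what that constructor chain boils down to in plain JDK NIO is roughly the following sketch (illustrative only, not Netty code; the class name is made up):
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;

public class PlainNioEquivalent {
    public static void main(String[] args) throws Exception {
        // the same call NioServerSocketChannel.newSocket() makes internally
        ServerSocketChannel ch = SelectorProvider.provider().openServerSocketChannel();
        // the same thing AbstractNioChannel's constructor does
        ch.configureBlocking(false);
        // Netty then wraps ch, remembers SelectionKey.OP_ACCEPT as readInterestOp,
        // and adds the pipeline (head <=> tail) and the unsafe object on top
        System.out.println("open=" + ch.isOpen() + ", blocking=" + ch.isBlocking());
        ch.close();
    }
}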
Back to the initAndRegister method.
final ChannelFuture initAndRegister() {
// as analyzed above, the channel here is a NioServerSocketChannel, which holds the pipeline and much more
final Channel channel = channelFactory.newChannel();
try {
// now we analyze this call; it is also important, see below
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
// Here a ChannelInitializer is added to the pipeline.
// ChannelInitializer is an inbound handler: its channelRegistered method
// calls initChannel. After the channel is registered with the Selector,
// channelRegistered is propagated through the pipeline's inbound handlers
// one by one, which is how this initChannel ends up being called.
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
// here the handler we passed in ourselves, the LoggingHandler, is added to the pipeline
pipeline.addLast(handler);
}
// it also adds this one, which is very important: an inbound handler that deals with incoming client connections, see below
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
About the ServerBootstrapAcceptor class: the thread of the Selector associated with the NioServerSocketChannel keeps polling and accepting client connections; once a connection is accepted, this is the class that dispatches it to the child EventLoopGroup for processing.
ServerBootstrapAcceptor(
EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
}
// This is a handler in the NioServerSocketChannel's pipeline. It only sees new client connection
// events; reads and writes on the client channels are dispatched to the child EventLoopGroup.
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// only new-connection events arrive here, so this channel is the NioSocketChannel
// that was just created for the client
final Channel child = (Channel) msg;
// the pipeline of the NioSocketChannel established with the client gets our
// childHandler (MyServerInitializer) added to it; the handlers that childHandler installs
// are what actually handle reads, writes and the rest of the interaction on the SocketChannel
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// register the SocketChannel established with the client into the childGroup: the group
// picks an EventLoop, the EventLoop starts its thread, and that thread loops on select(),
// listening for read/write events and doing the I/O; one channel is served by one thread
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
That completes the analysis of init inside initAndRegister. To summarize: after the NioServerSocketChannel is created, init adds a ChannelInitializer to its pipeline. The ChannelInitializer's initChannel method adds the LoggingHandler we passed in, plus a handler that deals with client connections: ServerBootstrapAcceptor. ServerBootstrapAcceptor is what actually processes the SocketChannel of each client connection, and it works like this: the SocketChannel's pipeline gets our custom childHandler added to it (the handler where we implement our own business logic), and the SocketChannel is then registered with the childGroup. The childGroup picks an EventLoop, the EventLoop starts a thread, and that thread loops on select() and handles the interaction.
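The childHandler mentioned in this summary (MyServerInitializer in this walkthrough) is typically a ChannelInitializer. A minimal sketch is shown below; the String codecs are real Netty classes, but the echo logic is made up purely for illustration:
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new StringDecoder());   // inbound: bytes -> String
        pipeline.addLast(new StringEncoder());   // outbound: String -> bytes
        pipeline.addLast(new SimpleChannelInboundHandler<String>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                ctx.writeAndFlush("echo: " + msg);   // business logic: echo back to the client
            }
        });
    }
}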
To summarize again: when a server starts, the boss EventLoopGroup starts only one thread, which watches the Selector the NioServerSocketChannel is registered with and listens for client connection events. Once a connection is established with a client, an EventLoop is picked from the child EventLoopGroup and its thread is started; each EventLoop maintains its own Selector, and the child EventLoop's thread loops on select() and then handles read, write and other events.
Each SocketChannel is read and written by a single thread and is bound to one EventLoop (and therefore one Selector). The boss EventLoopGroup really only starts one thread, which loops on the Selector used to accept client connections (the Selector the ServerSocketChannel is registered with).
It is a bit convoluted... That's fine; keep reading, and it will fall into place once you have seen the whole picture.
init is done; now let's analyze ChannelFuture regFuture = group().register(channel);
ChannelFuture regFuture = group().register(channel);
// MultithreadEventLoopGroup class
@Override
public ChannelFuture register(Channel channel) {
// next() picks an EventLoop from the group using the chooser described earlier, and that EventLoop performs the registration
return next().register(channel);
}
// SingleThreadEventLoop class
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
// the NioServerSocketChannel gets registered with this eventLoop
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// associate the channel with the eventLoop
AbstractChannel.this.eventLoop = eventLoop;
// inEventLoop() checks whether the current thread is the thread started inside the eventLoop;
// the eventLoop has not started its thread yet, so we do not enter this branch
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// this eventLoop is a SingleThreadEventLoop.
// its execute method works like a thread pool's execute: it submits a task
// for the loop's thread to run. see below
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
// if we are already on the nioEventLoop's own thread, just queue the task; otherwise start the thread.
// at this point the eventLoop has no thread yet, so we do not take this branch
if (inEventLoop) {
addTask(task);
} else {
// start the thread
startThread();
// put the task into the eventLoop's queue
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
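From user code the same execute path can be used directly; a tiny sketch, assuming channel is an already-registered io.netty.channel.Channel:
EventLoop loop = channel.eventLoop();
loop.execute(() -> {
    // runs on the channel's single I/O thread: if this is the first task ever submitted,
    // the thread is started lazily (the startThread() call above); otherwise the task is
    // simply queued and picked up later by runAllTasks()
    System.out.println("inEventLoop: " + loop.inEventLoop());   // prints true
});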
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
// the executor inside this EventLoop is a ThreadPerTaskExecutor;
// this is where the thread is actually started, asynchronously
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// this thread's job is to run the NioEventLoop, i.e. the run() method
// declared by SingleThreadEventExecutor (and implemented by NioEventLoop)
SingleThreadEventExecutor.this.run();
success = true;
}
// ... (the catch/finally block and the rest of doStartThread are omitted here)
}
The newly started thread polls the Selector in a loop, accepting client connections.
// poll the Selector in a loop
protected void run() {
for (;;) {
try {
// hasTasks(): if there are pending tasks we fall through to the default branch; if not we go to the SELECT branch.
// the main thread has already queued the registration task, so the queue is not empty here
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
// this keeps polling select(), waiting for client connections
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// there are pending tasks
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
// ioRatio is the ratio of time spent on I/O versus running tasks; I/O here means handling the
// selected keys (e.g. accepting client connections). If ioRatio is 100, all selected keys are
// processed first and then the task queue is drained. The default is 50, meaning I/O and task
// processing each get roughly half of the loop's time: even if the task queue still has tasks
// left, the loop returns to select()/I-O once the task time budget runs out
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
// process the selected keys;
// on the first pass nothing has been selected yet, so this effectively does nothing;
// once selector.select() has returned ready keys, this is where a lot of the work happens
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
// on the first pass it is actually this call that does the work (it runs the queued register task)
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
}
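If the default 50/50 split does not suit the workload, the ratio can be tuned on the group, which applies it to every child NioEventLoop (a sketch, assuming the Netty 4.x API):
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
// spend roughly 80% of each loop iteration on select()/I-O and 20% on queued tasks;
// 100 means: process all I/O first, then drain the task queue with no time budget
workerGroup.setIoRatio(80);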
As mentioned above, after the thread is started, addTask(task) put the registration task into the single-threaded executor's queue; here the task is taken out and run.
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
// take a task from the queue
Runnable task = pollTask();
if (task == null) {
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
try {
// run it, i.e. the register0 task queued earlier by addTask; explained below
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception.", t);
}
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
// fetch the next task
task = pollTask();
// break out if there are no more tasks
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
this.lastExecutionTime = lastExecutionTime;
return true;
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// this registers the server channel with the Selector of the eventLoop chosen from the group:
// javaChannel().register(selector, 0, this)
doRegister();
neverRegistered = false;
registered = true;
safeSetSuccess(promise);
// this fires channelRegistered through the pipeline, invoking channelRegistered on the
// inbound handlers one by one; eventually it reaches the ChannelInitializer mentioned
// earlier, whose channelRegistered calls initChannel, which adds our custom handler to
// the pipeline as well as the ServerBootstrapAcceptor that dispatches new client connections
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// register the channel here: javaChannel().register(selector, 0, this)
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
private void processSelectedKeys() {
// selectedKeys is the set discussed in chapter one: when the EventLoopGroup created its child
// EventLoops, openSelector() initialized the Selector and, via reflection, installed this
// selectedKeys object into the JDK Selector, so whenever Selector.select() returns, the
// entries in this selectedKeys object are updated accordingly
if (selectedKeys != null) {
// selectedKeys was initialized when the EventLoop was created, so we take this branch
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
After the select() polling above has picked up selection keys, they are processed here.
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
// we get here when the selected key is non-null
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys[i] = null;
// when the channel was registered with the Selector, k.attach(att) attached the channel to the key
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// we take this branch
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
// Need to flip the optimized selectedKeys to get the right reference to the array
// and reset the index to -1 which will then set to 0 on the for loop
// to start over again.
//
// See https://github.com/netty/netty/issues/1523
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// k is invalid; normally we do not enter this branch
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// accept events for new client connections (and read events) are handled here
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// for a newly accepted connection, fireChannelRead is propagated through the pipeline
// and eventually reaches ServerBootstrapAcceptor's channelRead, which registers the
// new channel with the Selector of one of the childGroup's EventLoops; that childGroup
// thread then loops on select() for it
unsafe.read();
// the channel has been closed
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
// a write-ready event arrived, so force-flush the outbound buffer
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// this calls back the listeners' operationComplete, signalling that the connection has completed
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
@Override
public void read() {
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// this invokes the pipeline's handlers one by one; it ends up calling the
// ServerBootstrapAcceptor described earlier, which dispatches the client's
// SocketChannel to the child group
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
}
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
// head is the pipeline's head; as noted earlier it is both inbound and outbound. Read is an
// inbound event, so channelRead is invoked on each inbound handler in turn, starting from head
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
// the next passed in here is head; executor() returns the EventLoop associated with the channel
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(msg);
}
// ... (the else branch, which hands the call over to the executor, is omitted)
}
// invoke head's channelRead; head then calls fireChannelRead to move on to the next inbound handler in the pipeline
private void invokeChannelRead(Object msg) {
if (isAdded()) {
try {
// handler() returns this context's handler (e.g. the LoggingHandler we configured); its channelRead method is invoked
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
// if an exception is thrown, exceptionCaught is invoked on the inbound handlers one by one, with the same mechanism as read
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
// after the LoggingHandler's channelRead has run its own logic, ctx.fireChannelRead(msg) passes the event on to the next handler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "RECEIVED", msg));
}
ctx.fireChannelRead(msg);
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
// find the next inbound handler and invoke it, and so on down the pipeline
invokeChannelRead(findContextInbound(), msg);
return this;
}
// find the next inbound context
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
In other words: the handlers we add are either ChannelInboundHandlers or ChannelOutboundHandlers. Inbound events invoke the ChannelInboundHandlers one by one, and outbound operations invoke the ChannelOutboundHandlers one by one. Inbound here means receiving data; outbound means writing data out. Likewise, when an exception occurs, whether it happened on the inbound or the outbound path determines whether the ChannelInboundHandlers' or the ChannelOutboundHandlers' exception handling is called.
The inbound invocation order runs from head downwards (head => ... => tail); the outbound order runs from tail upwards (tail => ... => head), one handler at a time.
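A small sketch of that ordering, assuming ch is a registered Channel (the handler names and print statements are made up for illustration): an inbound channelRead passes through in1 and then in2, while a write issued from in2's context travels back towards head and passes through out1.
ch.pipeline()
  .addLast("in1", new ChannelInboundHandlerAdapter() {
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) {
          System.out.println("in1.channelRead");   // first inbound handler after head
          ctx.fireChannelRead(msg);                // pass the event on towards tail
      }
  })
  .addLast("out1", new ChannelOutboundHandlerAdapter() {
      @Override
      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
          System.out.println("out1.write");        // visited on the way from tail back to head
          ctx.write(msg, promise);                 // pass the write on towards head
      }
  })
  .addLast("in2", new ChannelInboundHandlerAdapter() {
      @Override
      public void channelRead(ChannelHandlerContext ctx, Object msg) {
          System.out.println("in2.channelRead");   // second inbound handler
          ctx.writeAndFlush(msg);                  // goes back out through out1, then head
      }
  });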
That is about it. A few small details remain, such as exactly how the register call registers and how bind actually binds, but they are not critical; the overall flow has been analyzed.
Other details can be looked up when you run into them; they are easy enough to follow.
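For the curious, the skipped bind step, doBind0, essentially just hands the actual bind over to the channel's EventLoop so that it runs after channelRegistered. The outline below is paraphrased from memory, so the exact code may differ slightly between Netty versions:
private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {
    // executed on the channel's EventLoop so that it runs after channelRegistered has fired
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // the bind travels through the pipeline as an outbound operation (tail -> head)
                // and ends up in the unsafe, which calls the JDK channel's bind
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}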
Summary:
When a server starts, the boss EventLoopGroup starts only one thread, which watches the Selector the NioServerSocketChannel is registered with and listens for client connection events. Once a connection is established with a client, an EventLoop is picked from the child EventLoopGroup and its thread is started; each EventLoop maintains its own Selector, and the newly established connection is registered with that EventLoop's Selector. The child EventLoop's thread loops on select() and then handles read, write and other events.
Each channel maintains one pipeline. Each EventLoop owns one Selector, and a channel is registered with exactly one EventLoop, so a channel maps to one thread and one Selector. But the same EventLoop may have many channels registered with it, i.e. one Selector may be monitoring many channels. In short: Selector to thread is one-to-one, Selector to channel is one-to-many, and thread to channel is one-to-many.
Each channel is read and written by a single thread, so there are no concurrency problems; but if one channel blocks, the other channels sharing the same Selector and thread may not get served.
Also, a declaration like EventLoopGroup bossstrap = new NioEventLoopGroup();
can just as well be written as EventLoopGroup bossstrap = new NioEventLoopGroup(1);, which saves initializing multiple EventLoops for the boss group (not that they take up much memory anyway).
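Putting the whole chapter together, a typical setup looks like the sketch below (MyServerInitializer is the hypothetical initializer sketched earlier; imports from io.netty.bootstrap, io.netty.channel and io.netty.handler.logging are omitted, and the LoggingHandler level is just an example):
EventLoopGroup bossGroup = new NioEventLoopGroup(1);    // accepts connections; one thread is enough
EventLoopGroup workerGroup = new NioEventLoopGroup();   // child group: read/write for accepted channels
try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)       // sets the channelFactory used by initAndRegister
            .handler(new LoggingHandler(LogLevel.INFO))  // added to the server channel's pipeline by init()
            .childHandler(new MyServerInitializer());    // added to each accepted channel by ServerBootstrapAcceptor
    ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();   // the call analyzed in this chapter
    channelFuture.channel().closeFuture().sync();
} finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}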
Compared with plain NIO: in plain NIO a single Selector watches all client connections as well as their read/write events, whereas in Netty one Selector watches only for client connections (accept), and multiple other Selectors watch only for read/write and similar events.
So purely in terms of Selectors and multithreading, Netty is not dramatically more efficient than plain NIO: if a plain NIO server hands the events its single Selector picks up to a thread pool, its performance will not be far behind Netty's, although the NIO approach may then run into locking problems from multithreaded access.
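For comparison, the single-Selector plain-NIO model referred to here looks roughly like this (a bare-bones fragment; error handling and imports are omitted):
Selector selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
server.configureBlocking(false);
server.bind(new InetSocketAddress(8899));
server.register(selector, SelectionKey.OP_ACCEPT);   // one Selector handles accept AND read/write
while (true) {
    selector.select();
    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    while (it.hasNext()) {
        SelectionKey key = it.next();
        it.remove();
        if (key.isAcceptable()) {
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) {
            // read here, or hand the work off to a thread pool as described above;
            // with a thread pool you then have to handle the locking issues yourself
        }
    }
}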
The strongest reasons for using Netty are: it is simpler than raw NIO and takes less work, it comes with many features built in that you would otherwise have to implement yourself on top of NIO, and it is stable and works around a number of NIO bugs.