笔者个人理解,如有错误之后,请指正。
一个简单的例子,不过是不可以运行的,少了一些类哈
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
在分析源码之前,我们可以尝试思考几个问题,带着问题去看源码,才有目的性,才会有一种柳岸花明的感觉。
- 首先,netty是对原生NIO的一层封装,那netty是如何实现对原生NIO的封装的
- 第二,服务器是如何不断处理客户端的请求的
- 第三,bossGroup和workerGroup是如何分工的
入口:ServerBootStrap的bind()
AbstractBootstrap#bind()
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
AbstractBootstrap#doBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化并且注册
// 重点关注initAndRegister()
final ChannelFuture regFuture = initAndRegister();
// 拿到Channel
final Channel channel = regFuture.channel();
// cause()不为空,表示出错l
if (regFuture.cause() != null) {
return regFuture;
}
// 已经成功
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
AbstractBootstrap#initAndRegister()
final ChannelFuture initAndRegister() {
// 对于ServerBootstrap,对应的是NioServerSocketChannel
Channel channel = null;
try {
// 实例化Channel对象
channel = channelFactory.newChannel();
// 初始化
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 注册Channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
上面有一步很关键的代码
// 实例化Channel对象
channel = channelFactory.newChannel();
首先看一下channelFactory,它是一个Channel工厂,它是什么时候被赋值的呢?
AbstractBootstrap#channel()
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
在我们指定Channel的时候,会调用这个方法,服务器启动时传进来的是NioServerSocketChannel.class。
看一下ReflectiveChannelFactory这个类,是一个反射的Channel工厂,通过传进来的Class来创建对应的Channel,还是比较容易理解的。
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
if (clazz == null) {
throw new NullPointerException("clazz");
}
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.getConstructor().newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(clazz) + ".class";
}
}
看回去channelFactory()方法
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
//赋值,ChannelFactory
this.channelFactory = channelFactory;
return self();
}
上面就完成了对channelFactory的赋值,而且这个channelFactory是用来创建NioServerSocketChannel实例的。
知道了channelFactory是怎么赋值的,是用来干嘛的,接着回去。
// 实例化Channel对象
channel = channelFactory.newChannel();
上面这句话的意思就是创建一个NioServerSocketChannel的实例,那自然会调用它的构造方法,那就接下去看看NioServerSocketChannel的构造方法。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* 创建一个新的ServerSocketChannel
* @param provider
* @return
*/
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
这一步,跟JDK NIO一样,需要创建一个ServerSocketChannel。
public NioServerSocketChannel(ServerSocketChannel channel) {
// 监听客户端连接
super(null, channel, SelectionKey.OP_ACCEPT);
// 配置
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
接着看父类的构造函数
AbstractNioMessageChannel的构造函数
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
继续往下看
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
// 这里的parent是为null
super(parent);
// 这里ch是ServerSocketChannel
this.ch = ch;
// readInterestOp是子类的传过来SelectionKey.OP_ACCEPT
// 表示这个ch对客户端的连接事件有兴趣
this.readInterestOp = readInterestOp;
try {
// 设置为非阻塞
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
总结一下,从上面channelFactory.newChannel()的过程中,会去创建一个ServerSocketChannel(这与JDK NIO的ServerSocketChannel是一样的),然后还有准备监听SelectionKey.OP_ACCEPT,不过从构造函数看来还没开始注册监听事件。
继续回去看看,回到AbstractBootstrap#initAndRegister()的这个方法
init(channel);
对channel进行初始化,init()是一个抽象方法,由它的子类是实现。
这里对应的子类是ServerBootstrap
@Override
void init(Channel channel) throws Exception {
// 所有的选项
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 所有的属性
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 管道
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
// 添加管道处理
// 这一步并不是马上执行
// 因为执行addLast方法时,会被判断这个Channel有没有被注册
// 如果没有被注册的话,则会等到注册时再回调这个方法
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
// 处理器
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 增加一个ServerBootstrapAcceptor
// 这里用来处理新的客户端连接
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
上面的方法中,有ServerBootstrapAcceptor处理类,等会再回来看看。
接着又看回去initAndRegister()
final ChannelFuture initAndRegister() {
// 对于ServerBootstrap,对应的是NioServerSocketChannel
Channel channel = null;
try {
// 实例化Channel对象
channel = channelFactory.newChannel();
// 初始化
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 注册Channel
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
下面执行到这一步
// 注册Channel
ChannelFuture regFuture = config().group().register(channel);
这个config().group()其实就是一开始传进来的bossGroup,看看register()方法
这里最终会调用SingleThreadEventLoop的register()方法
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
最后调用AbstractUnsafe这个类
//注册
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 判断EventLoop不能为空
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
// 判断是否已经注册过
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 赋值
// 对于NioServerSocketChannel来说,这个才给eventLoop赋值
// 指定这个channel对应的eventLoop
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 这里就会开启一个新的线程
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 接着继续看这里
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
register0()
//注册
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// 第一次注册
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
// 已经注册
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 这一步,会将之前addLast,但是没有添加进来的Handler,重新添加进来
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
// 回调fireChannelRegistered() Channel已经被注册
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
// 服务器刚开始启动时不会进来
if (isActive()) {
// 如果是第一次注册
if (firstRegistration) {
// 回调
// 活跃状态
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
上面的操作时注册Channel,这个时候会去启动一个线程,这里的eventLoop是NioEventLoopGroup,这里执行execute()方法时,回去调用NioEventLoop的run()方法
/**
* 线程启动的时候会执行这个方法
*/
@Override
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// 处理SelectedKeys
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// 运行所有任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
// 处理SelectedKeys
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
这里的run()方法先不细说,回头再看看。
run()方法主要做了3件事情:
-
轮询注册到selector的所有的channel的I/O事件
select();
-
处理产生网络I/O事件的channel
processSelectedKeys();
-
运行所有任务
runAllTasks();
到这里,我们就分析玩了initAndRegister()这个方法,然后现在继续返回看doBind()方法吧。
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化并且注册
final ChannelFuture regFuture = initAndRegister();
// 拿到Channel
final Channel channel = regFuture.channel();
// cause()不为空,表示出错l
if (regFuture.cause() != null) {
return regFuture;
}
// 已经成功
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
接着看doBind0()方法
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
//
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
//
if (regFuture.isSuccess()) {
// 绑定
// 添加监听
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
最终会调用AbstractUnsafe类的bind()方法,其实所有的网络I/O操作最终都是由Unsafe类去完成的。
// 绑定
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
// 激活状态
boolean wasActive = isActive();
try {
// 绑定处理
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
接着NioServerSocketChannel的doBind()方法。
上面还有一步操作pipeline.fireChannelActive(),通知channel处于激活状态,
跟踪进去。
DefaultChannelPipeline的fireChannelActive()
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}
DefaultChannelPipeline的channelActive()
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
跟踪进来这么久,发现了一个比较有用的东西
readIfIsAutoRead();
那就接着进去看看呗
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
默认是自动读的,又看到channel去调用网络I/O操作,那最终还是交给UnSafe类去完成的
果不其然,最终调用了DefaultChannelPipeline的read()方法
然后是AbstractUnSafe类的beginRead()方法
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
// 捕获异常
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
然后再调用AbsrtactNioChannel的doBeginRead()方法
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// 等于0 说明没有注册读事件
if ((interestOps & readInterestOp) == 0) {
// 监听网络事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
还记得readInterestOp吗?在最开始的时候,创建NioServerSocketChannel实例的时候,传进去了SelectionKey.OP_ACCEPT,所以这个时候channel就会监听客户端的连接事件了。
selectionKey.interestOps(interestOps | readInterestOp);
总结一下上面的分析,上面的分析涵盖了原生NIO的使用,创建ServerSocketChannel,绑定端口,注册监听事件,轮询I/O事件,处理I/O事件等等。
记得上面提到的ServerBootstrapAcceptor类吗?
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这里的Channel是NioSocketChannel
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// childGroup是workerGroup
// 这里会把channel交给workerGroup处理
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
呀呀呀呀,感觉讲得不是很好,还有一些细节的东西没有讲到。
有时候看懂代码了,但是讲不清楚,但是需要努力呀。
网友评论