源码分析基于netty 4
接口结构
evenloop
先看看evenloop的接口
evenloop.png
除了jdk原生的ScheduledExecutorService/ExecutorService/Executor接口, netty主要实现了两大接口体系:EventExecutor, EventLoop
EventExecutor继承了ScheduledExecutorService,负责处理netty产生的task,它扩展了submit, inEventLoop等方法
EventLoop是核心接口,loop表示循环,一旦一个Channel 注册到loop, 它将处理Channel所以的I/O操作。
以grop结尾的接口,通过next方法返回一个实际操作的接口
先简单看看evenloop的关键实现
SingleThreadEventExecutor是EventExecutor的一个简单的实现类,在一个线程中执行所有提交的task。
SingleThreadEventExecutor.execute将执行一个task
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
...
}
inEventLoop
判断当前线程是否为loop的线程
如果inEventLoop,直接把task添加到队列中
否则先调用startThread, 该方法如果检测到当前loop线程没启动,会启动loop线程
SingleThreadEventExecutor定义了一个关键的抽象方法run(), 该run方法负责进行loop循环, 处理各种事件/task, NioEventLoop主要的逻辑就在run方法中
Channel
在看看Channel的接口结构
channel.png
图中没有展示,channel中还定义了一个重要的接口Unsafe
Unsafe定义了一些不提供给用户调用的方法, 这些方法实现了实际的网络运输操作
AbstractNioChannel中也定义了NioUnsafe接口, 扩展了一些方法
AbstractNioChannel分为两个不同的体系, AbstractNioByteChannel和 AbstractNioMessageChannel
AbstractNioByteChannel 负责底层的字节操作
可以看一下AbstractNioByteChannel.NioMessageUnsafe 中的read, doWrite
AbstractNioMessageChannel中的read方法, 通过模板方法doReadMessages(readBuf)
将缓存区交给子类处理
实例
一个netty实现的tcp server端小栗子
EventLoopGroup parentGroup = new NioEventLoopGroup(1);
EventLoopGroup childGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
public void initChannel(SocketChannel ch) throws Exception {
...
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
代码不完整, 但已经足够描述问题
源码分析
Reactor
注意上面栗子创建了两个EventLoopGroup, 他们的责任不同, 注意观察他们的工作
Netty基于Multiple Reactors模式。Mutilple Reactors模式有多个reactor:mainReactor和subReactor,其中mainReactor负责客户端的连接请求,并将请求转交给subReactor,后由subReactor负责相应通道的IO请求,非IO请求(具体逻辑处理)的task则会直接写入队列,等待worker threads进行处理。
Multi-reactors3.png.jpeg
Netty的线程模型基于Multiple Reactors模式,借用了mainReactor和subReactor的结构,但它并没有Thread Pool。Netty的subReactor与worker thread是同一个线程。
parentGroup就是mainReactor。
childGroup就是subReactor + Thread Pool。
childGroup聚合在ServerBootstrap.childGroup中
parentGroup聚合在ServerBootstrap.group中
AbstractBootstrap.doBind
final ChannelFuture regFuture = initAndRegister();
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
...
}
initAndRegister
initAndRegister负责初始化channle, 并注册到loop中
final ChannelFuture initAndRegister() {
Channel channel = null;
channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
}
AbstractBootstrap.channelFactory为ReflectiveChannelFactory对象,他通过反射生成NioServerSocketChannel对象(NioServerSocketChannel对象由ServerBootstrap.channel(NioServerSocketChannel.class)配置)
AbstractNioChannel中有一个int readInterestOp
属性, 会影响到channle注册的事件
注意一下NioServerSocketChannel的构造方法:
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
这里的readInterestOp是OP_ACCEPT,
初始化
init(channel)
会调用到子类ServerBootstrap
void init(Channel channel) throws Exception {
// 初始化options和attr
...
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// ch.eventLoop()是parentGroup
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
这里给ChannelPipeline添加了ChannelInitializer,ChannelInitializer是一个特殊的handler, 它被添加后就会执行initChannel方法,所以这里工作 主要是给ChannelPipeline添加了ServerBootstrapAcceptor, 注意, ServerBootstrapAcceptor构建参数中的currentChildGroup是childGroup
注册
在看看config().group()中的注册
config().group().register(channel)
注意:config().group()返回的是parent EventLoopGroup
这里会调用的SingleThreadEventLoop.register
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
unsafe()将返回AbstractUnsafe, 看看它的register方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
private void register0(ChannelPromise promise) {
doRegister();
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise); // 设置promise完成状态
...
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
}
inEventLoop和execute前面已经说过了
这里将启动parentGroup了
doRegister调用到子类AbstractNioChannel.doRegister
protected void doRegister() throws Exception {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
}
AbstractChannel中聚合了一个eventloop, 这里是parent EventLoopGroup
AbstractNioChannel还聚合了java.nio.channels.SelectableChannel, javaChannel()正是返回该对象
NioEventLoop中聚合了java.nio.channels.Selector, eventLoop().selector正是该对象
这里实现了jdk原生的channle注册到selector, 并将this作为att, 但还没有注册关注的事件
注册事件是在beginRead中实现, 该方法将调用到AbstractNioChannel.doBeginRead
protected void doBeginRead() throws Exception {
...
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
selectionKey.interestOps()获取已注册的事件, 当前是0, readInterestOp在NioServerSocketChannel创建时已经被赋值为SelectionKey.OP_ACCEPT, 这里终于注册了ACCEPT事件。
绑定端口
启动的最后一步,就是绑定端口,在AbstractBootstrap.doBind0中实现
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
channel.bind
会依次调用DefaultChannelPipeline.bind
, TailContext.bind
, HeadContext.bind
, AbstractUnsafe.bind
,NioServerSocketChannel.doBind
(跳转过程后面会讲):
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
到此,启动完成。
关于Reactor模式, 推荐看看Doug Lea大神的《Scalable IO in Java》
网友评论