1、Reactor模式:NIO网络框架的典型模式
Reactor是网络编程中的一种设计模式,reactor会解耦并发请求的服务并分发给对应的事件处理器来处理。目前,许多流行的开源框架都用到了reactor模式,如:netty、node.js、Cindy等,包括java的nio。
何为Reactor线程模型?
Reactor模式是事件驱动的,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler
data:image/s3,"s3://crabby-images/36751/36751e34b1f341713832e527cfb095ce3a6a7977" alt=""
Reactor模式的三种形式
1、单线程Reactor 模式:
data:image/s3,"s3://crabby-images/d27e6/d27e69eea004d3623863da1fe6ba1c875b48672b" alt=""
这种实现方式,和第一章java NIO中单线程NIO实现是一样的,一个Reactor处理所有的事情。
data:image/s3,"s3://crabby-images/6b477/6b477549e87055ac07015b8c15891dd3bc973436" alt=""
2、多线程 Reactor 模式:
编解码及业务处理使用线程池,这样的话,可以避免IO阻塞(IO阻塞的代价是非常大的)。
data:image/s3,"s3://crabby-images/4ef57/4ef573f4db09fe1cef787002588e84f537eebcb3" alt=""
data:image/s3,"s3://crabby-images/e38f8/e38f8ae165b268f7606978cb08f1969d3dd4bf84" alt=""
3、多Reactors 模式:
把Reactor分为两个,一个负责接收,一个负责读写,业务处理还是用线程池(也可以选择不用线程池,这个看具体业务需求)
data:image/s3,"s3://crabby-images/763cb/763cb26ebad5eebde6a6654277d752910d34a40f" alt=""
data:image/s3,"s3://crabby-images/afc71/afc71608fc946435240361de101ff65851f40493" alt=""
2、Netty中如何使用Reactor模式
- 单线程Reactor 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,bossGroup )
- 多线程 Reactor 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,bossGroup)
//Handler使用线程池进行处理
- 多Reactors 模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup ,workerGroup )
注:new NioEventLoopGroup()默认创建cpu核数*2的线程数
3、Netty EventLoop源码解析
1、NioEventLoopGroup整体结构
data:image/s3,"s3://crabby-images/85e21/85e21032fa53294b994ff5bd9e2f390a9f567e48" alt=""
EventExecutorGroup视图
data:image/s3,"s3://crabby-images/f895b/f895bd73b0cc106fce00a71460b338423bf04bb7" alt=""
new NioEventLoopGroup源码
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//EventExecutorGroup里面有一个EventExecutor数组,保存了多个EventExecutor;
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//初始化EventExecutor数组,数组是NioEventLoop,见下面
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//EventExecutorChooser.next()定义选择EventExecutor的策略;
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
@Override
public EventExecutor next() {
return chooser.next();
}
NioEventLoopGroup.class
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NioEventLoop.class
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
//创建selector
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
- EventExecutorGroup里面有一个EventExecutor数组,保存了多个EventExecutor(NIOEventLoop);
- EventExecutorGroup是不干什么事情的,当收到一个请后,他就调用next()获得一个它里面的EventExecutor,再调用这个executor的方法;
- EventExecutorChooserFactory.EventExecutorChooser.next()定义选择EventExecutor的策略(有两种,都是轮询);
2、NioEventLoopGroup创建分析
bossGroup
data:image/s3,"s3://crabby-images/28ec4/28ec45cc0167adb64128cc0e83bf37a724c47fa1" alt=""
workerGrop
data:image/s3,"s3://crabby-images/3f72a/3f72ab42752e2bb6c028dfa57c90736cc1a9e83d" alt=""
3、ServerBootstrap启动流程分析
data:image/s3,"s3://crabby-images/c3512/c35129e3018532af867d2dfcb61f90ec1ecef756" alt=""
4、ServerBootstrap执行流程分析
data:image/s3,"s3://crabby-images/e03e8/e03e843806355fb4c1de127a80db5e5d2801c422" alt=""
// 配置服务端的NIO线程组
// 主线程组, 用于接受客户端的连接,但是不做任何具体业务处理,像老板一样,
//负责接待客户,不具体服务客户
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 工作线程组, 老板线程组会把任务丢给他,让手下线程组去做任务,服务客户
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
//欲加到NioServerSocketChannel Pipeline的handler
.handler(new LoggingHandler(LogLevel.INFO))
//欲加到NioSocketChannel(accept()返回的)Pipeline的handler
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new EchoServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(port).sync(); // (7)
data:image/s3,"s3://crabby-images/106fb/106fbc885159c84fe5b814a8365b1f996bac98c7" alt=""
网友评论