前言
之前聊过几种常见的 I/O 模型,不过要说起当红炸子鸡还得是 I/O 多路复用,Java 1.4 引入的 nio package 提供了对这一模式的支持,著名开源框架 Netty 更是这一领域的扛鼎之作。
Netty is an asynchronous event-driven network application framework for rapid
development of maintainable high performance protocol servers & clients.
Netty 官网介绍其是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。这里异步
暂且不说,事件驱动
可是大有文章,今天就聊聊这个吧。
Event Driven
一个典型的网络服务通常都有着如下基础结构:
-
read request
-
decode request
-
process service
-
encode reply
-
send reply
细究起来,每一步的性质和成本各不相同,这种不均衡会极大的影响吞吐量,分治策略通常是解决这一类型问题的最佳方案。我们可以把这其中的每一步都拆分成一个任务,每个任务都以非阻塞的方式运行。这里 I/O 事件充当了触发器的角色,想想前端火的一塌糊涂的响应式编程,不正是这样吗?
Reactor Pattern
基于这个设想,就诞生了 Reactor Pattern。Reactor Pattern 首先是事件驱动的,它有两个主要角色:
-
Reactor,响应 I/O 事件并分发给对应的 Handler
-
Handlers,非阻塞地执行相关操作
从简单到复杂,Reactor Pattern 也有很多版本:单线程、多线程以及主从结构。纯理论总是看得人头疼,我们就以一个 echo server 为例,一点点升级它来说明 Reactor Pattern 的各种版本好了。所谓 echo server,就是客户端发什么服务端就回什么,这里我们约定以#
字符标识一条完整的消息。
单线程
single-thread-reactor.png如上图所示,在单线程版本中, Reactor 和 Handlers 共享一个线程,因此连接的建立、数据的读取和处理以及发送响应均在同一个线程中完成。这个版本的好处是编码简单,弊端也是显而易见的,一旦某个环节是一个长时间执行的任务,所有的操作都会被阻塞;并且,单线程也无法发挥多核 CPU 的优势。
@Slf4j
public class SingleThreadedReactor implements Runnable {
private final int port;
public SingleThreadedReactor(int port) {
this.port = port;
}
@Override
public void run() {
Selector sel = null;
ServerSocketChannel ssc = null;
try {
// 开启Server端通道
ssc = ServerSocketChannel.open();
// 配置非阻塞
ssc.configureBlocking(false);
// 注册一个Acceptor来处理连接
sel = Selector.open();
// Acceptor作为attachment进行挂载
ssc.register(sel, SelectionKey.OP_ACCEPT, new Acceptor(sel, ssc));
// 绑定端口
ssc.bind(new InetSocketAddress(port));
log.info("Server启动成功,正在监听[{}]端口", port);
// 轮询
iAmListening(sel);
} catch (IOException ex) {
log.error("Server启动失败", ex);
} finally {
// close ssc/sel omitted...
}
}
private void iAmListening(Selector sel) {
// 轮询
while (!Thread.interrupted()) {
try {
int num = sel.select();
// 有事件则处理
if (num > 0) {
Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
// 逐个处理
while (iter.hasNext()) {
SelectionKey sk = iter.next();
// 派发事件
dispatch(sk);
// 处理过即移除,避免重复处理
iter.remove();
}
}
} catch (IOException ex) {
log.error("监听失败", ex);
}
}
}
private void dispatch(SelectionKey sk) {
// 事件处理程序都作为attachment挂载
// 在SelectionKey上,并且包装成Runnable
Object attachment = sk.attachment();
if (attachment instanceof Runnable) {
((Runnable) attachment).run();
}
}
}
SingleThreadedReactor#run()
由两大块构成,一是配置服务端的ServerSocketChannel
,这一步的核心是挂载Acceptor
(一个专门用来处理客户端接入的 Handler );二是轮询 I/O 事件,一旦有事件需要处理就进行派发。
@Slf4j
public class Acceptor implements Runnable {
private final Selector sel;
private final ServerSocketChannel ssc;
public Acceptor(Selector sel, ServerSocketChannel ssc) {
this.sel = sel;
this.ssc = ssc;
}
@Override
public void run() {
try {
// 获取与客户端通信的SocketChannel
SocketChannel sc = ssc.accept();
log.info("检测到新的客户端连接: {}", sc.getRemoteAddress());
// Handler同样是一个Runnable,用来
// 处理与客户端之间的 I/O 操作
new Handler(sc, sel);
} catch (IOException ex) {
log.error("处理连接失败", ex);
}
}
}
Acceptor
的职责是处理客户端的接入,一旦有新的客户端加入就建立起通道并将其注册到 Selector 上(通过 Handler 的构造函数)。
@Slf4j
public class Handler implements Runnable {
enum State {
READING,
SENDING
}
private final SocketChannel sc;
private final SelectionKey sk;
// 初始状态为可读
private State state = State.READING;
private final ByteBuffer input = ByteBuffer.allocate(256);
private final ByteBuffer output = ByteBuffer.allocate(256);
public Handler(SocketChannel sc, Selector sel) {
this.sc = sc;
try {
// 配置成非阻塞模式
sc.configureBlocking(false);
// 将自己也注册到Selector上,如此即可供Reactor派发
this.sk = sc.register(sel, SelectionKey.OP_READ, this);
// 注册完毕,让select()调用即刻返回,看看此时有没有事件要处理
sel.wakeup();
} catch (IOException ex) {
throw new RuntimeException("Unable to handler event", ex);
}
}
@Override
public void run() {
try {
// 根据状态做处理
if (state == State.READING) {
read();
} else {
write();
}
} catch (IOException ex) {
throw new RuntimeException("Unable to handler event", ex);
}
}
private void read() throws IOException {
log.info("正在读取客户端上送的信息");
// 读取数据
int oldPos = input.position();
sc.read(input);
int newPos = input.position();
// 以 # 作为结束符,表示一条完整的消息
for (int i = oldPos; i < newPos; i++) {
byte b = input.get(i);
// 已读到一条消息,进行处理
if (b == '#') {
process(i);
log.info("处理完毕,准备发送响应信息");
state = State.SENDING;
sk.interestOps(SelectionKey.OP_WRITE);
}
}
}
private void write() throws IOException {
output.flip();
sc.write(output);
output.clear();
state = State.READING;
sk.interestOps(SelectionKey.OP_READ);
}
private void process(int index) throws IOException {
byte[] bytes = new byte[index + 1];
input.flip();
input.get(bytes);
input.compact();
// # 删掉
String resp = new String(bytes, StandardCharsets.UTF_8)
.replace("#", "");
log.info("接受到客户端[{}]的信息: {}", sc.getRemoteAddress(), resp);
// 附加一点内容
output.put("Server says: ".getBytes(StandardCharsets.UTF_8));
// 其它信息原样返回
output.put(resp.getBytes(StandardCharsets.UTF_8));
}
}
Handler
负责处理 read -> decode -> process -> encode -> send 这一系列流程。在我们的例子里,只要没有读到#
字符,就会一直读下去,直到客户端发送#
字符告诉服务端一条完整的消息已经发送完了,服务端就着手进行处理并发送响应消息。
多线程
multi-thread-reactor.png多线程版本对 Handler 进行了优化,通过增加 worker threads 来处理非 I/O 操作,如此一来即使有耗时操作 Handler 也不会阻塞住 Reactor 了;同时多个 worker threads 也能更好地发挥多核 CPU 的优势。
@Slf4j
public class Handler implements Runnable {
enum State {
READING,
SENDING
}
private final SocketChannel sc;
private final SelectionKey sk;
// 初始状态为可读
private State state = State.READING;
private final ByteBuffer input = ByteBuffer.allocate(256);
private final ByteBuffer output = ByteBuffer.allocate(256);
// 线程池,用来处理所有入站事件
private final ExecutorService executor = new ThreadPoolExecutor(2,
8,
5, TimeUnit.MINUTES,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy());
public Handler(SocketChannel sc, Selector sel) {
this.sc = sc;
try {
// 配置成非阻塞模式
sc.configureBlocking(false);
// 将自己也注册到Selector上,如此即可供Reactor派发
this.sk = sc.register(sel, SelectionKey.OP_READ, this);
// 注册完毕,让select()调用即刻返回,看看此时有没有事件要处理
sel.wakeup();
} catch (IOException ex) {
throw new RuntimeException("Unable to handler event", ex);
}
}
@Override
public void run() {
try {
// 根据状态做处理
if (state == State.READING) {
// 读取数据
int oldPos = read();
// 提交到线程池,异步处理
executor.execute(() -> {
log.info("当前处理的线程: {}", Thread.currentThread().getName());
try {
process(oldPos);
} catch (IOException ex) {
throw new RuntimeException("Unable to read data", ex);
}
});
} else {
// decode -> process -> encode
// 以上流程已经在 state == State.READING 时处理好了,此时直接发送就可以了,不用异步
write();
}
} catch (IOException ex) {
throw new RuntimeException("Unable to handler event", ex);
}
}
private synchronized int read() throws IOException {
log.info("正在读取客户端上送的信息");
// 读取数据
int oldPos = input.position();
sc.read(input);
return oldPos;
}
private synchronized void write() throws IOException {
log.info("正在给客户端返回响应信息");
output.flip();
sc.write(output);
output.clear();
state = State.READING;
sk.interestOps(SelectionKey.OP_READ);
sk.selector().wakeup();
}
private synchronized void process(int oldPos) throws IOException {
// 计算读取的长度
int newPos = input.position();
// 以 # 作为结束符,表示一条完整的消息
for (int i = oldPos; i < newPos; i++) {
byte b = input.get(i);
// 已读到一条消息,进行处理
if (b == '#') {
byte[] bytes = new byte[i + 1];
input.flip();
input.get(bytes);
input.compact();
// # 删掉
String resp = new String(bytes, StandardCharsets.UTF_8)
.replace("#", "");
log.info("接受到客户端[{}]的信息: {}", sc.getRemoteAddress(), resp);
// 附加一点内容
output.put("Server says: ".getBytes(StandardCharsets.UTF_8));
// 其它信息原样返回
output.put(resp.getBytes(StandardCharsets.UTF_8));
log.info("处理完毕,准备发送响应信息");
state = State.SENDING;
sk.interestOps(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}
}
}
多线程版本的Reactor
和Acceptor
和单线程版本是一样的,区别只在 Handler 中引入了线程池,这些 worker threads 包办了流程中的 decode -> process -> encode。
主从结构
master-slave.png主从结构引入了多个 Reactor,将客户端接入和 socket 读、写进行了解耦:MainReactor 负责处理客户端的接入,而 SubReactor(s) 则负责在发生读、写事件时进行数据处理。
@Slf4j
public class MainReactor implements Runnable {
private final int port;
// 子Reactor数量
private final int numberOfSubReactors;
public MainReactor(int port, int numberOfSubReactors) {
this.port = port;
this.numberOfSubReactors = numberOfSubReactors;
}
@Override
public void run() {
Selector sel = null;
ServerSocketChannel ssc = null;
try {
// 开启Server端通道
ssc = ServerSocketChannel.open();
// 配置非阻塞
ssc.configureBlocking(false);
// 注册一个Acceptor来处理连接
sel = Selector.open();
// Acceptor作为attachment进行挂载
ssc.register(sel, SelectionKey.OP_ACCEPT, new Acceptor(ssc, numberOfSubReactors));
// 绑定端口
ssc.bind(new InetSocketAddress(port));
log.info("Server启动成功,正在监听[{}]端口", port);
// 轮询
iAmListening(sel);
} catch (IOException ex) {
log.error("Server启动失败", ex);
} finally {
// close ssc/sel omitted...
}
}
private void iAmListening(Selector sel) {
// 轮询
while (!Thread.interrupted()) {
try {
int num = sel.select();
// 有事件则处理
if (num > 0) {
Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
// 逐个处理
while (iter.hasNext()) {
SelectionKey sk = iter.next();
// 派发事件
dispatch(sk);
// 处理过即移除,避免重复处理
iter.remove();
}
}
} catch (IOException ex) {
log.error("监听失败", ex);
}
}
}
private void dispatch(SelectionKey sk) {
// 事件处理程序都作为attachment挂载
// 在SelectionKey上,并且包装成Runnable
Object attachment = sk.attachment();
if (attachment instanceof Runnable) {
((Runnable) attachment).run();
}
}
}
MainReactor#run()
和多线程版本的大体相同,区别主要在Acceptor
。
@Slf4j
public class Acceptor implements Runnable {
private int next = 0;
private final Selector[] selectors;
private final ServerSocketChannel ssc;
public Acceptor(ServerSocketChannel ssc, int numberOfSubReactors) throws IOException {
this.ssc = ssc;
// 每个SubReactor持有一个Selector,互不干扰,避免了潜在的线程同步需求
Selector[] selectors = new Selector[numberOfSubReactors];
SubReactor[] subReactors = new SubReactor[numberOfSubReactors];
for (int i = 0; i < numberOfSubReactors; i++) {
Selector sel = Selector.open();
selectors[i] = sel;
subReactors[i] = new SubReactor(sel);
// 启动一个独立的线程处理SubReactor
// SubReactor不处理连接请求,只处理OP_READ/OP_WRITE
Thread thread = new Thread(subReactors[i]);
thread.setName("SubReactor-thread-" + i);
thread.start();
}
this.selectors = selectors;
}
@Override
public void run() {
try {
// 获取与客户端通信的SocketChannel
SocketChannel sc = ssc.accept();
log.info("检测到新的客户端连接: {},当前线程: {}", sc.getRemoteAddress(), Thread.currentThread().getName());
// 通过简单的轮询算法,给每一个SocketChannel分配一个Selector
// Selector关联了SubReactor,而SubReactor又关联了一个独立的
// 线程,这样每个SocketChannel上的发生的事件就有对应的线程来处理了
new Handler(sc, selectors[next]);
if (++next == selectors.length) next = 0;
} catch (IOException ex) {
log.error("处理连接失败", ex);
}
}
}
主从结构的Acceptor
就略微有点复杂了,它的主要工作有二:其一是初始化多个 SubReactor,每个 SubReactor 都绑定一个 Selector,并且运行在独立的线程中;其二在建立新的通道时,将其分配给某一个 SubReactor。这样做的好处在于分摊了负载,更好地均衡了 CPU 和 I/O 速率。
@Slf4j
public class SubReactor implements Runnable {
private final Selector sel;
public SubReactor(Selector sel) {
this.sel = sel;
}
@Override
public void run() {
// 轮询
while (!Thread.interrupted()) {
try {
int num = sel.select(1000);
// 有事件则处理
if (num > 0) {
Iterator<SelectionKey> iter = sel.selectedKeys().iterator();
// 逐个处理
while (iter.hasNext()) {
SelectionKey sk = iter.next();
// 派发事件
dispatch(sk);
// 处理过即移除,避免重复处理
iter.remove();
}
}
} catch (IOException ex) {
log.error("监听失败", ex);
}
}
}
private void dispatch(SelectionKey sk) {
if (!sk.isValid()) return;
String ops = "";
if (sk.isReadable()) {
ops = "有新数据可读";
}
if (sk.isWritable()) {
if (ops.length() != 0) {
ops = ops + ","+ "有数据可写";
} else {
ops = "有数据可写";
}
}
log.info("检测到新的事件: {},当前线程: {}", ops, Thread.currentThread().getName());
// 事件处理程序都作为attachment挂载
// 在SelectionKey上,并且包装成Runnable
Object attachment = sk.attachment();
if (attachment instanceof Runnable) {
((Runnable) attachment).run();
}
}
}
SubReactor 和 MainReactor 类似,它也维护了一个事件循环,用来派发 socket 读、写事件。Handler 和多线程版本保持一致,同样使用了线程池来处理非 I/O 任务,完整的示例看这里。
Netty 对 Reactor Pattern 的支持
Netty 对 Reactor Pattern 的几个变种都有支持,简单配置一下 ServerBootstrap 即可实现。
Version | Implementation |
---|---|
单线程 | NioEventLoopGroup group = new NioEventLoopGroup(1); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group); |
多线程 | NioEventLoopGroup group = new NioEventLoopGroup(3); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group); |
主从结构 | NioEventLoopGroup bossGroup = new NioEventLoopGroup(2); NioEventLoopGroup workerGroup = new NioEventLoopGroup(4); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup); |
网友评论