概述
Reactor反应器模式在高性能网络编程中非常重要,高性能web服务器Nginx、高性能通信框架Netty都是基于反应器模式的。
反应器模式是高性能网络编程的必知、必会的模式。
Reactor 线程模型
反应器模式由Reactor反应器线程、Handlers处理器两大角色组成
- Reactor 反应器
负责查询IO事件,当检测到一个IO事件,将其发送给相应的Handler处理器去处理。这里的IO事件,就是NIO中选择器监控的通道IO事件 - Handlers 处理器
与IO事件(或者选择键)绑定,负责IO事件的处理。完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等
单线程Reactor反应器
单线程Reactor反应器模式,即Reactor反应器和Handers处理器处于一个线程中执行。它是最简单的反应器模式,处理图如下

在反应器模式中,需要用到SelectionKey中的attach和attachment方法,attach和attachment结合使用:在选择键注册完成之后,调用attach方法,将Handler处理器绑定到选择键;当事件发生时,调用attachment方法,可以从选择键取出Handler处理器,将事件分发到Handler处理器中,完成业务处理。
通过示例加深一下印象,创建ReactorDemo.java
package com.zhxin.nettylab.reactor.chapter1;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* @ClassName ReactorDemo
* @Description //单线程Reactor反应器模式 Demo
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/4 0004 下午 3:34
**/
public class ReactorDemo implements Runnable{
private static Selector selector;
private static ServerSocketChannel serverSocketChannel;
ReactorDemo() throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open(); //打开ServerSocketChannel,获取通道
serverSocketChannel.configureBlocking(false); //设为非阻塞
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8989)); //将该通道对应的serverSocket绑定到port端口
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//将通道注册到选择器上,监听"接收连接"事件
sk.attach(new AcceptHandler());
}
public void run(){
try{
// 选择器轮询
while (! Thread.interrupted()){
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while(keyIterator.hasNext()){
SelectionKey key = keyIterator.next();
dispatch(key);
selectionKeys.clear();
}
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 反应器的分发
* */
private void dispatch(SelectionKey key){
Runnable rKey = (Runnable) key.attachment();
if(rKey != null){
rKey.run();
}
}
/**
* 新连接 处理器
* */
class AcceptHandler implements Runnable{
public void run(){
System.out.println("开始 新连接 处理");
try{
//接受新连接
SocketChannel socket = serverSocketChannel.accept();
if(socket != null){
// 为新连接创建一个输入输出的Handler处理器
new IOHandler(selector,socket);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
}
创建IOHandler.java
package com.zhxin.nettylab.reactor.chapter1;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
/**
* @ClassName IOHandler
* @Description //输入输出处理器
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/4 0004 下午 4:05
**/
public class IOHandler implements Runnable {
private final static int MAX_IN = 1024;
private final static int MAX_OUT = 1024;
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAX_IN);
ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
static final int READING = 0, SENDING = 1;
int state = READING;
IOHandler(Selector sel, SocketChannel c)
throws IOException {
socket = c;
c.configureBlocking(false);
// Optionally try first read now
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ return true;}
boolean outputIsComplete() { /* ... */ return true;}
void process() { /* ... */ }
public void run(){
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) sk.cancel();
}
}
上边代码中的AcceptorHandler处理器为内部类,在注册serverSocket服务监听连接的接受事件之后,创建一个AcceptorHandler新连接处理器的实例,作为附件,被设置(attach)到了SelectionKey中,它主要作用一是接受新连接,二是在为新连接创建一个输入输出的Handler处理器
当监听到新连接事件发生后,会通过选择器轮询事件分发,取出了之前attach到SelectionKey中的Handler业务处理器,进行socket的各种IO处理,为读写操作创建IOHandler。
IOHandler,顾名思义,就是负责socket的数据输入、业务处理、结果输出。
※代码参考:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
小结
在高性能服务器应用场景中,单线程反应器模式实际使用的很少,这里做个了解,熟悉一下场景即可。
多线程的Reactor反应器模式
多线程Reactor反应器的演进,分为两个方面:
- 首先是升级Handler处理器。既要使用多线程,又要尽可能的高效率,则可以考虑使用线程池。
- 其次是升级Reactor反应器。可以考虑引入多个Selector选择器,提升选择大量通道的能力。
多线程Reactor反应器模式,通过示例代码加深一下印象,创建MultiThreadEchoServerReactor.java
package com.zhxin.nettylab.reactor.chapter2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @ClassName MultiThreadEchoServerReactor
* @Description // 多线程Reactor反应器 demo
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/4 0004 下午 4:25
**/
public class MultiThreadEchoServerReactor {
private ServerSocketChannel serverSocket;
private AtomicInteger next = new AtomicInteger(0);
//selectors集合,引入多个selector选择器
Selector[] selectors = new Selector[2];
//引入多个子反应器
private SubReactor[] subReactors = null;
private MultiThreadEchoServerReactor() throws IOException {
//初始化多个选择器
selectors[0] = Selector.open();
selectors[1] = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(8989);
serverSocket.socket().bind(address);
serverSocket.configureBlocking(false);//非阻塞
//让第一个选择器负责监听 新连接事件
SelectionKey sk = serverSocket.register(selectors[0],SelectionKey.OP_ACCEPT);
//为选择键绑定handler
sk.attach(new AcceptHandler());
//一个子选择器负责一个反应器
SubReactor subReactor1 = new SubReactor(selectors[0]);
SubReactor subReactor2 = new SubReactor(selectors[1]);
subReactors = new SubReactor[]{subReactor1,subReactor2};
}
private void startService(){
//一个子选择器 对应一个线程
new Thread(subReactors[0]);
new Thread(subReactors[1]);
}
/**
* 新连接 处理器
* */
class AcceptHandler implements Runnable{
public void run(){
try{
//接受新连接,进行业务处理
SocketChannel socket = serverSocket.accept();
if(socket != null){
// MultiThreadEchoHandler
new MultiThreadEchoHandler(selectors[next.get()],socket);
}
if(next.incrementAndGet() == selectors.length){
next.set(0);
}
}catch (Exception e){
/*...*/
}
}
}
/**
* 子反应器
* */
static class SubReactor implements Runnable{
//每子选择器对应的线程负责一个选择器的查询和选择
final Selector selector;
SubReactor(Selector selector){
this.selector = selector;
}
public void run(){
try{
while (!Thread.interrupted()){
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while(keyIterator.hasNext()){
SelectionKey key = keyIterator.next();
dispatch(key);
}
selectionKeys.clear();
}
}catch (Exception e){
/* ... */
}
}
/**
* 反应器的分发
* */
private void dispatch(SelectionKey key){
//调用之前绑定的Handler
Runnable rKey = (Runnable) key.attachment();
if(rKey != null){
rKey.run();
}
}
}
public static void main(String[] args) throws IOException {
MultiThreadEchoServerReactor server = new MultiThreadEchoServerReactor();
server.startService();
}
}
创建业务处理器MultiThreadEchoHandler.java
package com.zhxin.nettylab.reactor.chapter2;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @ClassName MultiThreadEchoHandler
* @Description //MultiThreadEchoHandler 新连接接收后的业务处理器
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/4 0004 下午 4:52
**/
public class MultiThreadEchoHandler implements Runnable {
private final SocketChannel channel;
private final SelectionKey sk;
private final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
private static final int RECIEVING = 0, SENDING = 1;
private int state = RECIEVING;
//引入线程池
private static ExecutorService pool = Executors.newFixedThreadPool(4);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
//仅仅取得选择键,后设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将本Handler作为sk选择键的附件,方便事件dispatch
sk.attach(this);
//向sk选择键注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
public void run(){
//异步任务,在独立的线程池中执行
pool.execute(new AsyncTask());
}
//异步任务,不在Reactor线程中执行
private synchronized void asyncRun() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
System.out.println(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
//异步任务的内部类
class AsyncTask implements Runnable {
public void run() {
MultiThreadEchoHandler.this.asyncRun();
}
}
}
创建客户端MultiEchoClient.java
package com.zhxin.nettylab.reactor.chapter2;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
* @ClassName MultiEchoClient
* @Description //多线程Reactor反应器 demo 客户端
* @Author singleZhang
* @Email 405780096@qq.com
* @Date 2020/12/4 0004 下午 5:07
**/
public class MultiEchoClient {
public void start() throws IOException {
InetSocketAddress address =
new InetSocketAddress("localhost", 8989);
SocketChannel socketChannel = SocketChannel.open(address);//获取通道(channel)
socketChannel.configureBlocking(false); //切换成非阻塞模式
//不断的自旋、等待连接完成,或者做一些其他的事情
while (!socketChannel.finishConnect()) {
}
System.out.println("客户端启动成功!");
//启动接受线程
Processer processer = new Processer(socketChannel);
new Thread(processer).start();
}
static class Processer implements Runnable {
final Selector selector;
final SocketChannel channel;
Processer(SocketChannel channel) throws IOException {
//Reactor初始化
selector = Selector.open();
this.channel = channel;
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
if (sk.isWritable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
System.out.println("请输入发送内容:");
if (scanner.hasNext()) {
SocketChannel socketChannel = (SocketChannel) sk.channel();
String next = scanner.next();
buffer.put(("now time:"+System.currentTimeMillis() + " >>" + next).getBytes());
buffer.flip();
// 操作三:通过DatagramChannel数据报通道发送数据
socketChannel.write(buffer);
buffer.clear();
}
}
if (sk.isReadable()) {
// 若选择键的IO事件是“可读”事件,读取数据
SocketChannel socketChannel = (SocketChannel) sk.channel();
//读取数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
while ((length = socketChannel.read(byteBuffer)) > 0) {
byteBuffer.flip();
System.out.println("server echo:" + new String(byteBuffer.array(), 0, length));
byteBuffer.clear();
}
}
//处理结束了, 这里不能关闭select key,需要重复使用
//selectionKey.cancel();
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new MultiEchoClient().start();
}
}
好了,以上就是多线程Reactor反应器模式 示例demo代码。
总结
反应器模式和生产者消费者模式对比
相似之处:在一定程度上,反应器模式有点类似生产者消费者模式。在生产者消费者模式中,一个或多个生产者将事件加入到一个队列中,一个或多个消费者主动地从这个队列中提取(Pull)事件来处理。不同之处在于:反应器模式是基于查询的,没有专门的队列去缓冲存储IO事件,查询到IO事件之后,反应器会根据不同IO选择键(事件)将其分发给对应的Handler处理器来处理。
反应器模式和观察者模式(Observer Pattern)对比
相似之处在于:在反应器模式中,当查询到IO事件后,服务处理程序使用单路/多路分发(Dispatch)策略,同步地分发这些IO事件。观察者模式(ObserverPattern)也被称作发布/订阅模式,它定义了一种依赖关系,让多个观察者同时监听某一个主题(Topic)。这个主题对象在状态发生变化时,会通知所有观察者,它们能够执行相应的处理。不同之处在于:在反应器模式中,Handler处理器实例和IO事件(选择键)的订阅关系,基本上是一个事件绑定到一个Handler处理器;每一个IO事件(选择键)被查询后,反应器会将事件分发给所绑定的Handler处理器;而在观察者模式中,同一个时刻,同一个主题可以被订阅过的多个观察者处理。
反应器模式的优点如下:
- 响应快,虽然同一反应器线程本身是同步的,但不会被单个连接的同步IO所阻塞;
- 编程相对简单,最大程度避免了复杂的多线程同步,也避免了多线程的各个进程之间切换的开销;
- 可扩展,可以方便地通过增加反应器线程的个数来充分利用CPU资源。
网友评论