一、单线程Reactor线程模型
1.单线程Reactor线程模型:新连接的接入、数据的读写都是用一个线程:
public class SingleReactor implements Runnable{
//单线程reactor 模型:1.一个线程绑定一个selector 和一个serverSocketChannel
ServerSocketChannel serverSocketChannel;
Selector selector;
//OPEN&注册accepet
public void open() throws IOException{
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
//设置非阻塞
serverSocketChannel.configureBlocking(false);
//绑定ip&端口
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080));
//将channel注册到selector并拿到选择键(channel注册的标识)
SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
register.attach(new AcceptHandle());
}
//轮训注册的事件&分发
@Override
public void run() {
try {
open();
} catch (IOException e1) {
e1.printStackTrace();
}
try {
//循环查询感兴趣的时间
while (!Thread.interrupted()) {
try {
//1.阻塞去查
selector.select();
//2.拿到查询结果(注册的标识)
Set<SelectionKey> keys = selector.selectedKeys();
//3.迭代感兴趣的key
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
dispatch(next);
}
keys.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
//分发&拿到绑定的处理器
void dispatch(SelectionKey next){
//1.拿出SelectionKey
Runnable attachment = (Runnable) next.attachment();
//2.调用handle处理器
if (attachment != null) {
attachment.run();
}
}
//接受连接处理器
//连接处理器&为新连接创造一个输入输出的Handle处理器
class AcceptHandle implements Runnable{
@Override
public void run() {
try {
//1.接受新连接&调用下一个处理器注册读取事件
SocketChannel accept = serverSocketChannel.accept();
new IOEchoHandler(selector,accept);
} catch (IOException e) {
e.printStackTrace();
}
}
}
//处理具体的IO时间处理器
class IOEchoHandler implements Runnable{
SocketChannel channel;//读写channel
SelectionKey register;//channel注册结果返回的标识
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
//读写数据的缓存区
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
IOEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);//设置阻塞
//仅仅取得选择键,后设置感兴趣的IO事件
register = channel.register(selector, 0);
//将Handler作为选择键的附件
register.attach(this);
//第二步,注册Read就绪事件
register.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
@Override
public void run() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
register.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {//一开始是接受情况
//从通道读到byteBuffer
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
System.out.println(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册write就绪事件
register.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new Thread(new SingleReactor()).start();
}
}
二、多线程Reactor线程模型
1.多线程Reactor线程模型:一个线程用于新连接的接入、数据的读写用一个线程池:
class MultiThreadEchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
//引入线程池
static ExecutorService pool = Executors.newFixedThreadPool(4);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
//仅仅取得选择键,后设置感兴趣的IO事件
sk = channel.register(selector, 0);
//将本Handler作为sk选择键的附件,方便事件dispatch
sk.attach(this);
//向sk选择键注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
//线程处理handle
public void run() {
//异步任务,在独立的线程池中执行
pool.execute(new AsyncTask());
}
//异步任务,不在Reactor线程中执行
public synchronized void asyncRun() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
//从通道读
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
System.out.println(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
//读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
//异步任务的内部类
class AsyncTask implements Runnable {
public void run() {
MultiThreadEchoHandler.this.asyncRun();
}
}
}
三、主从线程Reactor线程模型
1.主从线程Reactor线程模型:一个连接池用于新连接的接入、数据的读写用另一个线程池:
class MultiThreadEchoServerReactor {
ServerSocketChannel serverSocket;
AtomicInteger next = new AtomicInteger(0);
//selectors集合,引入多个selector选择器
Selector[] selectors = new Selector[2];
//引入多个子反应器
SubReactor[] subReactors = null;
MultiThreadEchoServerReactor() throws IOException {
//初始化多个selector选择器
selectors[0] = Selector.open();
selectors[1] = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress("127.0.0.1",8080);
serverSocket.socket().bind(address);
//非阻塞
serverSocket.configureBlocking(false);
//第一个selector,负责监控新连接事件
SelectionKey sk = serverSocket.register(selectors[0], SelectionKey.OP_ACCEPT);
//附加新连接处理handler处理器到SelectionKey(选择键)
sk.attach(new AcceptorHandler());
//构建两个反应器
//第一个子反应器,一子反应器负责一个选择器
SubReactor subReactor1 = new SubReactor(selectors[0]);
//第二个子反应器,一子反应器负责一个选择器
SubReactor subReactor2 = new SubReactor(selectors[1]);
subReactors = new SubReactor[]{subReactor1, subReactor2};
}
//开启两个反应器线程
private void startService() {
// 一子反应器对应一条线程
new Thread(subReactors[0]).start();
new Thread(subReactors[1]).start();
}
//反应器&分发
class SubReactor implements Runnable {
//每条线程负责一个选择器的查询
final Selector selector;
public SubReactor(Selector selector) {
this.selector = selector;
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> it = keySet.iterator();
while (it.hasNext()) {
//Reactor负责dispatch收到的事件
SelectionKey sk = it.next();
dispatch(sk);
}
keySet.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKey sk) {
Runnable handler = (Runnable) sk.attachment();
//调用之前attach绑定到选择键的handler处理器对象
if (handler != null) {
handler.run();
}
}
}
// Handler:新连接处理器
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
if (channel != null)
new MultiThreadEchoHandler(selectors[next.get()], channel);
} catch (IOException e) {
e.printStackTrace();
}
if (next.incrementAndGet() == selectors.length) {
next.set(0);
}
}
}
public static void main(String[] args) throws IOException {
MultiThreadEchoServerReactor server =
new MultiThreadEchoServerReactor();
server.startService();
}
}
网友评论