图解IO模型——BIO,NIO,AIO
服务架构模式——TPC/PPC模式
服务架构模式——单Reactor模式
服务架构模式——多Reactor模式
服务架构模式——Proactor模式
随着连接数量和请求数量增加,一个Reactor处理不过来时就需要用多Reactor模式,采用一个main reactor + n个sub reactor进行事件分发,main reactor负责分发连接事件,sub reactor负责分发IO事件
main reactor一直在监听连接事件,如果有连接建立,main reactor通过 accept 方法获取已连接的SocketChannel,然后按照一定的算法选取一个sub reactor,并把SocketChannel交给选定的sub reactor处理。
Main Reactor
main reactor只监听连接事件,如果收到连接事件就交给Acceptor处理
public class MainReacator implements Runnable{
public static void main(String[] args) {
new Thread(new MainReacator()).start();
}
@Override
public void run() {
//初始化sub reactor
SubReactor[] subReactors = new SubReactor[5];
try {
for (int i = 0; i < subReactors.length; i++) {
subReactors[i] = new SubReactor();
new Thread(subReactors[i]).start();
}
}
catch (Exception ex){
ex.printStackTrace();
}
try (Selector serverSelector = Selector.open()){ //创建选择器
/*-------------------创建server Channel,监听端口-------------**/
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.configureBlocking(false);
serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), Constant.port), 100); //绑定监听端口,设定最大连接缓冲数为100
/*-------------------注册感兴趣的TCP连接事件-------------**/
Acceptor acceptor = new Acceptor(subReactors, serverSocket);
serverSocket.register(serverSelector, SelectionKey.OP_ACCEPT, acceptor);
/*-------------------serverSelector监听连接事件,处理连接请求-------------**/
while (true){
serverSelector.select(); //阻塞事件
Set<SelectionKey> selected = serverSelector.selectedKeys();
Iterator<SelectionKey> itor = selected.iterator();
while (itor.hasNext()){
SelectionKey key = itor.next();
itor.remove();
try{
Acceptor task = (Acceptor) key.attachment();
if (null != task){
task.handle(); //同步调用
}
}catch (Exception ex){
ex.printStackTrace();
key.cancel(); //一些异常状况会导致死循环,需要cancel掉监听
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Acceptor
Acceptor按一定的算法(这里采用轮询方式)选取sub reactor, 把新建立的SocketChannel交给sub reactor处理
public class MultiAcceptor implements Handler {
private SubReactor[] subReactors;
private ServerSocketChannel serverSocket;
private int next=0;
public MultiAcceptor(SubReactor[] subReactors, ServerSocketChannel serverSocket) {
this.subReactors = subReactors;
this.serverSocket = serverSocket;
}
@Override
public void handle() {
try {
SocketChannel socketChannel = serverSocket.accept();
socketChannel.configureBlocking(false);
System.out.println("recvd connection:" + socketChannel.getRemoteAddress());
new SubReactorRequestHanlder(subReactors[next], socketChannel);
next = (++next) % subReactors.length;
}
catch (Exception ex){
ex.printStackTrace();
}
}
}
sub reactor
sub reactor只监听read,write事件,处理逻辑和 服务架构模式——Reactor模式中的reactor是一样的
网友评论