20210814_Nio多线程模型时序图分析
1概述
1.6关于Nio多线程的时序图
1.6.1服务端初始化
从左到右,依次为:Main-->MyBossHaneler-->MyRefactor
[图片上传失败...(image-9715d-1628931332173)]
image-20210814164326402.png
1.6.2MyBossHandler
1.6.3MyReactorTaskRunnable
1.6.3.1轮询Boss事件1
[图片上传失败...(image-e5bc3c-1628931332173)]
image-20210814164506381.png
1.6.3.2轮询Work事件2
[图片上传失败...(image-e58fbc-1628931332173)]
image-20210814164528239.png
1.6.4MyWorkHandler
[图片上传失败...(image-ef36df-1628931332173)]
image-20210814164647998.png
2代码实战(多线程版本)
2.1maven依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
2.2配置
2.3服务端
2.3.1MultiThreadEchoServerReactor
package com.kikop.myreactor.multipthreadapp.server;
import com.kikop.myreactor.multipthreadapp.config.NioDemoConfig;
import com.kikop.myreactor.multipthreadapp.server.accept.MyBossHandler;
import com.kikop.myreactor.multipthreadapp.server.refactor.MyReactorTaskRunnable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
/**
* @author kikop
* @version 1.0
* @project Name: myrefactor_multipthreadapp
* @file Name: EchoServerReactor
* @desc 多线程反应器(一个Runnable任务)
* @date 2021/6/22
* @time 9:30
* @by IDE: IntelliJ IDEA
*/
public class MultiThreadEchoServerReactor {
private ServerSocketChannel serverSocketChannel;
// 2个 selector 选择器
private Selector[] selectors = new Selector[2];
// 2个子反应器线程,boss,work
private MyReactorTaskRunnable[] myReactorTaskRunnables = null;
public MultiThreadEchoServerReactor() throws IOException {
// 1.初始化多个selector选择器
selectors[0] = Selector.open(); //new WindowsSelectorProvider
selectors[1] = Selector.open();
// 2.创建 serverSocketChannel
serverSocketChannel = ServerSocketChannel.open();
// 2.1.开启服务端监听
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
serverSocketChannel.socket().bind(address);
// 2.2.设置非阻塞
serverSocketChannel.configureBlocking(false);
// 3.创建选择键
// 3.1.构建SelectionKey
// 通过第一个 selector,负责监控新连接OP_ACCEPT事件
// 对应通道:serverSocket
SelectionKey sk = serverSocketChannel.register(selectors[0], SelectionKey.OP_ACCEPT);
// 3.1.创建附件参数
MyBossHandler attachObject = new MyBossHandler(selectors, serverSocketChannel);
// 3.2.绑定附件参数:MyBossHandler
// 附加新连接处理 MyBossHandler 处理器到 SelectionKey(选择键)
sk.attach(attachObject);
// 4.构建连个反应器线程
// 4.1.第一个子反应器,一子反应器负责一个选择器
MyReactorTaskRunnable subReactor1 = new MyReactorTaskRunnable(selectors[0]);
// 4.2.第二个子反应器,一子反应器负责一个选择器
MyReactorTaskRunnable subReactor2 = new MyReactorTaskRunnable(selectors[1]);
myReactorTaskRunnables = new MyReactorTaskRunnable[]{subReactor1, subReactor2};
}
private void startService() {
System.out.println("服务端开始启动...");
// 2个线程,不断轮询、监听
new Thread(myReactorTaskRunnables[0]).start(); // as server.boss,负责:OP_ACCEPT和事件分发
new Thread(myReactorTaskRunnables[1]).start(); // as server.work,负载IO事件读写节和业务逻辑
System.out.println("服务端启动成功!");
}
public static void main(String[] args) throws IOException {
MultiThreadEchoServerReactor server =
new MultiThreadEchoServerReactor();
server.startService();
}
}
2.3.2MyBossHandler(accept连接建立,构建MyWorkHandler)
package com.kikop.myreactor.multipthreadapp.server.accept;
import com.kikop.myreactor.multipthreadapp.server.handler.MyWorkHandler;
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author kikop
* @version 1.0
* @project Name: mynettydemo
* @file Name: BossHandler
* @desc Boss连接处理器, 等同 于netty:bossNioEventLoop
* boosGroup用于Accetpt连接建立事件
* @date 2021/6/22
* @time 10:30
* @by IDE: IntelliJ IDEA
*/
public class BossHandler
implements Runnable {
AtomicInteger next = new AtomicInteger(0);
private Selector[] selectors;
private ServerSocketChannel serverSocket;
public BossHandler(Selector[] selectors, ServerSocketChannel serverSocket) {
this.selectors = selectors;
this.serverSocket = serverSocket;
}
/**
* 事件分发 BossHandler 对应的处理逻辑
* 构建 MyWorkHandler,如果是多线程,则会将此时的 SocketChannel 注册到另外一个 selector
*/
public void run() {
try {
SocketChannel channel = serverSocket.accept();
if (channel != null) {
// 将 channel 动态分配到 selector,通道IO事件就绪时,
// 由 selector 进行分发(MyReactorTaskRunnable不断轮询监听,调用对应的 handlerXXX.run)
// 顺序选择,每次连接对应的选择器
// 第一个 selector连接压力比较大:负责连接监听 ServerSocketChannel +业务处理 SocketChannel
// 第二个 selector 连接压力比较小:只负责业务处理 SocketChannel
new MyWorkHandler(selectors[next.get()], channel);
}
} catch (IOException e) {
e.printStackTrace();
}
if (next.incrementAndGet() == selectors.length) { // 只能是:0,1
next.set(0);
}
}
}
2.3.3MyReactorTaskRunnable(所有事件分发线程)
处理附件类型如下:
如果是 ServerSocketChannel(代表服务端),则 handler为 BossHandler
如果是 SocketChannel(代表连接的某个客户端网络通道),则 handler为 MyWorkHandler
package com.kikop.myreactor.multipthreadapp.server.handler;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.Set;
/**
* @author kikop
* @version 1.0
* @project Name: mynettydemo
* @file Name: MyReactorTask
* @desc boss,work统一的线程,不同的是通道绑定的附件不一样
* @date 2021/6/22
* @time 10:30
* @by IDE: IntelliJ IDEA
*/
public class MyReactorTaskRunnable implements Runnable {
// 每个线程负责一个选择器的查询
final Selector selector;
public MyReactorTaskRunnable(Selector selector) {
this.selector = selector;
}
public void run() {
try {
while (!Thread.interrupted()) { // 非阻塞等待
// 负载某个 seleector 等待就绪事件
selector.select();
// 有读写,获取待就绪事件
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> it = keySet.iterator();
while (it.hasNext()) {
// Reactor负责 dispatch收到的事件
SelectionKey sk = it.next();
dispatch(sk);
}
keySet.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
/**
* 获得就绪IO事件并完成事件分发
*
* @param sk
*/
void dispatch(SelectionKey sk) {
// 1.获取 handler
// 1.1.如果是 ServerSocketChannel(代表服务端),则 handler为 BossHandler
// 1.2.如果是 SocketChannel(代表连接的某个客户端网络通道),则 handler为 MyWorkHandler
Runnable handler = (Runnable) sk.attachment();
// 在当前线程中处理,调用之前 attach 绑定到选择键的handler处理器对象
if (handler != null) {
System.out.println("dispatch:"+handler.getClass().toString());
handler.run();
}
}
}
2.3.4MyWorkHandler
package com.kikop.myreactor.multipthreadapp.server.handler;
import com.kikop.util.Logger;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author kikop
* @version 1.0
* @project Name: mynettydemo
* @file Name: MyWorkHandler
* @desc 执行业务处理逻辑(在一个主线程main开辟的线程池中)
* 负责单个通道的同步sync读写
* @date 2021/6/22
* @time 10:30
* @by IDE: IntelliJ IDEA
*/
public class MyWorkHandler implements Runnable {
final SocketChannel channel;
final SelectionKey sk;
final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
// 主线程 main:引入线程池
static ExecutorService pool = Executors.newFixedThreadPool(4);
/**
* @param selector 可能是1或2
* @param c
* @throws IOException
*/
public MyWorkHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
// 取得选择键,设置感兴趣的IO事件
sk = channel.register(selector, 0);
// 将本 Handler:MultiThreadEchoHandler作为 MyReactorTaskRunnable.dispatch sk选择键的附件
// 方便事件 dispatch
sk.attach(this);
// 向 SelectionKey 选择键注册Read就绪事件
sk.interestOps(SelectionKey.OP_READ);
// 使尚未返回的第一个选择操作立即返回,唤醒的
// 原因是:注册了新的channel或者事件;channel关闭,取消注册;优先级更高的事件触发(如定时器事件),希望及时处理。
selector.wakeup();
}
// MyReactorTaskRunnable.dispatch来触发
public void run() {
// 异步任务,在独立的线程池中执行
pool.execute(new AysncPoolTask(this));
}
/**
* synchronized 感觉没有意义 todo
*/
public synchronized void process() {
try {
if (state == SENDING) {
//写入通道
channel.write(byteBuffer);
//写完后,准备开始从通道读,byteBuffer切换成写模式
byteBuffer.clear();
//写完后,注册read就绪事件
sk.interestOps(SelectionKey.OP_READ);
//写完后,进入接收的状态
state = RECIEVING;
} else if (state == RECIEVING) {
// 从通道读
int length = 0;
// 注意:channel.read 定义1024,kernel 内核有个缓存
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//读完后,准备开始写入通道,byteBuffer切换成读模式
byteBuffer.flip();
//读完后,注册 write就绪事件
sk.interestOps(SelectionKey.OP_WRITE);
// 读完后,进入发送的状态
state = SENDING;
}
//处理结束了, 这里不能关闭select key,需要重复使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
2.3.4.1AysncPoolTask
package com.kikop.myreactor.multipthreadapp.server.handler;
/**
* @author kikop
* @version 1.0
* @project Name: mynettydemo
* @file Name: AysncPoolTask
* @desc 线程程中的某个任务
* @date 2021/6/22
* @time 10:30
* @by IDE: IntelliJ IDEA
*/
public class AysncPoolTask implements Runnable {
MyWorkHandler workHandler;
public AysncPoolTask(MyWorkHandler workHandler) {
this.workHandler = workHandler;
}
public void run() {
this.workHandler.process();
}
}
2.4客户端
2.4.1EchoClient
package com.kikop.myreactor.multipthreadapp.client;
import com.kikop.myreactor.multipthreadapp.config.NioDemoConfig;
import com.kikop.myreactor.singlethreadapp.client.handler.MyTaskRunnable;
import com.kikop.util.Print;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
/**
* @author kikop
* @version 1.0
* @project Name: mynettydemo
* @file Name: EchoClient
* @desc EchoClient
* @date 2021/6/22
* @time 10:30
* @by IDE: IntelliJ IDEA
*/
public class EchoClient {
public void start() throws IOException {
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT);
// 1、获取网络通道
SocketChannel socketChannel = SocketChannel.open(address);
// 2、切换成非阻塞模式
socketChannel.configureBlocking(false);
// 不断的自旋、等待与服务端的连接完成,或者做一些其他的事情
while (!socketChannel.finishConnect()) {
}
Print.tcfo("客户端启动成功!");
// 3.启动业务处理线程
MyTaskRunnable myTaskRunnable = new MyTaskRunnable(socketChannel);
new Thread(myTaskRunnable).start();
}
public static void main(String[] args) throws IOException {
new EchoClient().start();
}
}
2.4.2MyTaskRunnable
package com.kikop.myreactor.multipthreadapp.client.handler;
import com.kikop.myreactor.multipthreadapp.config.NioDemoConfig;
import com.kikop.util.Dateutil;
import com.kikop.util.Logger;
import com.kikop.util.Print;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
* @author kikop
* @version 1.0
* @project Name: mynettydemo
* @file Name: MyTaskRunnable
* @desc 客户端业务处理线程
* @date 2021/6/22
* @time 10:30
* @by IDE: IntelliJ IDEA
*/
public class MyTaskRunnable implements Runnable {
// 客户端选择器(提供通道的IO读、写事件的注册)
private final Selector selector;
private final SocketChannel channel;
/**
* MyTaskRunnable
*
* @param channel
* @throws IOException
*/
public MyTaskRunnable(SocketChannel channel) throws IOException {
selector = Selector.open(); // Reactor初始化
this.channel = channel;
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
// 获取就绪事件列表,底层用两个数组进行筛选,Buffer有读写事件,操作系统产生中断
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
if (sk.isWritable()) { // 通道对应的缓存可写
ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
Scanner scanner = new Scanner(System.in);
Print.tcfo("请输入发送内容:");
if (scanner.hasNext()) {
SocketChannel socketChannel = (SocketChannel) sk.channel();
String next = scanner.next();
// 先写数据到buffer
buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
// buffer切为读
buffer.flip();
// 发送数据
// 通过 DatagramChannel数据报通道
socketChannel.write(buffer);
// 清空缓存
buffer.clear();
}
}
if (sk.isReadable()) { // 通道对应的缓存可读
// 若选择键的IO事件是“可读”事件,读取数据
SocketChannel socketChannel = (SocketChannel) sk.channel();
//读取数据
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
// 注意:socketChannel.read需要等到一定的数量
while ((length = socketChannel.read(byteBuffer)) > 0) {
byteBuffer.flip(); // 方便下一次写数据到buffer
Logger.info("client recv:" + new String(byteBuffer.array(), 0, length));
// 业务处理完成,清空缓存
byteBuffer.clear();
}
}
// 处理结束了, 这里不能关闭 select key,需要重复使用
// selectionKey.cancel();
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
总结
1.1优化演进
模式演进分为2个方面:
1)升级Refactor反应器。引入多个selector选择器,提升选择大量通道的能力。
2)升级Handler处理器。既要使用多线程,又要尽可能的高效率,则可以使用线程池。
总体如下:
1)将负责输入输出处理的IO Handler处理器的执行,放入独立的线程池中。这样,业务处理线程与负责服务监听和IO事件查询的反应器线程相隔离,避免服务器的连接监听收到阻塞。
2)如果服务器为多核CPU,可以将反应器线程拆分为多个子反应器线程;同时,引入多个选择器,每一个反应器线程负责一个选择器,这样,充分释放系统资源的能力;也提高了反应器管理大量连接,提升选择大量通道的能力。
网友评论