个人博客:haichenyi.com。感谢关注
传统的同步阻塞I/O通讯模型,导致的结果就是只要有一方处理数据缓慢,都会影响另外一方的处理性能。按照故障设计原则,一方的处理出现问题,不应该影响到另外一方才对。但是,在同步阻塞的模式下面,这样的情况是无法避免的,很难通过业务层去解决。既然同步无法避免,为了避免就产生了异步。Netty框架就一个完全异步非阻塞的I/O通讯方式
同步阻塞式I/O编程
简单的来说,传统同步阻塞的I/O通讯模式,服务器端处理的方式是,每当有一个新用户接入的时候,就new一个新的线程,一个线程只能处理一个客户端的连接,在高性能方面,并发高的情景下无法满足。伪代码如下:
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author 海晨忆
* @date 2018/2/9
* @desc
*/
public class SocketServer {
private int port = 8080;
private Socket socket = null;
public SocketServer(int port) {
this.port = port;
}
public void connect() {
ServerSocket server = null;
try {
server = new ServerSocket(port);
while (true) {
socket = server.accept();
new Thread(new Runnable() {
@Override
public void run() {
new TimerServerHandler(socket).run();
}
}).start();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
//释放资源
if (server != null) {
try {
server.close();
} catch (IOException e) {
e.printStackTrace();
}
server = null;
}
}
}
}
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
/**
* @author 海晨忆
* @date 2018/2/9
* @desc
*/
public class TimerServerHandler implements Runnable {
private Socket socket;
public TimerServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String currentTime = null;
String body = null;
while (true) {
body = in.readLine();
if (body == null)
break;
}
} catch (IOException e) {
e.printStackTrace();
//释放in,out,socket资源
}
}
}
上面这个就是最原始的服务端IO的代码,这里我就给出的是最简化的,当有新的客户端接入的时候,服务端是怎么处理线程的,可以看出,每当有新的客户端接入的时候,总是回新创建一个线程去服务这个新的客户端
伪异步式编程
后来慢慢演化出一个版本“伪异步”模型,新增加一个线程池或者消息队列,满足一个线程或者多个线程满足N个客户端,通过线程池可以灵活的调用线程资源。通过设置线程池的最大值,防止海量并发接入造成的线程耗尽,它的底层实现依然是同步阻塞模型,伪代码如下:
import com.example.zwang.mysocket.server.TimerServerHandler;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
/**
* @author 海晨忆
* @date 2018/2/9
* @desc
*/
public class SocketServer {
private int port = 8080;
private Socket socket = null;
public SocketServer(int port) {
this.port = port;
}
private void connect() {
ServerSocket server = null;
try {
server = new ServerSocket(port);
TimeServerHandlerExecutePool executePool = new TimeServerHandlerExecutePool(50, 1000);
while (true) {
socket = server.accept();
executePool.execute(new TimerServerHandler(socket));
}
} catch (IOException e) {
e.printStackTrace();
}finally {
//释放资源
}
}
}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author 海晨忆
* @date 2018/2/9
* @desc
*/
public class TimeServerHandlerExecutePool {
private ExecutorService executor;
public TimeServerHandlerExecutePool(int maxPoolSize, int queueSize) {
executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize,
120L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable task) {
executor.execute(task);
}
}
“伪异步”的代码和传统同步的唯一区别就是在于,首先先创建了一个时间服务处理类的线程池,当有新的客户端接入的时候,先将socket请求封装成task,然后调用线程池的execute方法执行,从而避免了每一个新请求创建一个新线程。由于线程池和消息队列都是有限的,因此,无论客户端的并发量多大,它都不会导致线程个数过于大,而造成的内存溢出。相对于传统的同步阻塞,是一种改良。
但是他没有从更本上解决同步的问题,伪异步的问题在于,他还是有一方处理出现问题还是会影响到另一方。因为:
- 当对socket的输入流进行读取操作的时候,它会一直阻塞直到一下三种方式发生:
1. 有数据可读
2. 可读数据已经读取完
3. 发生空指针或者I/O异常。
这意味者,当读取inputstream方处理速度缓慢(不管是什么原因造成的速度缓慢),另一方会一直同步阻塞,直到这一方把数据处理完. - 当调用outputstream的write方法写输出流的时候,它将会被阻塞,直到所有要发送的字节全部写入完毕,或者发生异常。学过TCP/IP相关知识的人都直到,当消息的接收方处理消息缓慢,不能及时的从TCP缓冲区读取数据,这将会导致发送方的TCP缓冲区的size一直减少,直到0.缓冲区为0,那么发消息的一方将无法将消息写入缓冲区,直到缓冲区的size大于0
通过以上。我们了解到读和写的操作都是同步阻塞的,阻塞的时间取决于对方的I/O线程的处理速度和网络I/O的传送速度。从本质上面看,我们无法保证对方的处理速度和网络传送速度。如果,我们的程序依靠与对方的处理速度,那么,他的可靠性将会非常差。
NIO编程
官方叫法new I/O,也就是新的IO编程,更多的人喜欢称它为:Non-block IO即非阻塞IO。
与Socket和serverSocket类对应,NIO提供了SocketChannel和ServerSocketChannel两种不同的套接字通道实现,这两种都支持阻塞式编程和非阻塞式编程。开发人员可以根据自己的需求选择合适的编程模式。一般低负载,低并发的应用程序选择同步阻塞的方式以降低编程的复杂度。高负载,高并发的不用想了,非阻塞就是为了解决这个问题的
- 缓冲区Buffer
Buffer是一个对象,它包含一些写入或者读出的数据。再NIO中加入buffer对象,体现了新库和旧库的一个重要区别。在面向流的io中,可以直接把数据读取或者写入到stream对象中。在NIO库中,所有数据操作都是通过缓冲区处理的。
缓冲区实质上是一个数组,通常是一个字节数组(ByteBuffer),基本数据类型除了boolean没有,其他都有,如ShortBuffer,CharBuffer等等 - 通道Channel
Channel是一个通道,双向通道,网络数据都是通过Channel读取,写入的。是的,没错,Channel它既可以进行读操作,也可以进行写操作。而流只能是一个方向。只能读操作或者只能写操作,而channel是全双工,读写可以同时进行。channel可以分为两大类:网络读写的SelectableChannel和文件操作的FileChannel。我们前面提到的SocketChannel和ServerSocketChannel都是SelectableChannel的子类。 - 多路复用器Selector
selector多路复用器,他是java NIO编程的基础,熟练的掌握selector对于NIO编程至关重要。多路复用器提供选择已经就绪的任务的能力。简单的讲就是他会不断的轮询注册的channel,如果一个Channel发生了读写操作,这个Chnnel就会处于就绪状态,会被selector轮询出来,通过SelectorKey获取就绪Channel集合,进行后续的IO操作。一个selector对应多个Channel
由于原生NIO编码比较麻烦和复杂,我这里就给出了思路的伪代码。下一篇我们将用NIO中的Netty框架实现Socket通信,编码简单,一行代码解决烦人粘包、拆包问题。
/**
* 服务端nio过程的伪代码
*
* @param port 端口号
* @throws IOException IOException
*/
private void init(int port) throws IOException {
//第一步:打开ServerSocketChannel,用于监听客户端连接,它是所有客户端连接的父管道
ServerSocketChannel socketChannel = ServerSocketChannel.open();
//第二步:监听绑定端口,设置连接模式为非阻塞模式,
socketChannel.socket().bind(new InetSocketAddress(InetAddress.getByName("IP"), port));
socketChannel.configureBlocking(false);
//第三步:创建Reactor线程,创建多路复用器,并启动线程。
Selector selector = Selector.open();
new Thread().start();
//第四步:将ServerSocketChannel注册到Reactor线程的多路复用器上,监听accept事件
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_ACCEPT/*,ioHandler*/);
//第五步:多路复用器在线程run方法的无线循环体内轮询准备就绪的key
int num = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while (it.hasNext()) {
SelectionKey next = it.next();
//deal with io event...
}
//第六步:多路复用器检测到有新客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路
SocketChannel channel = socketChannel.accept();
//第七步:设置客户端为非阻塞模式
channel.configureBlocking(false);
channel.socket().setReuseAddress(true);
//第八步:将新接入的客户端注册到reactor线程的多路复用器上,监听读操作,读取客户端发送的消息
SelectionKey key1 = socketChannel.register(selector, SelectionKey.OP_ACCEPT/*,ioHandler*/);
//第九步:异步读取客户端消息到缓冲区,
/*int readNumber = channel.read("receivebuff");*/
//第十步:对byteBuffer进行编解码,如果有半包信息指针reset,继续读取到后续的报文,将解码成功消息封装成task,投递到业务线程池,进行业务逻辑编排
Object massage = null;
while (buff.hasRemain()) {
buff.mark();
Object massage1 = decode(btyeBuffer);
if (massage1 == null) {
byteBuffer.reset();
break;
}
massageList.add(massage1);
}
if (!byteBuffer.hasRemain()) {
byteBuffer.clean();
} else {
byteBuffer.compact();
}
if (massageList != null && !massageList.isEmpty()) {
for (Object massage3 : massageList){
handlerTask(massage3);
}
}
//第十一步:将POJO对象encode成ByteBuff,调用SocketChannel的异步write接口,将异步消息发送到客户端
socketChannel.write(buffer);
}
结束!!!
网友评论