BIO
BIO意为阻塞IO(Blocking IO),即所有的通信方法如Accept,connect,read和write方法均阻塞。线程在大部分时间处于阻塞等待状态,无法支持高负载高并发的场景,资源利用率极为低下。
Java传统BIO是面向流的。流是一种单向的数据传输媒介。根据传输数据的不同,流总的来说分为字符流和字节流两种。其中字节流以Stream结尾,字符流以Reader/Writer结尾。
这些流有如下分类(只列出常用的):
- InputStream:字节输入流
- BufferedInputStream:带缓冲的输入流
- FileInputStream:文件输入流
- ObjectInputStream:对象输入流,读入序列化的对象,要求对象必须实现Serializable接口
- DataInputStream:数据输入流,读取Java基本数据类型
- OutputStream:字节输出流
- BufferedOutputStream:带缓存的输出流
- FileOutputStream:文件输出流
- ObjectOutputStream:对象输出流
- DataOutputStream:数据输出流
- InputStreamReader:字符输出流
- BufferedReader:带缓存的字符输入流
- FileReader:文件字符输入流
- OutputStreamWriter:字节输入流
- BufferedWriter:带缓存的字符输出流
- FileWriter:文件字符输出流
下面是BIO网络通信的示例代码:
BioServer(BIO服务端)
public class BioServer {
public static void main(String[] args) throws Exception {
ServerSocket serverSocket = new ServerSocket(9900);
while (true) {
// 方法阻塞,等待socket建立连接
Socket socket = serverSocket.accept();
InputStream inputStream = socket.getInputStream();
BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream);
byte[] buffer = new byte[1024];
int read;
// 方法阻塞,等待接收到数据,或者是连接关闭
while ((read = bufferedInputStream.read(buffer)) != -1) {
System.out.println(new String(buffer, 0, read));
}
// 使用完毕后需要关闭socket
socket.close();
}
// 使用完毕后必须关闭相关资源,这里演示需要使用了死循环,故不需要关闭
// bufferedInputStream.close();
// inputStream.close();
// serverSocket.close();
}
}
BioClient(BIO客户端)
public class BioClient {
public static void main(String[] args) throws Exception {
Socket socket = new Socket();
// 阻塞方式,直到连接server成功
socket.connect(new InetSocketAddress("127.0.0.1", 9900));
OutputStream outputStream = socket.getOutputStream();
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream);
bufferedOutputStream.write("Hello from client\n".getBytes());
// 由于是缓存流,write数据并不会被立即发送,需要使用flush将缓冲区所有数据写入流
bufferedOutputStream.flush();
bufferedOutputStream.close();
outputStream.close();
socket.close();
}
}
NIO
NIO意为New IO,即新一代IO。其他地方有介绍说Java NIO为非阻塞IO(Non-blocking IO)。这一点其实是不对的,因为Java的NIO既支持阻塞方式也支持非阻塞方式。
NIO相对传统IO,编程模型上做出了较大改变。NIO是面向通道(channel)和缓冲区(buffer)的。Channel和流最大的不同之处在于channel支持双向数据传递,而流只支持单向。
除此之外NIO还支持零拷贝。零拷贝即CPU不参与数据复制的过程,可以避免用户空间和操作系统内核空间的上下文切换。零拷贝不仅需要应用的支持,还需要操作系统的支持。下面举一个例子,我们从文件系统读取一个文件,从socket发送出去。如果用传统方式数据流经如下步骤:
文件系统 -> 内核读缓存 -> 应用缓存(Java) -> 内核Socket缓存 -> 网卡缓存
使用零拷贝技术之后,数据流经步骤减少了2次拷贝,如下所示:
文件系统 -> 内核读缓存 -> 内核Socket缓存 -> 网卡缓存
Linux系统支持的零拷贝有两种:
- mmap内存映射:DMA控制器从外部存储加载数据到内核缓存,然后将应用内缓存通过地址映射的方式,映射到内核缓存。这样可以省去数据从内核缓存复制到应用缓存的步骤。
- sendfile:DMA控制器从外部存储加载数据到内核缓存,然后系统直接将内核缓存中的数据复制到socket缓存。这样可以省去数据从应用缓存复制到socket缓存的步骤。
mmap的Java NIO使用方式:
MappedByteBuffer mappedByteBuffer = new RandomAccessFile(file, "r").getChannel().map(FileChannel.MapMode.READ_ONLY, 0, len);
该方法返回的mappedByteBuffer
是文件数据的内核缓存在Java应用中的映射。
sendfile的Java NIO使用方式:
FileChannel sourceChannel = new RandomAccessFile("/path/to/file", "rw").getChannel();
SocketChannel socketChannel = SocketChannel.open(...);
sourceChannel.transferTo(0, sourceChannel.size(), socketChannel);
FileChannel
的transferTo
和transferFrom
使用了零拷贝技术。transferTo
负责将文件FileChannel
的内容发往其他channel,transferFrom
从其他channel读取内容,发送给FileChannel
。
通过上面我们可以得出结论,减少内核缓存和应用缓存(主要指Java应用堆内存)之间的复制操作可以提高IO操作效率。为了满足这种场景,NIO的ByteBuffer提供了两种:堆内buffer和堆外buffer。堆内buffer使用一个字节数组作为数据载体,堆外内存不属于Java虚拟机,归操作系统管理。使用堆外内存能够显著提高IO性能。
ByteBuffer分配堆内内存和堆外内存的方式如下:
// 分配1024字节堆内内存
ByteBuffer byteBuffer = ByteBuffer.allocate(1024)
// 分配1024字节堆外内存
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024)
阻塞NIO
下面举一个例子,介绍下阻塞模式的NIO网络通信的写法。
NioServer
public class NioServer {
public static void main(String[] args) throws Exception {
// 建立连接并绑定地址端口
ServerSocketChannel channel = ServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9900));
// 创建一个1024字节大小的字节缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (true) {
// 接受socket连接关闭
// 方法阻塞,直到socket链接成功
SocketChannel socketChannel = channel.accept();
int read;
// 方法阻塞,直到读取到数据,或者socket连接关闭
while ((read = socketChannel.read(byteBuffer)) != -1) {
System.out.println(new String(byteBuffer.array(), 0, read));
byteBuffer.clear();
}
// 使用完需要关闭socketChannel
socketChannel.close();
}
// channel.close();
}
}
NioClient
public class NioClient {
public static void main(String[] args) throws Exception {
// 建立连接
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9900));
ByteBuffer byteBuffer = ByteBuffer.wrap("Hello from client".getBytes());
socketChannel.write(byteBuffer);
socketChannel.close();
}
}
非阻塞NIO
非阻塞NIO所有的数据传输方法比如accept connect read write均不阻塞。可以使用如下配置将阻塞NIO转变为非阻塞NIO:
channel.configureBlocking(false);
需要注意的是,FileChannel
不支持非阻塞模式。
非阻塞NIO的数据传输方法都是非阻塞的,调用之后无论成功或者失败都立刻返回。但问题是,我们无法知道数据是否传送成功,什么时候可以去传送/接收数据,给编程造成了很大的困难。
为了解决这个问题,Java NIO提供了多路复用器:Selector。Selector以轮询的方式检查所有注册到该Selector下的所有channel的事件。一旦有channel准备好了连接,读或者写,Selector可以立刻检测到并处理该事件。
Selector的厉害之处在于能够使用一个线程来处理多个channel的连接和读写事件,避免了线程上下文切换,十分的高效。
Selector并不会主动去监听某个channel的某些事件。需要主动将channel需要被Selector检测的事件注册到Selector。
Channel的事件有如下4种:
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE
- SelectionKey.OP_CONNECT
创建Selector和向Selector注册事件的方法为:
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_ACCEPT);
如果需要Selector检测同一个channel的多个事件,可以通过按位或运算符合并事件。例如SelectionKey.OP_WRITE | SelectionKey.OP_READ
。
下面是一个使用Selector的非阻塞NIO网络通信server的例子。该server接受到client发来的数据之后,回复“Hello from server”给客户端。
代码如下所示:
public class SelectorServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel channel = ServerSocketChannel.open().bind(new InetSocketAddress("127.0.0.1", 9900));
// 设置为非阻塞模式
channel.configureBlocking(false);
Selector selector = Selector.open();
// 向selector注册ACCEPT事件,当ServerSocketChannel发生accept事件的时候会被selector select到
channel.register(selector, SelectionKey.OP_ACCEPT);
ByteBuffer buffer = ByteBuffer.allocate(1024);
ByteBuffer echoBuffer = ByteBuffer.wrap("Hello from server\n".getBytes());
while (true) {
// 方法阻塞
// selector一旦轮训到任何注册的channel发生的任意事件,立刻返回
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
// 获取SelectionKey
SelectionKey selectionKey = iterator.next();
iterator.remove();
// 如果是Accept事件
if (selectionKey.isAcceptable()) {
// 接受socket连接,非阻塞
SocketChannel socketChannel = channel.accept();
// 注意,这里不要忘了设置socketChannel为非阻塞模式,否则register方法会报错
socketChannel.configureBlocking(false);
// 为socketChannel注册READ事件,channel可以接收数据
socketChannel.register(selector, SelectionKey.OP_READ);
// 如果是read事件
} else if (selectionKey.isReadable()) {
// 从SelectionKey获取附带的channel,即产生read事件的channel
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
int read = socketChannel.read(buffer);
System.out.println(new String(buffer.array(), 0, read));
buffer.clear();
// 设置接下来关心的事件为write
selectionKey.interestOps(SelectionKey.OP_WRITE);
// 如果是write事件
} else if (selectionKey.isWritable()) {
// 和read部分类似,获取触发事件的channel
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
socketChannel.write(echoBuffer);
// 设置接下来关心的事件为read
selectionKey.interestOps(SelectionKey.OP_READ);
echoBuffer.rewind();
}
}
}
// 程序退出前记得关闭selector和ServerSocketChannel,这里演示方便使用了死循环,故不再需要关闭
// selector.close();
// channel.close();
}
}
AIO
AIO全称为Asynchronous IO,是真正的全异步IO。AIO可以实现“调用后不管”,无论IO是否需要等待,操作多久,线程均不会阻塞。等到操作结束时,可以使用专用的线程池,负责处理结果回调。
AIO所有的IO操作均有两个版本:返回Future或者是参数使用CompletionHandler(异步回调)。
例如AsynchronousServerSocketChannel
的accept
方法的两个版本如下:
public abstract <A> void accept(A attachment,
CompletionHandler<AsynchronousSocketChannel,? super A> handler);
public abstract Future<AsynchronousSocketChannel> accept();
CompletionHandler的代码如下所示:
public interface CompletionHandler<V,A> {
/**
* Invoked when an operation has completed.
*
* @param result
* The result of the I/O operation.
* @param attachment
* The object attached to the I/O operation when it was initiated.
*/
void completed(V result, A attachment);
/**
* Invoked when an operation fails.
*
* @param exc
* The exception to indicate why the I/O operation failed
* @param attachment
* The object attached to the I/O operation when it was initiated.
*/
void failed(Throwable exc, A attachment);
}
其中completed
在操作成功时候调用,failed
在操作失败时候调用。
注意:attachment
在调用对应IO操作方法的时候(比如accept
)传入。如果IO操作时候需要使用其他对象,将其作为attachment传入是一个很方便的用法。
代码示例
下面代码为AIO 网络通信server和client。
AioServer
public class AioServer {
public static void main(String[] args) throws Exception {
// 创建AsynchronousChannelGroup
// 本质是一个线程池,负责接受异步回调
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
// 创建通道并绑定地址端口
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress("127.0.0.1", 9900));
// 接受socket连接,使用异步回调的方式
channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
// 在socket连接成功的时候调用
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
try {
// read方法这里使用返回Future的版本
// 调用future的get方法,阻塞等待数据读取结果
Integer read = result.read(byteBuffer).get();
System.out.println(new String(byteBuffer.array(), 0, read));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
// 再次调用accept方法,接受下一个socket的连接
// 调用一次accept方法AsynchronousServerSocketChannel仅接受一次请求
channel.accept(null, this);
}
@Override
public void failed(Throwable exc, Object attachment) {
// socket连接失败的时候调用
System.out.println("Accept failed");
}
});
// 阻塞主线程,防止程序退出
CountDownLatch countDownLatch = new CountDownLatch(1);
countDownLatch.await();
}
}
AioClient
public class AioClient {
public static void main(String[] args) throws Exception {
AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(group);
ByteBuffer buffer = ByteBuffer.wrap("Hello from client".getBytes());
// 连接socket服务器,使用异步回调的方式
channel.connect(new InetSocketAddress("127.0.0.1", 9900), null, new CompletionHandler<Void, Object>() {
@Override
public void completed(Void result, Object attachment) {
// 异步将buffer的数据写入channel
// 由于不关心什么时候能够写入完毕,我们不处理此方法的返回future
channel.write(buffer);
buffer.rewind();
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("Connect failed");
}
});
// 阻塞主线程,防止程序退出
CountDownLatch countDownLatch = new CountDownLatch(1);
countDownLatch.await();
}
}
网友评论