必须明白的几个概念
阻塞(Block)和非阻塞(Non-Block)
阻塞和非阻塞是在缓冲区数据没有准备就绪的情况下处理方式
阻塞:在没有就绪下,就一直等待哪里
非阻塞:在没有就绪下,则直接返回
阻塞和非阻塞是相对于io操作来讲
举例如下图:从机器内存到jvm内存
image.png
同步(Synchronization)和异步(Asynchronous)
同步和异步是处理io事件的采用方式
同步:应用程序参与io的处理
异步:io交给操作系统,应用程序只需要等待通知
异步和同步是相对于时间点,异步指同一时间点做多个处理,同步指同一时间点做一个处理。
读和写
读指input,写指output,输入和输出是相对于内存来讲的
BIO 与 NIO 对比
BIO:面向流(乡村公路),阻塞 IO(多线程)
NIO:面向缓冲(高速公路,多路复用技术),非阻塞IO(反应堆 Reactor),选择器(轮询机制)
image.png
阻塞和非阻塞
NIO是发现缓冲数据没有准备好,直接返回,处理其他事情,因此可以一个线程处理多个输入和输出通道。
BIO 是发现缓冲区数据没有准备好,则等待哪里,因此,可以一个线程处理一个输入和输出通道。
Java AIO
真正实现异步IO
代码实现
BIO
1 服务端
//同步阻塞IO模型
public class BIOServer {
ServerSocket server;
public BIOServer(int port){
try {
server = new ServerSocket(port);
System.out.println("BIO服务已启动,监听端口是:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
public void listen() throws IOException{
//循环监听
while(true){
//等待客户端连接,阻塞方法
Socket client = server.accept();
InputStream is = client.getInputStream();
//网络客户端把数据发送到网卡,机器所得到的数据读到了JVM内中
byte [] buff = new byte[1024];
int len = is.read(buff);
if(len > 0){
String msg = new String(buff,0,len);
System.out.println("收到" + msg);
}
}
}
public static void main(String[] args) throws IOException {
new BIOServer(8080).listen();
}
}
2 客户端
public class BIOClient {
public static void main(String[] args) throws UnknownHostException, IOException {
Socket client = new Socket("localhost", 8080);
OutputStream os = client.getOutputStream();
//生成一个随机的ID
String name = UUID.randomUUID().toString();
System.out.println("客户端发送数据:" + name);
os.write(name.getBytes());
os.close();
client.close();
}
}
NIO
1 服务端
/**
* NIO的操作过于繁琐,于是才有了Netty
* Netty就是对这一系列非常繁琐的操作进行了封装
*/
public class NIOServerDemo {
private int port = 8080;
//轮询器 Selector 大堂经理
private Selector selector;
//缓冲区 Buffer 等候区
private ByteBuffer buffer = ByteBuffer.allocate(1024);
//初始化完毕
public NIOServerDemo(int port){
try {
this.port = port;
//远程服务器通道
ServerSocketChannel server = ServerSocketChannel.open();
//我得告诉地址
server.bind(new InetSocketAddress(this.port));
//BIO 升级版本 NIO,为了兼容BIO,NIO模型默认是采用阻塞式
server.configureBlocking(false);
//轮训器
selector = Selector.open();
//轮训器属于该通道
server.register(selector, SelectionKey.OP_ACCEPT);
} catch (Exception e) {
e.printStackTrace();
}
}
public void listen(){
System.out.println("listen on " + this.port + ".");
try {
//轮询主线程
while (true){
//通道个数,当没有通道则阻塞,有则继续进行
selector.select();
//每次都拿到所有的号子
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iter = keys.iterator();
//不断地迭代,就叫轮询
//同步体现在这里,因为每次只能拿一个key,每次只能处理一种状态
while (iter.hasNext()){
SelectionKey key = iter.next();
iter.remove();
//每一个key代表一种状态
//数据就绪、数据可读、数据可写 等等等等
process(key);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
//具体办业务的方法,坐班柜员
//在同一时间点,只能干一件事
private void process(SelectionKey key) throws IOException {
//针对于每一种状态给一个反应
if(key.isAcceptable()){
ServerSocketChannel server = (ServerSocketChannel)key.channel();
//这个方法体现非阻塞,不管你数据有没有准备好
//你给我一个状态和反馈
SocketChannel channel = server.accept();
//一定一定要记得设置为非阻塞
channel.configureBlocking(false);
//当数据准备就绪的时候,将状态改为可读
channel.register(selector,SelectionKey.OP_READ);
}
else if(key.isReadable()){
//key.channel 从多路复用器中拿到客户端的引用
SocketChannel channel = (SocketChannel)key.channel();
int len = channel.read(buffer);
if(len > 0){
buffer.flip();
String content = new String(buffer.array(),0,len);
key = channel.register(selector,SelectionKey.OP_WRITE);
//在key上携带一个附件,一会再写出去
key.attach(content);
System.out.println("读取内容:" + content);
}
}
else if(key.isWritable()){
SocketChannel channel = (SocketChannel)key.channel();
String content = (String)key.attachment();
channel.write(ByteBuffer.wrap(("输出:" + content).getBytes()));
channel.close();
}
}
public static void main(String[] args) {
new NIOServerDemo(8080).listen();
}
}
2 客户端
同BIO客户端
AIO
1 服务端
public class AIOServer {
private final int port;
public static void main(String args[]) {
int port = 8000;
new AIOServer(port);
}
public AIOServer(int port) {
this.port = port;
listen();
}
private void listen() {
try {
ExecutorService executorService = Executors.newCachedThreadPool();
AsynchronousChannelGroup threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
//开门营业
//工作线程,用来侦听回调的,事件响应的时候需要回调
final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadGroup);
server.bind(new InetSocketAddress(port));
System.out.println("服务已启动,监听端口" + port);
//准备接受数据
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>(){
final ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
//实现completed方法来回调
//由操作系统来触发
//回调有两个状态,成功
public void completed(AsynchronousSocketChannel result, Object attachment){
System.out.println("IO操作成功,开始获取数据");
try {
buffer.clear();
result.read(buffer).get();
buffer.flip();
result.write(buffer);
buffer.flip();
} catch (Exception e) {
System.out.println(e.toString());
} finally {
try {
result.close();
server.accept(null, this);
} catch (Exception e) {
System.out.println(e.toString());
}
}
System.out.println("操作完成");
}
@Override
//回调有两个状态,失败
public void failed(Throwable exc, Object attachment) {
System.out.println("IO操作是失败: " + exc);
}
});
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException ex) {
System.out.println(ex);
}
} catch (IOException e) {
System.out.println(e);
}
}
}
2 客户端
public class AIOClient {
private final AsynchronousSocketChannel client;
public AIOClient() throws Exception{
client = AsynchronousSocketChannel.open();
}
public void connect(String host,int port)throws Exception{
client.connect(new InetSocketAddress(host,port),null,new CompletionHandler<Void,Void>() {
@Override
public void completed(Void result, Void attachment) {
try {
client.write(ByteBuffer.wrap("这是一条测试数据".getBytes())).get();
System.out.println("已发送至服务器");
} catch (Exception ex) {
ex.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
}
});
final ByteBuffer bb = ByteBuffer.allocate(1024);
client.read(bb, null, new CompletionHandler<Integer,Object>(){
@Override
public void completed(Integer result, Object attachment) {
System.out.println("IO操作完成" + result);
System.out.println("获取反馈结果" + new String(bb.array()));
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
}
);
try {
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException ex) {
System.out.println(ex);
}
}
public static void main(String args[])throws Exception{
new AIOClient().connect("localhost",8000);
}
}
网友评论