1、操作系统内核
2、BIO模型
BIO(blocking I/O)是阻塞IO模型,每个客户端连接上之后都会启用一个新的线程来处理。
阻塞是指线程内部方法的阻塞,异步是指线程间的异步执行
(1)、Java实现单线程的BIO模型
# 单线程阻塞情况---server端
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(8899));
byte[] bytes =new byte[1024];
while (true){
System.out.println("等待连接...");
Socket accept = serverSocket.accept();//阻塞方法
System.out.println("连接成功");
int read = accept.getInputStream().read(bytes);
System.out.println("接受到消息-read:"+read);
System.out.println("接受到消息-bytes:"+new String(bytes));
}
}
# 单线程阻塞情况---client端
public static void main(String[] args) throws IOException {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1",8899));
Scanner scanner = new Scanner(System.in);
System.out.println("请输入内容...");
while(true){
String text = scanner.next();
System.out.println("客户端发送消息:"+text);
socket.getOutputStream().write(text.getBytes(StandardCharsets.UTF_8));;
}
}
如果只有单线程,则启动server
之后,会阻塞在accept
方法这里,等待客户端的连接,客户端连接成功之后,会阻塞在read
方法这里等待接受消息,客户端成功发送消息之后,服务端接受到消息后打印出消息,然后循环继续阻塞在accept
方法这里。如果当前客户端继续发送消息给服务端,服务端也是无法接受到消息,因为阻塞在了accept
方法这里。所以单线程无法实现BIO
(2)、Java实现多线程的BIO模型
# 多线程阻塞情况---server端
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(8899));
byte[] bytes =new byte[1024];
while (true){
System.out.println("等待连接...");
Socket accept = serverSocket.accept();//阻塞方法
System.out.println("连接成功");
new Thread(()->{
try {
while (true){
int read = accept.getInputStream().read(bytes);
System.out.println("线程:"+Thread.currentThread().getName()+"--接受到消息-read:"+read);
System.out.println("线程:"+Thread.currentThread().getName()+"--接受到消息-bytes:"+
new String(bytes,0,read));
}
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}
}
# 多线程阻塞情况---client端
public static void main(String[] args) throws IOException {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1",8899));
Scanner scanner = new Scanner(System.in);
System.out.println("请输入内容...");
while(true){
String text = scanner.next();
System.out.println("客户端发送消息:"+text);
socket.getOutputStream().write(text.getBytes(StandardCharsets.UTF_8));;
}
}
开辟多线程,accept阻塞在主线程,其他消息的read阻塞在各自的子线程中。
BIO模型
(3)、缺点
BIO的缺点就是不适合大规模的客户端访问,每一个客户端需要开辟一个新的线程去阻塞,浪费资源。
3、NIO模型--Select
NIO
是Non-blocking IO
,在BIO模型
的基础上改进了开辟多个线程专门复杂读写的缺点
NIO模型
有多种实现方式,select
,poll
,epoll
等
(1)、select的模型
select模型的结构图
Select结构图
(2)、select的Java实现
select模型的单线程实现
# select模型的单线程实现
# 服务端
public static void main(String[] args) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9988));
serverSocketChannel.configureBlocking(false);//设置为非阻塞的
System.out.println("开始监听客户端...");
Selector selector = Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();//阻塞方法,进行轮询,如果没有连接请求或者需要读取的请求,则会一直阻塞在这里
Set<SelectionKey> keys = selector.selectedKeys();//轮询完成之后得到所有有标记的请求
System.out.println("轮询得到的key集合长度为:" + keys.size());
for (SelectionKey selectionKey : keys) {
//处理完成之后,将对应的有标记key的socket从集合中删除,下次再轮询获取
keys.remove(selectionKey);
//对不同的key做不同的处理,key的可能情况有:accept read write connect
handle(selectionKey);
}
}
}
static void handle(SelectionKey selectionKey) {
//如果key是accept,表示是一个连接的请求
if (selectionKey.isAcceptable()) {
System.out.println("当前socket是连接请求...");
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = null;
try {
socketChannel = serverSocketChannel.accept();
System.out.println("接受连接");
//设置为非阻塞
socketChannel.configureBlocking(false);
//设置当前的请求为read,因为已经连接上,所以下次肯定是要发送消息,所以服务端需要read发过来的信息
socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
//如果key是read,表示是一个写的请求,需要多写的内容进行读取
if (selectionKey.isReadable()) {
System.out.println("处理线程:" + Thread.currentThread().getName() + "--当前请求为read请求...");
SocketChannel socketChannel = null;
//拿到对应的socket
socketChannel = (SocketChannel) selectionKey.channel();
//读取socket中发送来的数据
ByteBuffer buffer = ByteBuffer.allocate(512);
buffer.clear();
int len = 0;
try {
len = socketChannel.read(buffer);
//打印出socket中发送过来的信息
if (len != -1) {
System.out.println("接受到的消息:" + new String(buffer.array(), 0, len));
}
//写消息给客户端,说明已经接受到消息
ByteBuffer byteBufferWrite = ByteBuffer.wrap("你好客户端,请求已经收到!".getBytes(StandardCharsets.UTF_8));
socketChannel.write(byteBufferWrite);
} catch (IOException e) {
e.printStackTrace();
} finally {
}
}
}
# 客户端
public static void main(String[] args) throws IOException {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1",9988));
Scanner scanner = new Scanner(System.in);
System.out.println("请输入内容...");
while(true){
String text = scanner.next();
System.out.println("客户端发送消息:"+text);
socket.getOutputStream().write(text.getBytes(StandardCharsets.UTF_8));;
}
}
(3)、select
的缺点
- 1、最大并发数的限制
因为一个进程所能打开的fd(文件描述符)
是有限制的,可以通过FD_SETSIZE
来设置,默认是1024
或者2048
,因此在linux
系统中select
模型的最大并发就被限制到1024
或2048
了(windows系统并没有限制)(poll
模型解决了此问题) - 2、效率低
每次进行select
调用都会进行线性轮训全部的fd集合
,这样就会导致效率线性下降。fd集合
越大,效率越低。(poll
模型任然采用轮训的方式,问题依然存在)
select
模型的时间复杂度为O(n)
,poll
模型的复杂度O(n)
,epoll
模型的复杂度为O(1)
- 3、内核和用户空间内存copy问题
select在解决将fd消息传递给用户空间时,采用了内存copy的方式,这样处理效率比较低。(poll
同样也是使用内存copy的方式,问题依然存在)
3、IO模型--poll
poll
在select的基础上改进了fd
的存储集合,由原来的上限被限制的集合改为没有上限的列表。但是轮询和内存copy的问题并没有实际解决。
4、IO模型--epoll
(1)、epoll
模型的基本信息
(2)、epoll
模型的特点
- 1、所支持的文件描述符的上限是整个系统最大可以打开的文件数(
select
模型是进程最大打开的文件数)
在1G
内存的机器上,最大可以打开10w
左右 - 2、每个
fd文件描述符
上都有callback
函数,只有活跃的socket
才会主动的调用callback
函数,其他的idle
状态socket
不会调用。采用的是通知机制
。
使用红黑树
来存储fd
文件,使用链表
来存储事件 - 3、通过内核与用户空间的
mmap
共用同一块内存,减少内存copy
的消耗
(3)、epoll
模型基本函数
3个API
#建立一个红黑树,用来存储fd,size表示红黑树的大小,最新版已经去掉,按照资源自动分配
int epoll_create(int size);
#对fd的红黑树进行增删改的操作。
#op参数表示动作类型,有三种:EPOLL_CTL_ADD,EPOLL_CTL_MOD,EPOLL_CTL_DEL
#event告诉内核需要监听的事件类型
#-EPOLLIN:表示对应的文件描述符可读
#-EPOLLOUT:表示对应的文件描述符可写
#-EPOLLPRI:表示对应的文件描述符有紧急数据可读(外带数据)
#-EPOLLERR:表示对应的文件描述符发生错误
#-EPOLLHUP:表示对应的文件描述符挂断
#-EPOLLET:表示将EPOLL设置为边缘触发(只要状态变化了才会通知获取,如果没有读完且没有新事件,则不会通知,是一种高速处理模式,相对是的电平出发)
int epoll_ctl(int epfd,int op,int fd,struct epoll_event event);
#等待时间的产生,类似于select模型的select()
#events表示内存得到的事件的集合(全部是就绪的event)
#maxEvents表示每次能处理的最大事件数
#timeout表示超时时间,设置为-1表示阻塞,0则会立即返回
#返回的int表示事件个数
int epoll_wait(int epfd,struct epoll_event *events,int maxevents,int timeout);
(4)、netty
对epoll
的封装
netty
使用两个group
(bossgroup
,workgroup
),bossgroup
只负责客户端的连接,workgroup
负责消息的处理,workgroup
在所有的通道的监听器队列最后面加上自己的处理器,也是通过回调的方式,如果有消息过来,则会自动执行回调方法里的逻辑。
netty
的使用
#服务端
public class NettyServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new MyChannelInboundHandler());
}
});
try {
ChannelFuture future = serverBootstrap.bind(9999);
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
workGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
class MyChannelInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf)msg;
System.out.println("接受到消息:"+buffer.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(msg);
// ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("发生异常:"+cause.getCause().toString());
ctx.close();
}
}
#客户端
public class NettyClient {
public static void main(String[] args) {
new NettyClient().start();
}
private void start(){
EventLoopGroup clientWorks = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(clientWorks)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
System.out.println("初始化通道...");
socketChannel.pipeline().addLast(new ClientHandler());
}
});
try {
System.out.println("连接成功");
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",9999).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class ClientHandler extends ChannelInboundHandlerAdapter{
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("通道已经建立...");
final ChannelFuture channelFuture = ctx.writeAndFlush(Unpooled.copiedBuffer("你好".getBytes()));
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
System.out.println("消息发送成功");
}
});
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//读取服务器发回的消息
try {
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务端发来消息:"+byteBuf.toString(CharsetUtil.UTF_8));
}finally {
ReferenceCountUtil.release(msg);
}
}
}
4、AIO模型
AIO(Asynchronous IO)是一种异步非阻塞的模型,程序实现一个回调函数,交给操作系统,如果有请求连接上来,操作系统会自动执行这个回调函数。
(1)、模型图
执行流程图:
image.png
(2)、Java实现
单线程实现AIO
#服务器端
public static void main(String[] args) throws IOException {
final AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel
.open()
.bind(new InetSocketAddress(7788));
System.out.println("服务器已经启动...");
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel client, Object attachment) {
serverSocketChannel.accept(null,this);
try {
System.out.println("客户端:"+client.getRemoteAddress()+"--已经连接");
ByteBuffer buffer = ByteBuffer.allocate(1024);
System.out.println("等待接受消息...");
client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
System.out.println("客户端发来消息--");
attachment.flip();
//打印出从客户端读取的消息
try {
System.out.println("客户端"+client.getRemoteAddress()+"发来消息:"
+new String(attachment.array(),0,result));
}catch (Exception e){
System.out.println("获取客户端地址错误");
}
client.write(ByteBuffer.wrap("消息已收到".getBytes(StandardCharsets.UTF_8)));
//每次读取都会将事件从queue中取出来,所以这里需要重新放进去,以便继续通信,不然只能收到一次客户端的消息
client.read(attachment,attachment,this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("处理消息失败");
}
});
} catch (IOException e) {
System.out.println("出现异常:"+e.getMessage());
}
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接失败");
}
});
//让程序执行下去。不要终止
while (true){
try {
CountDownLatch latch = new CountDownLatch(1);
latch.await();
}catch (Exception e){
System.out.println("await exception");
}
}
}
#客户端
public static void main(String[] args) throws IOException {
Socket socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1",7788));
Scanner scanner = new Scanner(System.in);
System.out.println("请输入内容...");
while(true){
String text = scanner.next();
System.out.println("客户端发送消息:"+text);
socket.getOutputStream().write(text.getBytes(StandardCharsets.UTF_8));;
}
}
多线程实现AIO
#服务端代码
public static void main(String[] args) throws IOException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
AsynchronousChannelGroup asynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService,1);
final AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel
.open(asynchronousChannelGroup)
.bind(new InetSocketAddress(9090));
System.out.println("等待客户端连接...");
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel client, Object attachment) {
serverSocketChannel.accept(null,this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
System.out.println("接受到客户端的消息:"+new String(attachment.array(),0,result));
client.write(ByteBuffer.wrap("消息已接收到".getBytes(StandardCharsets.UTF_8)));
client.read(attachment,attachment,this);
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.out.println("接受消息处理失败");
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("连接失败");
}
});
CountDownLatch latch = new CountDownLatch(1);
latch.await();
}
参考资料
1、(填坑系列) 用aio写server与client进行通信的坑
2、
网友评论