同步:任务一的完成需要依赖任务二,只有等待任务二完成,任务一才算完成。
异步:任务一会通知任务二完成什么任务,但是两个任务是互不等待,都会进行。任务二完成之后会告诉任务一。
阻塞:CPU停下来等待一个慢的操作完成才继续后面的工作。
非阻塞:CPU遇到这个慢的操作会先去执行其他的命令,等慢的动作完成之后在处理慢操作对应的命令。
接下来我们说说同步阻塞,同步非阻塞和异步非阻塞
之前看过一位大牛的博客,他举了个例子来解释三个概念,我觉得收益匪浅。小时候妈妈让去烧水,然后自己拿着水壶去了,在烧水的过程中一直等水烧开,这个就是同步阻塞。后来发现烧水需要很长时间,便在烧水的过程中去干别的事,时不时的来看看水是不是烧开了,这个模型就是同步非阻塞。再后来水壶有了烧开水之后发声的功能,那么烧水的时候,我可以不用时不时的去查看,只要听到声音了就知道水烧开了,这个模型就是异步非阻塞。接下来我们用代码看看下三种模型的具体实现。
BIO:同步阻塞
数据的读取写入必须阻塞在一个线程内等待其完成,在java中这样的模型简单容易理解,每次来一个请求,服务器都会开启一个线程去处理,当在连接数小于1000时,可以让每一个连接专注于自己的 I/O,不用过多考虑系统的过载、限流等问题。在搭配线程池的使用,可以很好的解决服务端连接异常的问题。但是当连接数达到万级别之后,线程之间切换带来请求处理慢的问题逐渐体现。
服务端
public class BioServer {
final static int PROT = 7788;
public static void main(String[] args) throws IOException {
ServerSocket server = null;
server = new ServerSocket(PROT);
System.out.println(" server start .. ");
while(true) {
//进行阻塞
socket = server.accept();
//新建一个线程执行客户端的任务
HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);
executorPool.execute(new ServerHandler(socket));
}
}
}
class HandlerExecutorPool {
private ExecutorService executor;
public HandlerExecutorPool(int maxPoolSize, int queueSize){
this.executor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
maxPoolSize,
120L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable task){
this.executor.execute(task);
}
}
class ServerHandler implements Runnable {
private Socket socket;
public ServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try {
in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
out = new PrintWriter(this.socket.getOutputStream(), true);
String body = null;
while (true) {
body = in.readLine();
if (body == null) break;
System.out.println("Server :" + body);
out.println("服务器端回送响的应数据.");
}
}catch (Exception e){
}
}
}
客户端
public class Client implements Runnable {
final static String ADDRESS = "127.0.0.1";
final static int PORT = 8088;
public static void main(String[] args) throws IOException {
new Thread(new Client()).start();
}
@Override
public void run() {
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try {
socket = new Socket(ADDRESS, PORT);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
//向服务器端发送数据
while (true) {
out.println("接收到客户端的请求数据...");
out.println("接收到客户端的请求数据1111...");
String response = in.readLine();
System.out.println("Client: " + response);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
NIO:同步非阻塞
jdk1.7以后引入了NIO的变成模式。首先有三个概念需要了解。
buffer缓存区:NIO是将所有数据都用到缓冲区数组中,
InetSocketAddress address = new InetSocketAddress("127.0.0.1", 7788);//创建连接的地址
SocketChannel sc = null;//声明连接通道
ByteBuffer buf = ByteBuffer.allocate(1024);//建立缓冲区
sc = SocketChannel.open();//打开通道
sc.connect(address);//进行连接
while(true){
//定义一个字节数组,然后使用系统录入功能:
byte[] bytes = new byte[1024];
System.in.read(bytes);
buf.put(bytes);//把数据放到缓冲区中
buf.flip();//对缓冲区进行复位
sc.write(buf);//写出数据
buf.clear();//清空缓冲区数据
}
···
Channel 通道
NIO支持网络数据从Channel中读取,Channel是区别与传统的输入输出流的,传统输入输出流只支持单向数据流动,而Channel同时支持读取和写入,有多种状态位可以被识别。
Selector 多路复用选择器
NIO模型中一个连接就是一个Channel,所有的Channel都注册在Selector 中,Selector多路复用器选择器轮询查看Channel的状态位,当Channel发生读写操作时。便处于就绪状态,selector多路选择复用器会将所有处于就绪状态的Channel轮询出来,以继续后面的io操作,一个Selector可以负责上万级别的Channel,没有上限,这也是JDK使用epoll代替了传统的selector实现。
服务端代码
public class NioServer implements Runnable {
private Selector seletor;
//2 建立缓冲区
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
//3
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public NioServer(int port) {
try {
//1 打开多路复用器
this.seletor = Selector.open();
//2 打开服务器通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//3 设置服务器通道为非阻塞模式
ssc.configureBlocking(false);
//4 绑定地址
ssc.bind(new InetSocketAddress(port));
//5 把服务器通道注册到多路复用器上,并且监听阻塞事件
ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port :" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while (true) {
try {
//1 必须要让多路复用器开始监听
this.seletor.select();
//2 返回多路复用器已经选择的结果集
Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
//3 进行遍历
while (keys.hasNext()) {
//4 获取一个选择的元素
SelectionKey key = keys.next();
//5 直接从容器中移除就可以了
keys.remove();
//6 如果是有效的
if (key.isValid()) {
//7 如果为阻塞状态
if (key.isAcceptable()) {
this.accept(key);
}
//8 如果为可读状态
if (key.isReadable()) {
this.read(key);
}
//9 写数据
if (key.isWritable()) {
this.write(key); //ssc
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void write(SelectionKey key) throws ClosedChannelException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
ssc.register(this.seletor, SelectionKey.OP_WRITE);
}
private void read(SelectionKey key) {
try {
//1 清空缓冲区旧的数据
this.readBuf.clear();
//2 获取之前注册的socket通道对象
SocketChannel sc = (SocketChannel) key.channel();
//3 读取数据
int count = sc.read(this.readBuf);
//4 如果没有数据
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//5 有数据则进行读取 读取之前需要进行复位方法(把position 和limit进行复位)
this.readBuf.flip();
//6 根据缓冲区的数据长度创建相应大小的byte数组,接收缓冲区的数据
byte[] bytes = new byte[this.readBuf.remaining()];
//7 接收缓冲区数据
this.readBuf.get(bytes);
//8 打印结果
String body = new String(bytes).trim();
System.out.println("Server : " + body);
// 9..可以写回给客户端数据
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept(SelectionKey key) {
try {
//1 获取服务通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//2 执行阻塞方法
SocketChannel sc = ssc.accept();
//3 设置阻塞模式
sc.configureBlocking(false);
//4 注册到多路复用器上,并设置读取标识
sc.register(this.seletor, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(new NioServer(8088)).start();;
}
}
客户端
class NioClient{
private static String DEFAULT_HOST = "127.0.0.1";
private static int DEFAULT_PORT = 8088;
private static ClientHandle clientHandle;
public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
}
public static synchronized void start(String ip,int port){
if(clientHandle!=null)
clientHandle.stop();
clientHandle = new ClientHandle(ip,port);
new Thread(clientHandle,"Server").start();
}
//向服务器发送消息
public static boolean sendMsg(String msg) throws Exception{
if(msg.equals("q")) return false;
clientHandle.sendMsg(msg);
return true;
}
public static void main(String[] args){
start();
}
}
class ClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean started;
public ClientHandle(String ip,int port) {
this.host = ip;
this.port = port;
try{
//创建选择器
selector = Selector.open();
//打开监听通道
socketChannel = SocketChannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
socketChannel.configureBlocking(false);//开启非阻塞模式
started = true;
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
started = false;
}
@Override
public void run() {
try{
doConnect();
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
//循环遍历selector
while(started){
try{
//无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(1000);
//阻塞,只有当至少一个注册的事件发生的时候才会继续.
// selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//selector关闭后会自动释放里面管理的资源
if(selector != null)
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
if(sc.finishConnect());
else System.exit(1);
}
//读消息
if(key.isReadable()){
//创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
System.out.println("客户端收到消息:" + result);
}
//没有读取到字节 忽略
// else if(readBytes==0);
//链路已经关闭,释放资源
else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
//异步发送消息
private void doWrite(SocketChannel channel,String request) throws IOException{
//将消息编码为字节数组
byte[] bytes = request.getBytes();
//根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//发送缓冲区的字节数组
channel.write(writeBuffer);
//****此处不含处理“写半包”的代码
}
private void doConnect() throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port)));
else socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
public void sendMsg(String msg) throws Exception{
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel, msg);
}
}
AIO:异步非阻塞
AIO基于事件和回调机制,不需要过多的Selector对注册的通道进行轮询即可实现异步读写,从而简化了NIO的编程模型。
服务端
public class AioServer {
public static void main(String[] args) {
// AIO线程复用版
Thread sThread = new Thread(new Runnable() {
@Override
public void run() {
AsynchronousChannelGroup group = null;
try {
group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));
AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress(InetAddress.getLocalHost(), 8088));
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() {
@Override
public void completed(AsynchronousSocketChannel result, AsynchronousServerSocketChannel attachment) {
server.accept(null, this); // 接收下一个请求
try {
Future<Integer> f = result.write(Charset.defaultCharset().encode("你好,世界"));
f.get();
System.out.println("服务端发送时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
result.close();
} catch (InterruptedException | ExecutionException | IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
}
});
group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
});
sThread.start();
}
}
客户端
class AioClient {
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
// Socket 客户端
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();
Future<Void> future = client.connect(new InetSocketAddress(InetAddress.getLocalHost(), 8088));
future.get();
ByteBuffer buffer = ByteBuffer.allocate(100);
client.read(buffer, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
System.out.println("客户端打印:" + new String(buffer.array()));
}
@Override
public void failed(Throwable exc, Void attachment) {
exc.printStackTrace();
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
Thread.sleep(10 * 1000);
}
}
网友评论