网络编程

作者: 我可能是个假开发 | 来源:发表于2023-01-02 16:55 被阅读0次

一、IO模型

IO模型就是说用什么样的通道进行数据的发送和接收,Java共支持3种网络编程IO模式:

  • BIO
  • NIO
  • AIO

1.BIO

1.1基本介绍

Blocking I/O,同步阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时,服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。

BIO.png

适用场景: 连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JKD1.4以前的唯一选择,但程序简单易理解。

缺点:一个客户端连接对应一个处理线程,没办法同时处理多个连接
改进:多线程处理
来一个客户端,开启一个新的线程在后端处理
弊端:C10K问题

connect 10*1000个连接:
服务端扛不住,内存不够
链接太多,服务端资源不够
改进:使用线程池:
但是这样 并发数也就限制在了线程池的数量这里

1.2工作机制

工作机制.png

网络编程的基本模型是 Client/Server模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定IP地址和端口),客户端通过连接操作向服务端监听的端口地址发起连接请求,基于TCP协议下进行三次握手连接,连接成功后,双方通过网络套接字(Socket)进行通信。

传统的同步阻塞模型开发中,
服务端 ServerSocket负责绑定IP地址,启动监听端口;
客户端Socket负责发起连接操作。
连接成功后,双方通过输入和输出流进行同步阻塞式通信。
基于BIO模式下的通信,客户端-服务端是完全同步,完全耦合的。

BIO.png

1.3 传统BIO编程

需求

实现客户端发消息,服务端收消息。
代码实现

Client:

package com.hcx.bio;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/13 17:16
 */
public class Client {
    public static void main(String[] args) throws IOException {
        System.out.println("===客户端启动===");
        Socket socket = new Socket("127.0.0.1", 9999);
        OutputStream outputStream = socket.getOutputStream();
        PrintStream printStream = new PrintStream(outputStream);
        //此处没有换行,而服务端是一行一行的读取数据 等不到换行就会报错
        //printStream.print("hello,server");
        printStream.println("hello,server");
        printStream.flush();
    }
}

Server:

package com.hcx.bio;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/13 16:21
 */
public class Server {
    public static void main(String[] args) {
        try {
            System.out.println("===服务端启动===");
            //服务端端口注册
            ServerSocket serverSocket = new ServerSocket(9999);
            //监听客户端的socket连接请求
            Socket socket = serverSocket.accept();
            //从socket管道中获得字节输入流对象
            InputStream inputStream = socket.getInputStream();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
            String msg;
            if ((msg = bufferedReader.readLine()) != null) {
                System.out.println("服务端接收到:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

总结

  • 上述代码中,服务端会一直等待客户端的消息,如果客户端没有进行消息的发送,服务端将一直进入阻塞状态。
  • 服务端是按照行获取消息的,所以客户端也必须按照行发送消息,否则服务端将进入等待消息的阻塞状态。

1.4 BIO模式下多发和多收消息

需求
实现客户端和服务器反复收发消息。

代码实现:

Client:

package com.hcx.bio2;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/13 17:16
 */
public class Client {
    public static void main(String[] args) throws IOException {
        System.out.println("===客户端启动===");
        Socket socket = new Socket("127.0.0.1", 9999);
        OutputStream outputStream = socket.getOutputStream();
        PrintStream printStream = new PrintStream(outputStream);
        Scanner scanner = new Scanner(System.in);
        while (true){
            System.out.println("请输入:");
            String msg = scanner.nextLine();
            printStream.println(msg);
            printStream.flush();
        }
    }
}

Server:

package com.hcx.bio2;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/13 16:21
 */
public class Server {
    public static void main(String[] args) {
        try {
            System.out.println("===服务端启动===");
            //服务端端口注册
            ServerSocket serverSocket = new ServerSocket(9999);
            //监听客户端的socket连接请求
            Socket socket = serverSocket.accept();
            //从socket管道中获得字节输入流对象
            InputStream inputStream = socket.getInputStream();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
            String msg;
            while ((msg = bufferedReader.readLine()) != null) {
                System.out.println("服务端接收到:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
客户端发送.png 服务端接收.png

1.5 BIO模式下接收多个客户端

在服务端引入线程,客户端每发起一个请求,服务端就创建一个新的线程来处理这个客户端的请求,这样就实现了一个客户端一个线程的模型。

代码示例:

Client:

package com.hcx.bio3;

import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/14 11:13
 */
public class Client {

    public static void main(String[] args) {
        try {
            Socket socket = new Socket("127.0.0.1",9999);
            OutputStream outputStream = socket.getOutputStream();
            PrintStream printStream = new PrintStream(outputStream);
            Scanner scanner = new Scanner(System.in);
            //循环发送消息
            while (true){
                System.out.println("请说:");
                String msg = scanner.nextLine();
                printStream.println(msg);
                printStream.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

Server:

package com.hcx.bio3;

import java.net.ServerSocket;
import java.net.Socket;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/14 10:57
 */
public class Server {

    public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(9999);
            //循环不断接收客户端的socket连接请求
            while (true){
                Socket socket = serverSocket.accept();
                //创建独立的线程与客户端socket通信
                new ServerReaderThread(socket).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

ServerReaderThread:

package com.hcx.bio3;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/14 11:00
 */
public class ServerReaderThread extends Thread {

    private Socket socket;

    public ServerReaderThread(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            InputStream inputStream = socket.getInputStream();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
            String msg;
            while ((msg = bufferedReader.readLine()) != null) {
                System.out.println(msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

启动一个服务端,多个客户端。

总结:

  • 1.每个Socket接收到,都会创建一个线程,线程的竞争、切换上下文影响性能;
  • 2.每个线程都会占用栈空间和CPU资源;
  • 3.并不是每个都进行IO操作,无意义的线程处理;
  • 4.客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。

1.6 伪异步I/O

上述案例中:客户端的并发访问增加时。服务端将呈现1:1的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。

改进:
采用一个伪异步/O的通信框架,采用线程池和任务队列实现,当客户端接入时,将客户端的 Socket封装成一个Task(该任务实现java.lang. Runnable线程任务接口)交给后端的线程池中进行处理。JDK的线程池维护一个消息队列和N个活跃的线程,对消息队列中 Socket任务进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。


image.png

代码示例:

Server:

package com.hcx.bio4;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/14 15:11
 */
public class Server {
    public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(9999);

//            while (true) {
//                Socket socket = serverSocket.accept();
//                new ThreadPoolExecutor(3, 1, 120, TimeUnit.SECONDS,
//                        new ArrayBlockingQueue<>(3)).execute(() -> {
//                    //处理接收到的客户端socket通信
//                    try {
//                        //从socket中获取字节输入流
//                        InputStream inputStream = socket.getInputStream();
//                        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
//                        String msg;
//                        if ((msg = bufferedReader.readLine()) != null) {
//                            System.out.println("服务端接收到:" + msg);
//                        }
//                    } catch (IOException e) {
//                        e.printStackTrace();
//                    }
//                });
//            }

            //初始化线程池对象
            HandlerSocketServerPool pool = new HandlerSocketServerPool(3, 10);

            //循环接收客户端socket连接请求
            while (true) {
                Socket socket = serverSocket.accept();
                //把socket封装成任务对象交给线程池处理
                Runnable runnable = new ServerRunnableTarget(socket);
                pool.execute(runnable);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

HandlerSocketServerPool:

package com.hcx.bio4;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/14 15:11
 */
public class HandlerSocketServerPool {

    //创建一个线程池成员变量用于存储线程池对象
    private ExecutorService executorService;

    //创建对象时初始化线程池对象
    public HandlerSocketServerPool(int maxThreadNum,int queueSize){
        executorService = new ThreadPoolExecutor(3, maxThreadNum, 120, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueSize));
    }

    //提交任务给线程池的任务队列,等待线程池处理
    public void execute(Runnable target){
        executorService.execute(target);
    }
}

ServerRunnableTarget:

package com.hcx.bio4;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/14 15:34
 */
public class ServerRunnableTarget implements Runnable{

    private Socket socket;
    public ServerRunnableTarget(Socket socket){
        this.socket = socket;
    }

    @Override
    public void run() {
        //处理接收到的客户端socket通信
        try {
            //从socket中获取字节输入流
            InputStream inputStream = socket.getInputStream();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
            String msg;
            if((msg = bufferedReader.readLine())!=null){
                System.out.println("服务端接收到:"+msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Client:

package com.hcx.bio4;

import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket;
import java.util.Scanner;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/14 11:13
 */
public class Client {

    public static void main(String[] args) {
        try {
            Socket socket = new Socket("127.0.0.1",9999);
            OutputStream outputStream = socket.getOutputStream();
            PrintStream printStream = new PrintStream(outputStream);
            Scanner scanner = new Scanner(System.in);
            //循环发送消息
            while (true){
                System.out.println("请说:");
                String msg = scanner.nextLine();
                printStream.println(msg);
                printStream.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

HandlerSocketServerPool pool = new HandlerSocketServerPool(3, 10);当客户端启动到三个时,再启动客户端则会进入队列中等待。

总结:

  • 伪异步io采用了线程池实现,因此避免了为每个请求创建一个独立线程造成线程资源耗尽的问题,但由于底层依然是采用的同步阻塞模型,因此无法从根本上解决问题。
  • 如果单个消息处理的缓慢,或者服务器线程池中的全部线程都被阻塞,那么后续 socket的i/o消息都将在队列中排队。新的 Socket请求将被拒绝,客户端会发生大量连接超时。

1.7 基于BIO的文件上传

代码实现:

Client:

package com.hcx.file;

import java.io.*;
import java.net.Socket;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/14 17:02
 */
public class Client {
    public static void main(String[] args) {
        try {
            Socket socket = new Socket("127.0.0.1", 8888);
            OutputStream outputStream = socket.getOutputStream();
            DataOutputStream dos = new DataOutputStream(outputStream);
            //发送上传文件的后缀给服务端
            dos.writeUTF(".txt");
            ///Users/hongcaixia/Documents/11.txt
            InputStream is = new FileInputStream("/Users/hongcaixia/Documents/11.txt");
            //发送数据给服务端
            byte[] buffer = new byte[1024];
            int len;
            while ((len = is.read(buffer))>0){
                dos.write(buffer,0,len);
            }
            dos.flush();
            //通知服务端数据发送完毕
            socket.shutdownOutput();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Server:

package com.hcx.file;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * 接收客户端的文件并保存到磁盘
 *
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/14 17:02
 */
public class Server {
    public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(8888);
            while (true) {
                Socket socket = serverSocket.accept();
                new ServerReaderThread(socket).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

ServerReaderThread:

package com.hcx.file;

import java.io.*;
import java.net.Socket;
import java.util.UUID;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/14 17:49
 */
public class ServerReaderThread extends Thread{

    private Socket socket;
    public ServerReaderThread(Socket socket){
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            InputStream inputStream = socket.getInputStream();
            //数据输入流读取客户端发送过来的数据
            DataInputStream dis = new DataInputStream(inputStream);
            //读取客户端发送过来的文件类型
            String suffix = dis.readUTF();
            System.out.println("服务端成功接收到了文件类型:"+suffix);
            //使用字节输出管道把客户端发来的文件写出去
            OutputStream ops = new FileOutputStream("/Users/hongcaixia/Documents/"+ UUID.randomUUID().toString()+suffix);
            //从数据输入流中读取数据,写到字节输出流
            byte[] buffer = new byte[1024];
            int len;
            while ((len = dis.read(buffer))>0){
                ops.write(buffer,0,len);
            }
            ops.close();
            System.out.println("服务端接收文件并保存成功");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

1.8 BIO模式下的端口转发

需求:
实现一个客户端发消息给所有客户端接收(群聊模式)

image.png

代码实现:

Server:

package com.hcx.chat;

import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/**
 * BIO模式下的端口转发
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/14 20:35
 */
public class Server {
    //存储所有在线socket
    public static List<Socket> allOnlineSocket = new ArrayList<>();

    public static void main(String[] args) {
        try {
            ServerSocket serverSocket = new ServerSocket(9999);
            while (true){
                Socket socket = serverSocket.accept();
                //把登陆的客户端socket存到集合中
                allOnlineSocket.add(socket);
                //为每个socket分配独立的线程来处理
                new ServerReaderThread(socket).start();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

ServerReaderThread:

package com.hcx.chat;

import java.io.*;
import java.net.Socket;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/14 20:41
 */
public class ServerReaderThread extends Thread{

    private Socket socket;
    public ServerReaderThread(Socket socket){
        this.socket = socket;
    }
    @Override
    public void run() {
        try {
            InputStream inputStream = socket.getInputStream();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
            String msg;
            while ((msg = bufferedReader.readLine())!=null){
                //服务端接收到客户端的消息,将消息推送给当前所有在线socket
                sendMsgToAllClient(msg);
            }
        } catch (IOException e) {
            System.out.println("当前有人下线了");
            //从在线socket中移除本socket
            Server.allOnlineSocket.remove(socket);
        }
    }

    /**
     * 把当前客户端发来的消息推送给所有在线socket
     * @param msg
     * @throws IOException
     */
    private void sendMsgToAllClient(String msg) throws IOException {
        for (Socket socket1 : Server.allOnlineSocket) {
            OutputStream outputStream = socket1.getOutputStream();
            PrintStream printStream = new PrintStream(outputStream);
            printStream.println(msg);
            printStream.flush();
        }
    }
}

2.NIO

同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。

2.1 基本介绍

  • Java NIO(New IO)也称之为 java non-blocking IO是从Java 1.4版本开始引入的一个新的IO API,可以替代标准的Java IO API。NIO与原来的IO有同样的作用和目的,但是使用的方式完全不同,NIO支持面向缓冲区的、基于通道的IO操作。NIO将以更加高效的方式进行文件的读写操作。NIO可以理解为非阻塞IO,传统的IO的read和write只能阻塞执行,线程在读写IO期间不能干其他事情,比如调用socket.read()时,如果服务器一直没有数据传输过来,线程就一直阻塞,而NIO中可以配置socket为非阻塞模式。

  • NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写。

  • NIO 有三大核心部分:Channel( 通道) ,Buffer( 缓冲区), Selector( 选择器)
  • Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。

  • NIO可以做到用一个线程来处理多个操作。假设有 1000 个请求过来,根据实际情况,可以分配20 或者 80个线程来处理。不像之前的阻塞 IO 那样,非得分配 1000 个。

image.png

适用场景:
连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程复杂,JDK1.4开始支持。

2.2 NIO 和 BIO 的比较

  • BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多
  • BIO 是阻塞的,NIO 则是非阻塞的
  • BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道
NIO BIO
面向缓冲区(Buffer) 面向流(Stream)
非阻塞(Non Blocking IO) 阻塞IO(Blocking IO)
选择器(Selectors)

2.3 NIO核心概念

NIO 有三大核心部分:

  • Channel( 通道)
  • Buffer( 缓冲区)
  • Selector( 选择器)
2.3.1 Buffer

缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成NIO Buffer对象,并提供了一组方法,用来方便的访问该块内存。相比较直接对数组的操作,Buffer API更加容易操作和管理。

2.3.2 Channel

Java NIO的通道类似流,但又有些不同:既可以从通道中读取数据,又可以写数据到通道。但流的(input或output)读写通常是单向的。 通道可以非阻塞读取和写入通道,通道可以支持读取或写入缓冲区,也支持异步地读写。

2.3.3 Selector

Selector是 一个Java NIO组件,可以能够检查一个或多个 NIO 通道,并确定哪些通道已经准备好进行读取或写入。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接,提高效率

image.png

特点:

  • 每个 channel 都会对应一个 Buffer
  • 一个线程对应Selector , 一个Selector对应多个 channel(连接)
  • 程序切换到哪个 channel 是由事件决定的
  • Selector 会根据不同的事件,在各个通道上切换
  • Buffer 就是一个内存块 , 底层是一个数组
  • 数据的读取写入是通过 Buffer完成的 , BIO 中要么是输入流,或者是输出流, 不能双向,但是 NIO 的 Buffer 是可以读也可以写。
  • 通道(Channel)和缓冲区 (Buffer)。通道表示打开到 IO 设备(例如:文件、 套接字)的连接。若需要使用 NIO 系统,需要获取 用于连接 IO 设备的通道以及用于容纳数据的缓冲 区。然后操作缓冲区,对数据进行处理。简而言之,Channel 负责传输, Buffer 负责存取数据

2.4 Buffer

2.4.1.定义
一个用于特定基本数据类型的容器。由 java.nio 包定义的,所有缓冲区都是 Buffer 抽象类的子类。

Java NIO 中的 Buffer 主要用于与 NIO 通道进行交互,数据是从通道读入缓冲区,从缓冲区写入通道中


image.png

2.4.2 Buffer类及其子类

Buffer就像一个数组,可以保存多个相同类型的数据。根据数据类型不同,有以下 Buffer 常用子类:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer

上述 Buffer 类,他们都采用相似的方法进行管理数据,只是各自管理的数据类型不同而已。都是通过如下方法获取一个 Buffer 对象:

//创建一个容量为capacity的XxxBuffer对象
static XxxBuffer allocate(int capacity) 

2.4.3 缓冲区的基本属性

Buffer 中的重要概念:

  • 容量 (capacity) :作为一个内存块,Buffer具有一定的固定大小,也称为"容量",缓冲区容量不能为负,并且创建后不能更改。
  • 限制 (limit):表示缓冲区中可以操作数据的大小(limit 后数据不能进行读写)。缓冲区的限制不能为负,并且不能大于其容量。 写入模式,限制等于buffer的容量。读取模式下,limit等于写入的数据量
  • 位置 (position):下一个要读取或写入的数据的索引。缓冲区的位置不能为负,并且不能大于其限制
  • 标记 (mark)与重置 (reset):标记是一个索引,通过 Buffer 中的 mark() 方法 指定 Buffer 中一个特定的 position,之后可以通过调用 reset() 方法恢复到这个 position.

标记、位置、限制、容量遵守不变式: 0 <= mark <= position <= limit <= capacity

image.png

2.4.4常见方法

  • Buffer clear() :清空缓冲区并返回对缓冲区的引用
  • Buffer flip() :为将缓冲区的界限设置为当前位置,并将当前位置置为 0
  • int capacity(): 返回 Buffer 的 capacity 大小
  • boolean hasRemaining(): 判断缓冲区中是否还有元素
  • int limit(): 返回 Buffer 的界限(limit) 的位置
  • Buffer limit(int n) :将设置缓冲区界限为 n, 并返回一个具有新 limit 的缓冲区对象
  • Buffer mark() :对缓冲区设置标记
  • int position(): 返回缓冲区的当前位置 position
  • Buffer position(int n) :将设置缓冲区的当前位置为 n , 并返回修改后的 Buffer 对象
  • int remaining() :返回 position 和 limit 之间的元素个数
  • Buffer reset(): 将位置 position 转到以前设置的 mark 所在的位置
  • Buffer rewind(): 将位置设为为 0, 取消设置的 mark

2.4.5 数据操作

Buffer 所有子类提供了两个用于数据操作的方法:

  • get()
  • put()

获取 Buffer中的数据:

get() :读取单个字节
get(byte[] dst):批量读取多个字节到 dst 中
get(int index):读取指定索引位置的字节(不会移动 position)

放入数据到 Buffer中:

put(byte b):将给定单个字节写入缓冲区的当前位置
put(byte[] src):将 src 中的字节写入缓冲区的当前位置
put(int index, byte b):将指定字节写入缓冲区的索引位置(不会移动 position)

使用Buffer读写数据一般遵循以下四个步骤:

  • 1.写入数据到Buffer
  • 2.调用flip()方法,转换为读取模式
  • 3.从Buffer中读取数据
  • 4.调用buffer.clear()方法或者buffer.compact()方法清除缓冲区

2.4.6 代码示例

public class BufferTest {

    @Test
    public void test01() {
        //分配一个缓冲区,容量为10
        ByteBuffer buffer = ByteBuffer.allocate(10);
        System.out.println("缓冲区当前位置:" + buffer.position());//0
        System.out.println("缓冲区界限:" + buffer.limit());//10
        System.out.println("缓冲区容量:" + buffer.capacity());//10

        System.out.println("==============");
        //往缓冲区添加数据
        String name = "hongcx";
        buffer.put(name.getBytes());
        System.out.println("缓冲区当前位置:" + buffer.position());//6
        System.out.println("缓冲区界限:" + buffer.limit());//10
        System.out.println("缓冲区容量:" + buffer.capacity());//10

        System.out.println("==============");
        //为将缓冲区的界限设置为当前位置,并将当前位置置为0 切换为可读模式
        buffer.flip();
        System.out.println("缓冲区当前位置:" + buffer.position());//0
        //前6个位置可读
        System.out.println("缓冲区界限:" + buffer.limit());//6
        System.out.println("缓冲区容量:" + buffer.capacity());//10

        System.out.println("==============");
        //使用get读取数据
        char c = (char) buffer.get();
        System.out.println("读取到的字符是:" + c);//h
        System.out.println("缓冲区当前位置:" + buffer.position());//1
        System.out.println("缓冲区界限:" + buffer.limit());//6
        System.out.println("缓冲区容量:" + buffer.capacity());//10
    }

    @Test
    public void test02() {
        //clear
        ByteBuffer buffer = ByteBuffer.allocate(10);
        String name = "hongcx";
        buffer.put(name.getBytes());
        //清楚缓冲区中的数据 只是把position变成了0,数据还存在,直到后续添加了新的数据 才会覆盖掉
        buffer.clear();
        System.out.println(buffer.position());//0
        System.out.println((char) buffer.get());//h

        System.out.println("=======================");
        //flip
        ByteBuffer buffer2 = ByteBuffer.allocate(10);
        String name2 = "hongcx";
        buffer2.put(name2.getBytes());
        buffer2.flip();
        byte[] bytes = new byte[2];
        buffer2.get(bytes);
        String str = new String(bytes);
        System.out.println(str);//ho
        System.out.println("缓冲区当前位置:" + buffer2.position());//2
        System.out.println("缓冲区界限:" + buffer2.limit());//6
        System.out.println("缓冲区容量:" + buffer2.capacity());//10

        System.out.println("=======================");
        //标记此刻的位置 2 标记之后 后续可以回到此处
        buffer2.mark();
        byte[] bytes1 = new byte[3];
        buffer2.get(bytes1);
        System.out.println(new String(bytes1));//ngc
        System.out.println("缓冲区当前位置:" + buffer2.position());//5
        System.out.println("缓冲区界限:" + buffer2.limit());//6
        System.out.println("缓冲区容量:" + buffer2.capacity());//10

        //reset 回到标记位置
        buffer2.reset();
        if(buffer2.hasRemaining()){
            System.out.println(buffer2.remaining());
        }
    }
}

2.4.7 直接与非直接缓冲区

定义

byte byffer可以是两种类型:

  • 直接内存(也就是非堆内存):JVM将会在IO操作上具有更高的性能,因为它直接作用于本地系统的IO操作
  • 非直接内存(也就是堆内存):即堆内存中的数据,如果要作IO操作,会先从本进程内存复制到直接内存,再利用本地IO处理。

从数据流的角度,非直接内存的作用链:

本地IO-->直接内存-->非直接内存-->直接内存-->本地IO

直接内存:

本地IO-->直接内存-->本地IO

在做IO处理时,比如网络发送大量数据时,直接内存会具有更高的效率。直接内存使用allocateDirect创建,但是它比申请普通的堆内存需要耗费更高的性能。不过,这部分的数据是在JVM之外的,因此它不会占用应用的内存。所以,当有很大的数据要缓存,并且它的生命周期又很长,那么就比较适合使用直接内存。只是一般来说,如果不是能带来很明显的性能提升,还是推荐直接使用堆内存。字节缓冲区是直接缓冲区还是非直接缓冲区可通过调用其isDirect()方法来确定。

直接缓冲区使用场景

  • 有很大的数据需要存储,它的生命周期又很长
  • 适合频繁的IO操作,比如网络并发场景

2.5 Channel

2.5.1 概述

通道(Channel):由java.nio.channels 包定义 的。Channel 表示 IO 源与目标打开的连接。
Channel 类似于传统的“流”。只不过 Channel 本身不能直接访问数据,Channel 只能与 Buffer 进行交互。

1、NIO 的通道类似于流,但有些区别如下:

  • 通道可以同时进行读写,而流只能读或者只能写
  • 通道可以实现异步读写数据
  • 通道可以从缓冲读数据,也可以写数据到缓冲:

2、BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而 NIO 中的通道(Channel)
是双向的,可以读操作,也可以写操作。

3、Channel 在 NIO 中是一个接口:public interface Channel extends Closeable{}

2.5.2 常用Channel实现类

  • FileChannel:用于读取、写入、映射和操作文件的通道。
  • DatagramChannel:通过 UDP 读写网络中的数据通道。
  • SocketChannel:通过 TCP 读写网络中的数据。
  • ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。

ServerSocketChannel 类似 ServerSocket , SocketChannel 类似 Socket

2.5.3 FileChannel

获取通道的一种方式是对支持通道的对象调用getChannel() 方法。支持通道的类如下:

  • FileInputStream
  • FileOutputStream
  • RandomAccessFile
  • DatagramSocket
  • Socket
  • ServerSocket

获取通道的其他方式:

  • 使用 Files 类的静态方法 newByteChannel() 获取字节通道。
  • 通过通道的静态方法 open() 打开并返回指定通道

常用方法:

  • int read(ByteBuffer dst) 从Channel中读取数据到 ByteBuffer
  • long read(ByteBuffer[] dsts) 将Channel 中的数据“分散”到 ByteBuffer[]
  • int write(ByteBuffer src) 将ByteBuffer中的数据写入到 Channel
  • long write(ByteBuffer[] srcs) 将ByteBuffer[]中的数据“聚集”到 Channel
  • long position() 返回此通道的文件位置
  • FileChannel position(long p) 设置此通道的文件位置
  • long size() 返回此通道的文件的当前大小
  • FileChannel truncate(long s) 将此通道的文件截取为给定大小
  • void force(boolean metaData) 强制将所有对此通道的文件更新写入到存储设备中

2.5.4 代码示例

本地文件写数据:

/**
 * 写数据到文件中
 */
@Test
public void writeFile() {
    try {
        //字节输出流通向目标文件
        FileOutputStream fos = new FileOutputStream("/Users/hongcaixia/a.txt");
        //得到字节输出流对应的通道channel
        FileChannel channel = fos.getChannel();
        //分配缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        buffer.put("hello,channel".getBytes());
        //把缓冲区切换成写模式
        buffer.flip();
        channel.write(buffer);
        channel.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

本地文件读数据:

/**
 * 从文件中读取数据
 */
@Test
public void readFile() {
    try {
        //定义文件字节输入流与文件接通
        FileInputStream fis = new FileInputStream("/Users/hongcaixia/a.txt");
        //获取文件字节输入流的文件通道channel
        FileChannel channel = fis.getChannel();
        //定义缓冲区
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //读取数据到缓冲区
        channel.read(buffer);
        //将缓冲区的界限设置为当前位置,并将当前位置置为 0
        buffer.flip();
        //读取缓冲区的数据
        String str = new String(buffer.array(), 0, buffer.remaining());
        System.out.println(str);
    } catch (Exception e) {
        e.printStackTrace();
    }
}

复制文件:

/**
 * 复制文件
 */
@Test
public void copyFile() throws Exception {
    //源文件
    File srcFile = new File("/Users/hongcaixia/Documents/image/psc.jpg");
    //目标文件
    File destFile = new File("/Users/hongcaixia/Documents/image/pscNew.jpg");
    //获取字节输入输出流
    FileInputStream fis = new FileInputStream(srcFile);
    FileOutputStream fos = new FileOutputStream(destFile);
    //获取文件通道channel
    FileChannel fisChannel = fis.getChannel();
    FileChannel fosChannel = fos.getChannel();
    //分配缓冲区
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    while (true) {
        //清空缓冲区
        buffer.clear();
        //读取数据
        int read = fisChannel.read(buffer);
        if(read==-1){
            break;
        }
        //将缓冲区设置为可读模式
        buffer.flip();
        //写出数据
        fosChannel.write(buffer);
    }
    fisChannel.close();
    fosChannel.close();
}

分散读取(Scatter ):把Channel通道的数据读入到多个缓冲区中去

聚集写入(Gathering ):将多个 Buffer 中的数据“聚集”到 Channel。

使用分散和聚集实现文件复制:

/**
 * 使用分散和聚集复制文件
 */
@Test
public void copyFile1() throws Exception {
    //获取字节输入输出流
    FileInputStream fis = new FileInputStream("/Users/hongcaixia/a.txt");
    FileOutputStream fos = new FileOutputStream("/Users/hongcaixia/b.txt");
    //获取文件通道channel
    FileChannel fisChannel = fis.getChannel();
    FileChannel fosChannel = fos.getChannel();
    //分散读入
    ByteBuffer buffer1 = ByteBuffer.allocate(5);
    ByteBuffer buffer2 = ByteBuffer.allocate(1024);
    ByteBuffer[] buffers = {buffer1,buffer2};
    //从通道中读取数据分散到各个缓冲区
    fisChannel.read(buffers);
    for (ByteBuffer buffer : buffers) {
        //切换到读数据模式
        buffer.flip();
    }
    //聚集写出
    fosChannel.write(buffers);
    fisChannel.close();
    fosChannel.close();
}

transferFrom() :从目标通道中复制原通道数据

/**
 * 使用transferFrom复制文件
 */
@Test
public void testTransferFrom() throws Exception{
    //获取字节输入输出流
    FileInputStream fis = new FileInputStream("/Users/hongcaixia/a.txt");
    FileOutputStream fos = new FileOutputStream("/Users/hongcaixia/b.txt");
    //获取文件通道channel
    FileChannel fisChannel = fis.getChannel();
    FileChannel fosChannel = fos.getChannel();
    //复制数据
    fosChannel.transferFrom(fisChannel,fisChannel.position(),fisChannel.size());
    fisChannel.close();
    fosChannel.close();
}

transferTo() :把原通道数据复制到目标通道

/**
 * 使用transferTo复制文件
 */
@Test
public void testTransferTo() throws Exception{
    //获取字节输入输出流
    FileInputStream fis = new FileInputStream("/Users/hongcaixia/a.txt");
    FileOutputStream fos = new FileOutputStream("/Users/hongcaixia/b.txt");
    //获取文件通道channel
    FileChannel fisChannel = fis.getChannel();
    FileChannel fosChannel = fos.getChannel();
    //复制数据
    fisChannel.transferTo(fisChannel.position(),fisChannel.size(),fosChannel);
    fisChannel.close();
    fosChannel.close();
}

2.6 Selector

概述
选择器(Selector)是 SelectableChannle 对象的多路复用器,Selector 可以同时监控多个 SelectableChannel 的 IO 状况,也就是说,利用 Selector可使一个单独的线程管理多个 Channel。

Selector 是非阻塞 IO 的核心


image.png image.png

特点:

  • 非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到 Selector(选择器)
  • Selector 能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个
    Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管
    理多个通道,也就是管理多个连接和请求。
  • 只有在 连接/通道 真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都
    创建一个线程,不用去维护多个线程
  • 避免了多线程之间的上下文切换导致的开销

使用:

创建Selector:Selector.open()

Selector selector = Selector.open();

向选择器注册通道:SelectableChannel.register(Selector sel, int ops)

//获取通道
ServerSocketChannel ssChannel = ServerSocketChannel.open();
//切换非阻塞模式
ssChannel.configureBlocking(false);
//绑定连接
ssChannel.bind(new InetSocketAddress(9999));
//获取选择器
Selector selector = Selector.open();
//将通道注册到选择器上, 并且指定“监听接收事件”
ssChannel.register(selector, SelectionKey.OP_ACCEPT);

调用 register(Selector sel, int ops) 将通道注册选择器时,选择器对通道的监听事件,需要通过第二个参数 ops 指定。

可以监听的事件类型(可使用 SelectionKey 的四个常量表示):

  • 读 : SelectionKey.OP_READ
  • 写 : SelectionKey.OP_WRITE
  • 连接 : SelectionKey.OP_CONNECT
  • 接收 : SelectionKey.OP_ACCEPT

若注册时不止监听一个事件,则可以使用“位或”操作符连接。int interestSet = SelectionKey.OP_READ|SelectionKey.OP_WRITE

2.7 NIO非阻塞式网络通信

Selector可以实现: 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。


image.png

Server:

package com.hcx.nio.selector;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/19 14:42
 */
public class Server {

    public static void main(String[] args) throws Exception {
        //获取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        //设置为非阻塞模式
        ssChannel.configureBlocking(false);
        //绑定连接端口
        ssChannel.bind(new InetSocketAddress(9999));
        //获取选择器selector
        Selector selector = Selector.open();
        //将通道注册到选择器 并监听接收事件
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);
        //使用selector轮询已经就绪好的事件
        while (selector.select() > 0) {
            //从选择器中获取所有注册好的通道的就绪事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                //判断当前事件
                //接收事件
                if (selectionKey.isAcceptable()) {
                    //获取当前接入的客户端通道
                    SocketChannel sChannel = ssChannel.accept();
                    //设置为非阻塞
                    sChannel.configureBlocking(false);
                    //将客户端通道注册到选择器 并监听读取事件
                    sChannel.register(selector, SelectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    //读取事件
                    //获取当前选择器上的读就绪事件
                    SocketChannel sChannel = (SocketChannel) selectionKey.channel();
                    //读取数据
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int len = 0;
                    while ((len = sChannel.read(buffer)) > 0) {
                        buffer.flip();
                        System.out.println(new String(buffer.array(), 0, len));
                        buffer.clear();
                    }
                }
                //处理完毕移除当前事件
                iterator.remove();
            }
        }
    }
}

Client:

package com.hcx.nio.selector;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/19 15:18
 */
public class Client {
    public static void main(String[] args) throws Exception{
        //获取通道
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",9999));
        //设置为非阻塞
        socketChannel.configureBlocking(false);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //发送数据到服务端
        Scanner scanner = new Scanner(System.in);
        while (true){
            System.out.println("请输入:");
            String msg = scanner.nextLine();
            buffer.put(("小红:"+msg).getBytes());
            buffer.flip();
            socketChannel.write(buffer);
            buffer.clear();
        }
    }
}
2.7.1 Epoll

1.int epoll_create(int size);

创建一个epoll实例,并返回一个非负数作为文件描述符,用于对epoll接口的所有后续调用。

size:代表可能会容纳size个描述符,但size不是一个最大值,只是提示操作系统它的数量级,现在这个参数基本上已经弃用了。

2.int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

使用文件描述符epfd引用的epoll实例,对目标文件描述符fd执行op操作。

参数epfd表示epoll对应的文件描述符,参数fd表示socket对应的文件描述符。

参数op有以下几个值:

  • EPOLL_CTL_ADD:注册新的fd到epfd中,并关联事件event;

  • EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

  • EPOLL_CTL_DEL:从epfd中移除fd,并且忽略掉绑定的event,这时event可以为null;

参数event是一个结构体

struct epoll_event {
     __uint32_t   events;      /* Epoll events */
     epoll_data_t data;        /* User data variable */
};
    
typedef union epoll_data {
     void        *ptr;
     int          fd;
     __uint32_t   u32;
     __uint64_t   u64;
} epoll_data_t;

events可选值:

  • EPOLLIN :表示对应的文件描述符是可读的;

  • EPOLLOUT:表示对应的文件描述符是可写的;

  • EPOLLERR:表示对应的文件描述符发生了错误;

成功则返回0,失败返回-1

3.int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

等待文件描述符epfd上的事件。

  • epfd:Epoll对应的文件描述符,
  • events:调用者所有可用事件的集合,
  • maxevents:最多等到多少个事件就返回
  • timeout:超时时间

I/O多路复用底层主要用的Linux 内核·函数(select,poll,epoll)来实现,windows不支持epoll实现,windows底层是基于winsock2的select函数实现的(不开源)

select poll epoll(jdk 1.5及以上)
操作方式 遍历 遍历 回调
底层实现 数组 链表 哈希表
IO效率 每次调用都进行线性遍历,时间复杂度为O(n) 每次调用都进行线性遍历,时间复杂度为O(n) 事件通知方式,每当有IO事件就绪,系统注册的回调函数就会被调用,时间复杂度O(1)
最大连接 有上限 无上限 无上限
2.7.2 Redis线程模型

Redis是典型的基于epoll的NIO线程模型(nginx也是),epoll实例收集所有事件(连接与读写事件),由一个服务端线程连续处理所有事件命令。

Redis底层关于epoll的源码实现在redis的src源码目录的ae_epoll.c文件里。

2.8 在线聊天室

Server:

package com.hcx.nio.chat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/19 20:01
 */
public class Server {
    //选择器
    private Selector selector;
    //服务端通道
    private ServerSocketChannel ssChannel;
    //端口
    private static final int PORT = 9999;

    //初始化
    public Server() {
        try {
            //创建选择器
            selector = Selector.open();
            //获取通道
            ssChannel = ServerSocketChannel.open();
            //绑定客户端连接的端口
            ssChannel.bind(new InetSocketAddress(PORT));
            //设置非阻塞
            ssChannel.configureBlocking(false);
            //把通道注册到选择器 指定接收连接事件
            ssChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 监听客户端的消息
     */
    private void listen() {
        try {
            while (selector.select() > 0) {
                //获取选择器中所有注册通道的就绪事件
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey sk = iterator.next();
                    //判断事件类型
                    if (sk.isAcceptable()) {
                        //接入事件
                        //获取当前客户端通道
                        SocketChannel sChannel = ssChannel.accept();
                        //设置为非阻塞模式
                        sChannel.configureBlocking(false);
                        //将客户端通道注册到选择器 并监听读取事件
                        sChannel.register(selector, SelectionKey.OP_READ);
                    } else if (sk.isReadable()) {
                        //可读事件
                        //处理客户端消息 接收并转发
                        readClientData(sk);
                    }
                    //处理完毕移除当前事件
                    iterator.remove();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 接收当前客户端通道的消息并转发到其他所有客户端通道
     *
     * @param sk
     */
    private void readClientData(SelectionKey sk) {
        SocketChannel sChannel = null;
        try {
            sChannel = (SocketChannel) sk.channel();
            //创建缓冲区对象开始接收客户端通道的数据
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int count = sChannel.read(buffer);
            if (count > 0) {
                buffer.flip();
                //获取读到的信息
                String msg = new String(buffer.array(), 0, buffer.remaining());
                System.out.println("接收到了客户端消息:" + msg);
                //把消息推送给其他所有客户端
                sendMsgToAllClient(msg, sChannel);
            }
        } catch (Exception e) {
            try {
                System.out.println("有人离线了:" + sChannel.getRemoteAddress());
                //取消注册
                sk.cancel();
                sChannel.close();
            } catch (Exception e1) {
                e1.printStackTrace();
            }
            e.printStackTrace();
        }
    }

    /**
     * 把当前客户端的消息推送给全部在线注册的channel
     *
     * @param msg
     * @param sChannel
     */
    private void sendMsgToAllClient(String msg, SocketChannel sChannel) throws IOException {
        System.out.println("服务端开始转发消息:当前处理的线程:" + Thread.currentThread().getName());
        for (SelectionKey key : selector.keys()) {
            Channel channel = key.channel();
            //排除自己
            if (channel instanceof SocketChannel && channel != sChannel) {
                ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                ((SocketChannel) channel).write(buffer);
            }
        }
    }

    public static void main(String[] args) {
        //创建服务端对象
        Server server = new Server();
        //监听客户端的消息事件:连接消息 群聊消息 离线消息
        server.listen();
    }
}

Client:

package com.hcx.nio.chat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/19 20:01
 */
public class Client {

    private Selector selector;
    private static int PORT = 9999;
    private SocketChannel socketChannel;

    //初始化客户端
    public Client() {
        try {
            //创建选择器
            selector = Selector.open();
            //连接服务端
            socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
            //设置非阻塞
            socketChannel.configureBlocking(false);
            //监听读事件 读取服务端发来的消息
            socketChannel.register(selector, SelectionKey.OP_READ);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        //定义线程专门负责监听服务端发送的读消息事件
        new Thread(() -> {
            try {
                client.readInfo();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()){
            String s = scanner.nextLine();
            client.sendToServer(s);
        }
    }

    /**
     * 发送消息到服务端
     * @param s
     */
    private void sendToServer(String s) {
        try {
            socketChannel.write(ByteBuffer.wrap(("小红说:"+s).getBytes()));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 读取服务端发送过来的消息
     */
    private void readInfo() throws IOException {
        while (selector.select() > 0) {
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                if (key.isReadable()) {
                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    sc.read(buffer);
                    System.out.println(new String(buffer.array()).trim());
                }
                iterator.remove();
            }
        }
    }
}

3.AIO

3.1 基本介绍

Java AIO(NIO.2) : 异步非阻塞,基于NIO的,可以称之为NIO2.0,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理,一般适用于连接数较多且连接时间较长的应用。

image.png

与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可, 这两种方法均为异步的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作系统主动通知应用程序。

即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。在JDK1.7中,这部分内容被称作NIO.2,主要在Java.nio.channels包下增加了下面四个异步通道:

  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousFileChannel
  • AsynchronousDatagramChannel

适用场景:
连接数目较多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

3.2 实现方式

1.Future

image.png

2.CompletionHandler

image.png

3.代码示例

Server:

package com.hcx.aio;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.HashMap;
import java.util.Map;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/20 16:23
 */
public class Server {

    final String LOCALHOST = "localhost";
    final int DEFAULT_PORT = 9999;
    AsynchronousServerSocketChannel serverSocketChannel;

    private void close(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
                System.out.println("关闭" + closeable);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void start() {
        try {
            //绑定监听端口
            serverSocketChannel = AsynchronousServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(LOCALHOST, DEFAULT_PORT));
            System.out.println("启动服务器,监听端口:" + DEFAULT_PORT);
            while (true) {
                serverSocketChannel.accept(null, new AcceptHandler());
                System.in.read();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(serverSocketChannel);
        }
    }

    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Object> {

        @Override
        public void completed(AsynchronousSocketChannel socketChannel, Object attachment) {
            if (serverSocketChannel.isOpen()) {
                serverSocketChannel.accept(null, this);
            }
            if (socketChannel != null && socketChannel.isOpen()) {
                ClientHandler handler = new ClientHandler(socketChannel);
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                Map<String, Object> map = new HashMap<>();
                map.put("type", "read");
                map.put("buffer", buffer);
                socketChannel.read(buffer, map, handler);
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {

        }
    }

    private class ClientHandler implements CompletionHandler<Integer, Object> {

        private AsynchronousSocketChannel socketChannel;

        public ClientHandler(AsynchronousSocketChannel socketChannel) {
            this.socketChannel = socketChannel;
        }

        @Override
        public void completed(Integer result, Object attachment) {
            Map<String, Object> map = (Map<String, Object>) attachment;
            String type = (String) map.get("type");
            if ("read".equals(type)) {
                ByteBuffer buffer = (ByteBuffer) map.get("buffer");
                buffer.flip();
                map.put("type", "write");
                socketChannel.write(buffer, map, this);
                buffer.clear();
            } else if ("write".equals(type)) {
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                map.put("type", "read");
                map.put("buffer", byteBuffer);
                socketChannel.read(byteBuffer, map, this);
            }
        }

        @Override
        public void failed(Throwable exc, Object attachment) {

        }
    }

    public static void main(String[] args) {
        Server server = new Server();
        server.start();
    }
}

Client:

package com.hcx.aio;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;

/**
 * @author hongcaixia
 * @version 1.0
 * @date 2021/7/20 16:23
 */
public class Client {

    final String LOCALHOST = "localhost";
    final int DEFAULT_PORT = 9999;
    AsynchronousSocketChannel socketChannel;

    private void close(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
                System.out.println("关闭" + closeable);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void start(){
        try{
            //创建channel
            socketChannel = AsynchronousSocketChannel.open();
            Future<Void> future = socketChannel.connect(new InetSocketAddress(LOCALHOST, DEFAULT_PORT));
            future.get();

            //读取用户输入
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            while (true){
                String msg = bufferedReader.readLine();
                byte[] bytes = msg.getBytes();
                ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);

                Future<Integer> writeResult = socketChannel.write(byteBuffer);
                writeResult.get();
                byteBuffer.flip();

                Future<Integer> readResult = socketChannel.read(byteBuffer);
                readResult.get();
                String str = new String(byteBuffer.array());
                byteBuffer.clear();
                System.out.println(str);
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            close(socketChannel);
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        client.start();
    }
}

3.3 模型分析

image.png

AsynchronousServerSocket

它属于一个 AsynchronousChannelGroup,这个通道组,是被多个异步通道共享的资源群组,与线程池类似,系统会利用线程池中的线程,来处理一些handler请求。系统利用这个资源组还做了很多的事情,包括在数据准备好的时候通知我们和利用handler做一些异步的操作。在创建AsynchronousServerSocket时(open()),可以自定义一个通道组,不传参的时候,系统会有一个默认群组。

当客户端请求与服务器建立连接时,系统会异步的调用AcceptHandler来处理连接请求,成功建立连接后,会返回一个AsynchronousSocketChannel对象,每个对象还会有一个ClientHandler来处理读写请求,在请求处理的过程中,并不是在主线程中完成的,而是通道组利用线程池资源,在不同的线程中完成异步处理。

3.4 三种IO比较

BIO NIO AIO
Socket SocketChannel AsynchronousSocketChannel
ServerSocket ServerSocketChannel AsynchronousServerSocketChannel

特点:

  • Java BIO : 同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。
  • Java NIO : 同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。
  • Java AIO(NIO.2) : 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理。

适用场景:

  • BIO:适用于连接数目少,而且服务器资源对于我们已知的连接来说,比较充足,开发简单;对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。
  • NIO:相对BIO来说,开发难度较高,但是客户连接数目比较高。值得我们注意的是,由于NIO是单一的线程轮询来处理数据,需要避免每个任务执行的时间过长,防止其他线程出现过长的等待;适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。
  • AIO:接受的连接数目多,相对于NIO来说,是异步出来,可以接受某个任务花费过长的时间,但是开发难度比较高,维护起来也不简单。适用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

相关文章

网友评论

    本文标题:网络编程

    本文链接:https://www.haomeiwen.com/subject/aiohcdtx.html