每一种技术的出现,都是为了解决某一个或者某一类问题。让我们先来了解问题的产生。
问题:
使用socket通信实现如下:
1.client连接server
2.client发送"Hi Server,I am client."
3.server收到消息在控制台的打印,并回复"Hi client,I am Server."
4.client收到消息在控制台打印。
5.client断开连接。
1.Simple Solution(方式一)
直接贴代码了
/**
* @description: SimpleSolution server
* @author: sanjin
* @date: 2019/7/8 11:33
*/
public class Server {
public static void main(String[] args) {
// 服务端占用端口
int port = 8000;
// 创建 serversocker
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
} catch (IOException e) {
e.printStackTrace();
}
if (serverSocket != null) {
while (true) {
InputStream is = null;
OutputStream os = null;
Socket client = null;
try {
// accept()方法会阻塞,直到有client连接后才会执行后面的代码
client = serverSocket.accept();
is = client.getInputStream();
os = client.getOutputStream();
// 3.server收到消息在控制台的打印,并回复"Hi client,I am Server."
byte[] buffer = new byte[5];
int len = 0;
// 使用ByteArrayOutputStream,避免缓冲区过小导致中文乱码
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while ((len = is.read(buffer)) != -1) {
baos.write(buffer,0,len);
}
System.out.println(baos.toString());
// 服务端回复客户端消息
os.write("Hi client,I am Server.".getBytes());
os.flush(); // 刷新缓存,避免消息没有发送出去
client.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
} finally {
// 程序异常或者执行完成,关闭流,防止占用资源
if (client != null) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (is != null) {
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (os != null) {
try {
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
}
}
/**
* @description: SimpleSolution client
* @author: sanjin
* @date: 2019/7/8 11:33
*/
public class Client {
public static void main(String[] args) {
int port = 8000;
Socket client = null;
InputStream is = null;
OutputStream os = null;
try {
// 1.client连接server
client = new Socket("localhost", port);
is = client.getInputStream();
os = client.getOutputStream();
// 2.client发送"Hi Server,I am client."
os.write("Hi Server,I am client.".getBytes());
os.flush();
// 调用shutdownOutput()方法表示客户端传输完了数据,否则服务端的
// read()方法会一直阻塞
// (你可能会问我这不是写了 read()!=-1, -1表示的文本文件的结尾字符串,而对于字节流数据,
// 是没有 -1 标识的,这就会使服务端无法判断客户端是否发送完成,导致read()方法一直阻塞)
client.shutdownOutput();
// 4.client收到消息在控制台打印。
int len = 0;
byte[] buffer = new byte[5];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while ((len = is.read(buffer)) != -1) {
baos.write(buffer,0,len);
}
System.out.println(baos.toString());
} catch (IOException e) {
e.printStackTrace();
} finally {
// 程序异常或者执行完成,关闭流,防止占用资源
try {
if (is != null) {
is.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (os != null) {
os.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (client != null) {
client.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
程序描述图:
1.png
顺便说一下,ProcessOn真的很好用😄
关于Socket编程,有几个注意点:
- 注意使用流时一定要用try-catch-finally,虽然代码确实有点繁琐。
2.客户端如果发送的使中文,在服务端接收数据时候,要注意接收方式:
// 接收数据方式一
byte[] buffer = new byte[5];
int len = 0;
// 使用ByteArrayOutputStream,避免缓冲区过小导致中文乱码
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while ((len = is.read(buffer)) != -1) {
baos.write(buffer, 0, len);
}
System.out.println(baos.toString());
// 接收数据方式一
byte[] buffer = new byte[5];
int len = 0;
// 使用ByteArrayOutputStream,避免缓冲区过小导致中文乱码
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while ((len = is.read(buffer)) != -1) {
// 这种方式会导致中文乱码
System.out.println(new String(buffer, 0, len));
}
如果客户端传输中文使用方式二会导致中文乱码,这是因为我们在读取时候缓冲区大小设置的是5个字节,此处假设客户端传输“小白兔“三个字,常用的汉字一般占3个字节。”小白兔“发送过来后我们的缓冲区只有5个字节,没办法一次读取完,所以要分二次读取,第一次读取5个字节,然后立即进行了打印,汉字”小“会被正常打印,但是汉字”白“只读取了2个字节,打印就会产生乱码。而使用ByteArrayOutputStream
把缓冲区读取的字节全都存放一起,然后一起打印,就不会导致乱码了。
3.shutdownOutput()
方法。当客户端传输”Hi Server,I am client.“,服务端接收数据并打印出来,然后向客户端发送”"Hi client,I am Server."。如果不使用shutdownOutput()
方法会使服务端卡在read()方法。这是因为当客户端数据发送完成后,服务端的判断条件
while ((len = is.read(buffer)) != -1)
不成立,因为只有文本文件的末尾是 -1,而字节流没有末尾标识,这就导致服务端不知道客户端有没有发送完成,使得read()方法阻塞。所以客户端发送完数据后需要发送一个标识来表示”我已经发送完数据了“。而shutdownOutput()
方法就是这个标识。
我们使用socket完成了一个收发的程序。但是它还存在着问题。
1. 不能同时有多个client连接我们的server
服务端与客户端连接使用依靠accept()函数,而我们的服务端程序是单线程,只能等当前的socket执行完成后,才能接收下一个socket的连接。
假设我们同时又2个client连接server会发生什么?(因为我们程序简单,执行的很快,所以我在server种加了Thread.sleep(50*1000))
现象:第二个client会抛出异常:
1.png
下面我们就用多线程解决这个问题。
2.Multithreading Solution(方式二)
我又新加了一个HandlerClient类,实现Runnable接口,用于处理client连接,Client类的代码没有做修改。
/**
* @description: MultithreadingSolution client
* @author: sanjin
* @date: 2019/7/8 11:33
*/
public class Client {
public static void main(String[] args) {
int port = 8000;
Socket client = null;
InputStream is = null;
OutputStream os = null;
try {
// 1.client连接server
client = new Socket("localhost", port);
is = client.getInputStream();
os = client.getOutputStream();
// 2.client发送"Hi Server,I am client."
os.write("Hi Server,I am client.".getBytes());
os.flush();
// 调用shutdownOutput()方法表示客户端传输完了数据,否则服务端的
// read()方法会一直阻塞
// (你可能会问我这不是写了 read()!=-1, -1表示的文本文件的结尾字符串,而对于字节流数据,
// 是没有 -1 标识的,这就会使服务端无法判断客户端是否发送完成,导致read()方法一直阻塞)
client.shutdownOutput();
// 4.client收到消息在控制台打印。
int len = 0;
byte[] buffer = new byte[5];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while ((len = is.read(buffer)) != -1) {
baos.write(buffer,0,len);
}
System.out.println(baos.toString());
} catch (IOException e) {
e.printStackTrace();
} finally {
// 程序异常或者执行完成,关闭流,防止占用资源
try {
if (is != null) {
is.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (os != null) {
os.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (client != null) {
client.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* @description: MultithreadingSolution server
* @author: sanjin
* @date: 2019/7/8 11:33
*/
public class Server {
public static void main(String[] args) {
// 服务端占用端口
int port = 8000;
// 创建 serversocker
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
} catch (IOException e) {
e.printStackTrace();
}
if (serverSocket != null) {
while (true) {
try {
Socket client = serverSocket.accept();
System.out.println("收到client连接,client地址:"+client.getInetAddress());
new Thread(new HandlerClient(client)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
/**
* @description: 用于处理client连接
* @author: sanjin
* @date: 2019/7/8 16:28
*/
public class HandlerClient implements Runnable {
private Socket client;
public HandlerClient(Socket client) {
this.client = client;
}
@Override
public void run() {
InputStream is = null;
OutputStream os = null;
try {
is = client.getInputStream();
os = client.getOutputStream();
// 3.server收到消息在控制台的打印,并回复"Hi client,I am Server."
byte[] buffer = new byte[5];
int len = 0;
// 使用ByteArrayOutputStream,避免缓冲区过小导致中文乱码
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while ((len = is.read(buffer)) != -1) {
// 这种方式会导致中文乱码
// System.out.println(new String(buffer, 0, len));
baos.write(buffer, 0, len);
}
System.out.println(baos.toString());
try {
// 增加任务执行时间,用于进行多个client连接测试
Thread.sleep(20*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 服务端回复客户端消息
os.write("Hi client,I am Server.".getBytes());
os.flush(); // 刷新缓存,避免消息没有发送出去
client.shutdownOutput();
} catch (IOException e) {
e.printStackTrace();
} finally {
// 程序异常或者执行完成,关闭流,防止占用资源
if (client != null) {
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (is != null) {
try {
is.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if (os != null) {
try {
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
运行结果:
1.png
问题:计算机的CPU资源有限,来一个client就会创建一个线程,线程完成任务后再进行销毁,线程的创建、销毁以及线程上下文的切换会消耗很多CPU的资源。并且JVM中线程数过多还有可能抛出内存不足的异常。
所以我们下一步使用线程池来解决这个问题。
程序描述图:
1.png
3.Thread Pool Solution(方式三)
线程池解决方法思路:
1.png
我们再方式二已经完成了多线程方式代码,将它修改成线程池方式非常简单,我们只需要修改Server类就可以了:
/**
* @description: ThreadPoolSolution server
* @author: sanjin
* @date: 2019/7/8 11:33
*/
public class Server {
// 创建线程池
private static ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(
5, // 核心线程数
10, // 最大线程数
200, // keep alive 时间
TimeUnit.HOURS, // keep alive 时间单位
new ArrayBlockingQueue<Runnable>(5) // 工作队列
);
public static void main(String[] args) {
// 服务端占用端口
int port = 8000;
// 创建 serversocker
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(port);
} catch (IOException e) {
e.printStackTrace();
}
if (serverSocket != null) {
while (true) {
try {
Socket client = serverSocket.accept();
System.out.println("收到client连接,client地址:"+client.getInetAddress());
// 多线程方式
// new Thread(new HandlerClient(client)).start();
// 线程池方式
threadPoolExecutor.execute(new HandlerClient(client));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
不知道大家晕了没,我已经快不行了,但还是要明白我们使用多线程的目的:
解决多个client同时连接的问题。
好了,下面主角登场。
4.NIO(方式三)
关于JavaNIO有一个非常好的英文资料:http://tutorials.jenkov.com/java-nio/index.html
/**
* @description:
* @author: sanjin
* @date: 2019/7/8 19:56
*/
public class NIOClient {
public static void main(String[] args) {
SocketAddress socketAddress = new InetSocketAddress(8000);
SocketChannel socketChannel = null;
try {
socketChannel = SocketChannel.open(socketAddress);
socketChannel.configureBlocking(false);
if (socketChannel.finishConnect()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 客户端发送数据 "Hi Server,I am client."
buffer.clear();
buffer.put("Hi Server,I am client.".getBytes());
buffer.flip();
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
// 客户端接收服务端数据打印在控制台
buffer.clear();
int len = socketChannel.read(buffer);
while (len > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
System.out.print((char) buffer.get());
}
System.out.println();
buffer.clear();
len = socketChannel.read(buffer);
}
if (len == -1) {
socketChannel.close();
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (socketChannel != null) {
socketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* @description:
* @author: sanjin
* @date: 2019/7/8 19:56
*/
public class NIOServer {
public static void main(String[] args) {
ServerSocketChannel serverSocketChannel = null;
Selector selector = null;
try {
// 初始化一个 serverSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8000));
// 设置serverSocketChannel为非阻塞模式
// 即 select()会立即得到返回
serverSocketChannel.configureBlocking(false);
// 初始化一个 selector
selector = Selector.open();
// 将 serverSocketChannel 与 selector绑定
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
// 通过操作系统监听变化的socket个数
// 在windows平台通过selector监听(轮询所有的socket进行判断,效率低)
// 在Linux2.6之后通过epool监听(事件驱动方式,效率高)
int count = selector.select(3000);
if (count > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
handleAccept(key);
}
if (key.isReadable()) {
handleRead(key);
}
if (key.isWritable() && key.isValid()) {
handleWrite(key);
}
if (key.isConnectable()) {
System.out.println("isConnectable = true");
}
iterator.remove();
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (serverSocketChannel != null) {
serverSocketChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
try {
if (selector != null) {
selector.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private static void handleWrite(SelectionKey key) {
// 获取 client 的 socket
SocketChannel clientChannel = (SocketChannel) key.channel();
// 获取缓冲区
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.clear();
buffer.put("Hi client,I am Server.".getBytes());
buffer.flip();
try {
while (buffer.hasRemaining()) {
clientChannel.write(buffer);
}
buffer.compact();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void handleRead(SelectionKey key) {
// 获取 readable 的客户端 socketChannel
SocketChannel clientChannel = (SocketChannel) key.channel();
// 读取客户端发送的消息信息,我们已经在 acceptable 中设置了缓冲区
// 所以直接冲缓冲区读取信息
ByteBuffer buffer = (ByteBuffer) key.attachment();
// 获取 client 发送的消息
try {
int len = clientChannel.read(buffer);
while (len > 0) {
// 设置 limit 位置
buffer.flip();
// 开始读取数据
while (buffer.hasRemaining()) {
byte b = buffer.get();
System.out.print((char) b);
}
System.out.println();
// 清除 position 位置
buffer.clear();
// 从新读取 len
len = clientChannel.read(buffer);
}
if (len == -1) {
clientChannel.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private static void handleAccept(SelectionKey key) {
// 获得 serverSocketChannel
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
try {
// 获得 socketChannel,就是client的socket
SocketChannel clientChannel = serverSocketChannel.accept();
if (clientChannel == null) return;
// 设置 socketChannel 为无阻塞模式
clientChannel.configureBlocking(false);
// 将其注册到 selector 中,设置监听其是否可读,并分配缓冲区
clientChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(512));
} catch (IOException e) {
e.printStackTrace();
}
}
}
网友评论