服务端代码
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Console chat server that serves one client connection at a time.
 *
 * <p>For every accepted connection two pooled tasks are started: one prints
 * whatever the client sends, the other forwards lines typed on standard input
 * to the client. The accept loop does not take a new connection until both
 * tasks of the previous one have finished.
 */
public class SocketServer {
    /** Listening socket, opened in the constructor with a backlog of 1. */
    private final ServerSocket server;
    /** Connection currently being served; {@code null} until the first accept. */
    private volatile Socket socket;
    /** Number of read/write tasks of the current connection that are still running. */
    private final AtomicInteger threadNum = new AtomicInteger();

    /**
     * Opens the listening socket.
     *
     * @param port TCP port to listen on
     * @throws IOException if the listening socket cannot be opened
     */
    public SocketServer(int port) throws IOException {
        server = new ServerSocket(port, 1);
    }

    /**
     * Runs the accept loop on the calling thread.
     *
     * <p>The loop ends when the calling thread is interrupted while waiting for
     * the previous connection to be released, or when the listening socket has
     * been closed. On exit the pool is shut down and both the current
     * connection and the listening socket are closed.
     */
    public void start() {
        // A fixed pool of two workers (reader + writer) is reused across
        // connections instead of creating new threads for each one.
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            while (!server.isClosed() && awaitConnectionReleased()) {
                System.out.println("waiting connection...");
                try {
                    Socket accepted = server.accept();
                    socket = accepted;
                    System.out.println("connect success from " + accepted.getRemoteSocketAddress());
                    // Count both tasks before submitting them so the wait
                    // above cannot observe a half-started connection.
                    threadNum.addAndGet(2);
                    threadPoolExecutor.execute(new ReadThread());
                    threadPoolExecutor.execute(new WriteThread());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            threadPoolExecutor.shutdown();
            close();
            closeListener();
        }
    }

    /**
     * Closes the connection currently being served. Does nothing when no
     * connection has been accepted yet or when it is already closed.
     */
    public void close() {
        Socket current = socket;
        if (current == null || current.isClosed()) {
            return;
        }
        try {
            current.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Polls until both tasks of the current connection have finished.
     *
     * @return {@code true} when a new connection may be accepted,
     *         {@code false} when the calling thread was interrupted
     */
    private boolean awaitConnectionReleased() {
        while (threadNum.get() > 0) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Restore the flag so callers can see the interruption, and
                // stop waiting instead of silently looping on.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return !Thread.currentThread().isInterrupted();
    }

    /** Closes the listening socket once the accept loop has ended. */
    private void closeListener() {
        if (server.isClosed()) {
            return;
        }
        try {
            server.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Task that prints everything received from the client.
     */
    class ReadThread implements Runnable {
        @Override
        public void run() {
            Socket client = socket;
            try {
                // Decode through a Reader: it keeps decoder state between
                // reads, so a multi-byte UTF-8 character that arrives split
                // across two TCP segments is not corrupted.
                Reader reader = new InputStreamReader(client.getInputStream(), StandardCharsets.UTF_8);
                char[] buffer = new char[1024];
                int length;
                while ((length = reader.read(buffer)) != -1) {
                    String message = new String(buffer, 0, length);
                    System.out.println(client.getRemoteSocketAddress() + " client message: " + message);
                }
            } catch (IOException e) {
                // A failure on an already closed socket is the expected result
                // of close() being called by the other task; stay quiet then.
                if (!client.isClosed()) {
                    e.printStackTrace();
                }
            } finally {
                close();
                System.out.println("ReadThread close");
                threadNum.decrementAndGet();
            }
        }
    }

    /**
     * Task that sends lines typed on standard input to the client. The loop
     * ends after the line "q" has been sent or when standard input is exhausted.
     */
    class WriteThread implements Runnable {
        @Override
        public void run() {
            Socket client = socket;
            try {
                OutputStream outputStream = client.getOutputStream();
                // The scanner is deliberately not closed: closing it would
                // close System.in for later connections as well.
                Scanner scanner = new Scanner(System.in);
                String message = null;
                while (!"q".equals(message) && scanner.hasNextLine()) {
                    message = scanner.nextLine();
                    outputStream.write(message.getBytes(StandardCharsets.UTF_8));
                    outputStream.flush();
                    System.out.println("push: " + message);
                }
            } catch (IOException e) {
                // Writing to a socket the reader task already closed is an
                // expected way for this task to end; report anything else.
                if (!client.isClosed()) {
                    e.printStackTrace();
                }
            } finally {
                close();
                System.out.println("WriteThread close");
                threadNum.decrementAndGet();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new SocketServer(23333).start();
    }
}
客户端代码
import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
/**
 * Console chat client: prints everything the server sends and forwards lines
 * typed on standard input to the server.
 */
public class SocketClient {
    /** Connection to the server, opened in the constructor. */
    private final Socket socket;

    /**
     * Connects to the server.
     *
     * @param host server host name or address
     * @param port server TCP port
     * @throws IOException if the connection cannot be established
     */
    public SocketClient(String host, int port) throws IOException {
        socket = new Socket(host, port);
        System.out.println("connect success to " + socket.getRemoteSocketAddress());
    }

    /**
     * Starts the reader and writer threads for this connection.
     */
    public void start() {
        new ReadThread().start();
        WriteThread writer = new WriteThread();
        // The writer blocks on standard input and cannot notice that the
        // server has gone away; as a daemon it does not keep the JVM alive
        // once the reader thread has ended.
        writer.setDaemon(true);
        writer.start();
    }

    /**
     * Closes the connection to the server if it is still open.
     */
    public void close() {
        if (!socket.isClosed()) {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Thread that prints everything received from the server.
     */
    class ReadThread extends Thread {
        @Override
        public void run() {
            try {
                // Decode through a Reader: it keeps decoder state between
                // reads, so a multi-byte UTF-8 character that arrives split
                // across two TCP segments is not corrupted.
                Reader reader = new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8);
                char[] buffer = new char[1024];
                int length;
                while ((length = reader.read(buffer)) != -1) {
                    String message = new String(buffer, 0, length);
                    System.out.println("server message: " + message);
                }
            } catch (IOException e) {
                // A failure on an already closed socket is the expected result
                // of close() being called by the writer; stay quiet then.
                if (!socket.isClosed()) {
                    e.printStackTrace();
                }
            } finally {
                close();
                System.out.println("ReadThread close");
            }
        }
    }

    /**
     * Thread that sends lines typed on standard input to the server. The loop
     * ends after the line "q" has been sent or when standard input is exhausted.
     */
    class WriteThread extends Thread {
        @Override
        public void run() {
            try {
                OutputStream outputStream = socket.getOutputStream();
                // The scanner is deliberately not closed: closing it would
                // close System.in for the rest of the process.
                Scanner scanner = new Scanner(System.in);
                String message = null;
                while (!"q".equals(message) && scanner.hasNextLine()) {
                    message = scanner.nextLine();
                    outputStream.write(message.getBytes(StandardCharsets.UTF_8));
                    outputStream.flush();
                    System.out.println("push: " + message);
                }
            } catch (IOException e) {
                // Writing to a socket the reader already closed is an expected
                // way for this thread to end; report anything else.
                if (!socket.isClosed()) {
                    e.printStackTrace();
                }
            } finally {
                close();
                System.out.println("WriteThread close");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new SocketClient("127.0.0.1", 23333).start();
    }
}
运行效果
1、先启动服务端
![](https://img.haomeiwen.com/i13688072/47b5835011cf5ada.png)
2、再启动客户端
![](https://img.haomeiwen.com/i13688072/94fda921e802b88e.png)
3、互相发送消息
![](https://img.haomeiwen.com/i13688072/3c8dcac144f1535f.png)
![](https://img.haomeiwen.com/i13688072/7562b55d31a309da.png)
网友评论