# 前情提要:
- 在上一篇文章BIO在聊天室项目中的演化中提到,告知对方消息已经发送完毕的方式有4种
- 关闭Socket连接
- 关闭输出流,
socket.shutdownOutput();
- 使用标志符号,借助字符流,
Reader.readLine()
,该方法会在读取到\r
,\n
或者\r\n
时返回所读取到的内容。 - 通过指定本次发送的数据的字节大小。告知对方从输入流中读取指定大小的字节。
本文使用第四种方案来实现聊天室
- 思路为:
- 客户端在发送消息之前,先计算出本次发送的数据量的字节大小,比如为
N
个字节。那么在向服务器发送数据的前,先约定好流中的前1个字节(或者前X
个字节,根据自己项目的实际情况来决定)为本次发送的数据量的大小。 - 客户端发送消息,先将计算出的字节大小N写入输出流,再将实际的内容写入输出流。
- 服务端在获取到输入流之后,根据约定,先读取前
X
个字节,根据这个字节的值可以知道,本次发送的数据量的大小,那么在读取数据时,只需要读取后续的N
个字节即可。
- 客户端在发送消息之前,先计算出本次发送的数据量的字节大小,比如为
- 温馨提示: 注意看代码注释哟~
# 代码实现
- 客户端
/**
* @author futao
* @date 2020/7/6
*/
public class BioChatClient {
private static final Logger logger = LoggerFactory.getLogger(BioChatClient.class);
private static final ExecutorService SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor();
/**
* 启动客户端
*/
public void start() {
try { //尝试连接到聊天服务器
Socket socket = new Socket("localhost", Constants.SERVER_PORT);
logger.debug("========== 成功连接到聊天服务器 ==========");
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
//从输入流中读取数据
SINGLE_THREAD_EXECUTOR.execute(() -> {
try {
while (true) {
String message = IOUtils.messageReceiver(inputStream);
logger.info("接收到服务端消息:[{}]", message);
}
} catch (IOException e) {
logger.error("发生异常", e);
}
});
while (true) {
//获取用户输入的数据
String message = new Scanner(System.in).nextLine();
if (StringUtils.isBlank(message)) {
break;
}
//将内容转换为字节数组
byte[] contentBytes = message.getBytes(Constants.CHARSET);
//内容字节数组的大小
int length = contentBytes.length;
//第一个字节写入本次传输的数据量的大小
outputStream.write(length);
//写入真正需要传输的内容
outputStream.write(contentBytes);
//刷新缓冲区
outputStream.flush();
if (Constants.KEY_WORD_QUIT.equals(message)) {
//客户端退出
SINGLE_THREAD_EXECUTOR.shutdownNow();
inputStream.close();
outputStream.close();
socket.close();
break;
}
}
} catch (IOException e) {
logger.error("发生异常", e);
}
}
public static void main(String[] args) {
new BioChatClient().start();
}
}
- 从输入流中读取指定大小的数据
/**
* 从输入流中读取指定大小的字节数据并转换成字符串
*
* @param inputStream 输入流
* @return 读取到的字符串
* @throws IOException
*/
public static String messageReceiver(InputStream inputStream) throws IOException {
//本次传输的数据量的大小
int curMessageLength = inputStream.read();
byte[] contentBytes = new byte[curMessageLength];
//读取指定长度的字节
inputStream.read(contentBytes);
return new String(contentBytes);
}
- 服务端
/**
* @author futao
* @date 2020/7/6
*/
public class BioChatServer {
private static final Logger logger = LoggerFactory.getLogger(BioChatServer.class);
/**
* 可同时接入的客户端数量
*/
private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);
/**
* 当前接入的客户端
*/
private static final Set<Socket> CLIENT_SOCKET_SET = new HashSet<Socket>() {
@Override
public synchronized boolean add(Socket o) {
return super.add(o);
}
@Override
public synchronized boolean remove(Object o) {
return super.remove(o);
}
};
/**
* 启动服务端
*/
public void start() {
try {
//启动服务器,监听端口
ServerSocket serverSocket = new ServerSocket(Constants.SERVER_PORT);
logger.debug("========== 基于BIO的聊天室在[{}]端口启动成功 ==========", Constants.SERVER_PORT);
while (true) {
//监听客户端接入事件
Socket socket = serverSocket.accept();
THREAD_POOL.execute(() -> {
CLIENT_SOCKET_SET.add(socket);
int port = socket.getPort();
logger.debug("客户端[{}]成功接入聊天服务器", port);
try {
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
while (true) {
//获取到客户端发送的消息
String message = IOUtils.messageReceiver(inputStream);
logger.info("接收到客户端[{}]发送的消息:[{}]", port, message);
//客户端是否退出
boolean isQuit = IOUtils.isQuit(message, socket, CLIENT_SOCKET_SET);
if (isQuit) {
socket.close();
break;
} else {
//消息转发
IOUtils.forwardMessage(port, message, CLIENT_SOCKET_SET);
}
}
} catch (IOException e) {
logger.error("发生异常", e);
}
});
}
} catch (IOException e) {
logger.error("发生异常", e);
}
}
public static void main(String[] args) {
new BioChatServer().start();
}
}
- 客户端下线与消息转发
/**
* 判断客户端是否下线,并且将需要下线的客户端下线
*
* @param message 消息
* @param socket 客户端Socket
* @param clientSocketSet 当前接入的客户端Socket集合
* @return 是否退出
* @throws IOException
*/
public static boolean isQuit(String message, Socket socket, Set<Socket> clientSocketSet) throws IOException {
boolean isQuit = StringUtils.isBlank(message) || Constants.KEY_WORD_QUIT.equals(message);
if (isQuit) {
clientSocketSet.remove(socket);
int port = socket.getPort();
socket.close();
logger.debug("客户端[{}]下线", port);
}
return isQuit;
}
/**
* 转发消息
*
* @param curSocketPort 当前发送消息的客户端Socket的端口
* @param message 需要转发的消息
* @param clientSocketSet 当前接入的客户端Socket集合
*/
public static void forwardMessage(int curSocketPort, String message, Set<Socket> clientSocketSet) {
if (StringUtils.isBlank(message)) {
return;
}
for (Socket socket : clientSocketSet) {
if (socket.isClosed() || socket.getPort() == curSocketPort) {
continue;
}
if (socket.getPort() != curSocketPort) {
try {
OutputStream outputStream = socket.getOutputStream();
byte[] messageBytes = message.getBytes(Constants.CHARSET);
outputStream.write(messageBytes.length);
//将字符串编码之后写入客户端
outputStream.write(messageBytes);
//刷新缓冲区
outputStream.flush();
} catch (IOException e) {
logger.error("消息转发失败", e);
}
}
}
}
# 测试一下~
- 服务端启动,客户端接入
- 客户端接入
- 客户端发送消息
- 服务端打印并转发消息
- 聊天室内的其他小伙伴收到服务器转发的消息
- 小马哥客户端下线
- 服务器收到小马哥的下线通知
# 总结
- 非常优雅~😊
# 注意
- 本文约定的是第一个字节为消息大小的标记,一个字节可以表示的最大值为255,所以一次最多传输255个字节,如果超过这个值,会造成业务错误,需要注意。
- 所以使用几个字节来作为标识需要从业务的角度来考虑
- 一个字节8位,可表示的最大值为 255 = 255B
- 二个字节16位,可表示的最大值为 65535 = 64KB
- 三个字节24位,可表示的最大值为 16777215 = 16MB
- 四个字节32位,可表示的最大值为 4294967295 = 4GB
- 以此类推....
# 系列文章
image.png欢迎在评论区留下你看文章时的思考,及时说出,有助于加深记忆和理解,还能和像你一样也喜欢这个话题的读者相遇~
网友评论