# 需求
- 基于NIO实现
- 支持同时多个客户端接入
- 支持客户端发送文本消息到服务器
- 支持客户端自定义群聊名称
- 接收到客户端发送的消息之后,服务器需要将消息转发给目前在线的所有其他客户端
- 支持客户端退出群聊
- 服务端停止服务后,客户端自动断开连接
# 技术介绍
-
Non-blockingI/O
编程模型 -
Channel
通道-
ServerSocketChannel
服务端通道 -
SocketChannel
客户端通道
-
-
ByteBuffer
NIO中使用的读写缓冲区 -
Selector
多路复用器- 将
channel
注册在多路复用器上,并监听相应的事件
- 将
- 多线程
- 线程池
# 代码
温馨提示:注意看代码注释哟~ 跟上节奏,很简单😼
- 服务器
/**
* 基于NIO实现的聊天室服务端
*
* @author futao
* @date 2020/7/8
*/
@Slf4j
public class NioChatServer {
/**
* 用于处理通道上的事件的线程池(可选的)
*/
private static final ExecutorService THREAD_POOL = Executors.newFixedThreadPool(10);
/**
* 启动聊天室
*/
public void start() {
try {
//服务端Socket通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//将通道设置成非阻塞
serverSocketChannel.configureBlocking(false);
//绑定主机与监听端口
serverSocketChannel.bind(new InetSocketAddress("localhost", Constants.SERVER_PORT));
//多路复用器
Selector selector = Selector.open();
//将服务端通道注册到多路复用器上,并设置监听事件接入事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
log.debug("{} 基于NIO的聊天室在[{}]端口启动成功 {}", StringUtils.repeat("=", 30), Constants.SERVER_PORT, StringUtils.repeat("=", 30));
while (true) {
// 触发了事件的通道数量,该方法会阻塞
int eventCountTriggered = selector.select();
if (eventCountTriggered <= 0) {
continue;
}
// 获取到所有触发的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍历事件进行处理
for (SelectionKey selectionKey : selectionKeys) {
// 处理事件
selectionKeyHandler(selectionKey, selector);
}
// 清除事件记录
selectionKeys.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 事件处理器
*
* @param selectionKey 触发的事件信息
* @param selector 多路复用器
*/
private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {
if (selectionKey.isAcceptable()) {
//如果触发的是SocketChannel接入事件
try {
// ServerSocketChannel上触发的客户端SocketChannel接入
SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
log.debug("客户端[{}]成功接入聊天服务器", socketChannel.socket().getPort());
// 将客户端SocketChannel通道设置成非阻塞
socketChannel.configureBlocking(false);
// 将客户端通道注册到多路复用器,并监听这个通道上发生的可读事件
socketChannel.register(selector, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
} else if (selectionKey.isReadable()) {
// 触发的是可读事件
// 获取到可读事件的客户端通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
try {
// 读取通道上的数据写入缓冲区(返回0或者-1说明读到了末尾)
while (socketChannel.read(byteBuffer) > 0) {
}
//切换为读模式
byteBuffer.flip();
// 接收到的消息
String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
log.info("接收到来自客户端[{}]的数据:[{}]", socketChannel.socket().getPort(), message);
// 是否退出
quit(message, selector, selectionKey);
// 消息转发
forwardMessage(message, selector, selectionKey);
// 清除缓冲区的数据
byteBuffer.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 客户端退出
*
* @param message 消息
* @param selector 多路复用器
* @param selectionKey 触发的selectionKey
*/
public void quit(String message, Selector selector, SelectionKey selectionKey) {
if (StringUtils.isBlank(message) || Constants.KEY_WORD_QUIT.equals(message)) {
int port = ((SocketChannel) selectionKey.channel()).socket().getPort();
// 客户端下线
selectionKey.cancel();
log.debug("客户端[{}]下线", port);
// 因为发生了监听事件和channel的变更,所以需要通知selector重新整理selector所监听的事件
selector.wakeup();
}
}
/**
* 转发消息
*
* @param message 需要转发的消息
* @param selector 多路复用器
* @param curSelectionKey 当前触发的selectionKey
*/
public void forwardMessage(String message, Selector selector, SelectionKey curSelectionKey) {
// 创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
// 数据写入缓冲区
byteBuffer.put(message.getBytes(Constants.CHARSET));
// 切换为读模式
byteBuffer.flip();
// 在首尾进行标记,因为需要给每个客户端发送同样的数据,需要重复读取
byteBuffer.mark();
// 当前注册在多路复用器上的SelectionKey集合
Set<SelectionKey> keys = selector.keys();
for (SelectionKey key : keys) {
// 消息不能转发给自己 and 只转发给客户端SocketChannel
if (curSelectionKey.equals(key) || !(key.channel() instanceof SocketChannel)) {
continue;
}
// 客户端SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
// 如果缓冲区中还有数据就一直写
while (byteBuffer.hasRemaining()) {
try {
// 数据写入通道
socketChannel.write(byteBuffer);
} catch (IOException e) {
e.printStackTrace();
}
}
// 重置到上次mark的地方,即首位
byteBuffer.reset();
}
// 清除缓冲区的数据
byteBuffer.clear();
}
public static void main(String[] args) {
new NioChatServer().start();
}
}
- 客户端
/**
* 基于NIO实现的群聊客户端
*
* @author futao
* @date 2020/7/8
*/
@Getter
@Setter
@Slf4j
public class NioChatClient {
/**
* 用于处理用户输入数据的单个线程线程池,使用线程池是为了便于关闭
*/
private static final ExecutorService USER_INPUT_HANDLER = Executors.newSingleThreadExecutor();
/**
* 用户名
*/
private String userName;
/**
* 启动客户端
*/
public void start() {
try {
// 创建客户端通道
SocketChannel socketChannel = SocketChannel.open();
// 将通道设置为非阻塞
socketChannel.configureBlocking(false);
// 创建多路复用器
Selector selector = Selector.open();
// 将客户端通道注册到多路复用器,并监听可读事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 尝试连接到聊天服务器
socketChannel.connect(new InetSocketAddress("localhost", Constants.SERVER_PORT));
while (true) {
// 阻塞等待通道上的事件触发。返回触发的通道的数量
int eventCountTriggered = selector.select();
if (eventCountTriggered <= 0) {
continue;
}
// 获取到所有触发的事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍历事件进行处理
for (SelectionKey selectionKey : selectionKeys) {
// 处理事件
selectionKeyHandler(selectionKey, selector);
}
// 清除事件记录
selectionKeys.clear();
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClosedSelectorException e) {
log.debug("成功退出聊天室...");
}
}
/**
* 处理器
*
* @param selectionKey 触发的selectionKey
* @param selector 多路复用器
*/
private void selectionKeyHandler(SelectionKey selectionKey, Selector selector) {
if (selectionKey.isConnectable()) {
// 触发的是成功接入服务器的事件
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
try {
// 判断此通道上的连接操作是否正在进行中
if (socketChannel.isConnectionPending()) {
// 完成连接套接字通道的过程
socketChannel.finishConnect();
log.debug("成功接入聊天服务器");
// 将通道设置成非阻塞
socketChannel.configureBlocking(false);
// 将通道注册到多路复用器,并监听可读事件
socketChannel.register(selector, SelectionKey.OP_READ);
// 创建缓冲区,用于处理将用户输入的数据写入通道
ByteBuffer byteBuffer = ByteBuffer.allocate(4 * 1024);
// 在新线程中处理用户输入
USER_INPUT_HANDLER.execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
//先清空缓冲区中的数据
byteBuffer.clear();
// 获取用户输入的文本
String message = new Scanner(System.in).nextLine();
// 将数据写入缓冲区
byteBuffer.put(String.format("【%s】: %s", userName, message).getBytes(Constants.CHARSET));
// 将缓冲区设置为读模式
byteBuffer.flip();
try {
// 当缓冲区中还有数据
while (byteBuffer.hasRemaining()) {
// 将数据写入通道
socketChannel.write(byteBuffer);
}
} catch (IOException e) {
e.printStackTrace();
}
// 判断是否退出群聊
if (quit(message, selector, selectionKey)) {
// 跳出循环,结束线程
break;
}
}
try {
// 关闭多路复用器
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
// 关闭线程池
USER_INPUT_HANDLER.shutdown();
});
}
} catch (IOException e) {
e.printStackTrace();
}
} else if (selectionKey.isReadable()) {
// 触发的是可读事件
// 获取到可读事件的通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//创建缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(1024 * 4);
try {
// 将通道上的数据写入缓冲区(返回0或者-1说明读到了末尾)
while (socketChannel.read(byteBuffer) > 0) {
}
// 切换成读模式
byteBuffer.flip();
String message = String.valueOf(Constants.CHARSET.decode(byteBuffer));
byteBuffer.clear();
log.info("接收到数据:[{}]", message);
if (StringUtils.isBlank(message)) {
log.debug("服务器拉胯,下车...");
selector.close();
USER_INPUT_HANDLER.shutdownNow();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 退出群聊
*
* @param message 消息
* @param selector 多路复用器
* @param selectionKey 触发的selectionKey
* @return 是否退出
*/
public boolean quit(String message, Selector selector, SelectionKey selectionKey) {
if (Constants.KEY_WORD_QUIT.equals(message)) {
selectionKey.cancel();
selector.wakeup();
return true;
}
return false;
}
public static void main(String[] args) {
NioChatClient nioChatClient = new NioChatClient();
nioChatClient.setUserName("小9");
nioChatClient.start();
}
}
# 测试
- 接入
- 客户端发送消息
- 消息转发
image.png
image.png
image.png
- 客户端下线
image.png
- 服务器宕机
# 源代码
* https://github.com/FutaoSmile/learn-IO/tree/master/practice/src/main/java/com/futao/practice/chatroom/nio
# 系列文章
欢迎在评论区留下你看文章时的思考,及时说出,有助于加深记忆和理解,还能和像你一样也喜欢这个话题的读者相遇~
网友评论