Java NIO Socket学习笔记
前言
结束了3个月的工控划水生涯终于回归老本行了,最近在给客户编写某系统的上位机。虽然过程比较艰辛,但也深刻意识到自己的不足。因为下位机大多采用TCP进行通信所以服务端不可避免的要提供TCP服务,其实之前有使用过Java NIO 不过现在回头来看也忘的差不多了。这篇文章的目的一方面记录自己在开发过程中遇到的一些问题另一方面也是为了避免以后再进行类似功能开发的时候有迹可循,节省时间。
正文
相关概念
相对于BIO而言NIO有几个新的概念需要了解,相对来说NIO在编码上可能相对复杂一些,但这种复杂是绝对值得的。
Buffer
读写缓冲区,NIO的数据读写都是依靠Buffer来实现的,可以类比为BIO读写数据所创建的byte[]数组。
核心属性
capacity:
Buffer所能存放的最大数据长度
limit:
当Buffer处于写模式时limit=capacity,它表示你可以向Buffer中写入多少数据。当Buffer处于读模式时limit表示你最多能从Buffer中读取到多少数据。
position
见名知意,position表示当前位置,写模式下就是从position开始读取数据的,读取成功后position移动到下一个可读取位置,写模式下position也会根据你写入的数据长度自动跳转到最近可写位置。
常用方法
flip()
flip()
方法将当前Buffer切换到读模式将limit设置为position的位置并将position置零
clear()
clear()
方法将当前Buffer切换到写模式将position置零并将limit设置为capacity
Channel
Channel可以理解为一个连接,Channel创建后要想Selector(见下文)中注册自己并告诉Selector自己对那种操作感兴趣。
Selector
Selector阻塞监测在自己身上注册过的Channel,调用select()
方法返回有动作的Channel的个数,调用selectedKeys()
方法返回一个集合,该集合表示有动作的事件。接下来遍历该集合即可根据事件类型处理IO事件
代码实现
接下来是代码实现以及一些相关问题
首先是Selector和Channel的初始化,他们分别调用自身的open()
静态方法返回自身的实例。
selector = Selector.open();
ServerSocketChannel socketChannel = ServerSocketChannel.open();
//将阻塞状态设置为非阻塞,必须否则注册事件时会报参数异常。
socketChannel.configureBlocking(false);
//绑定服务端口
socketChannel.bind(new InetSocketAddress(53055));
//将自身注册到Selector中以让Selector帮助监测IO事件,这里的 SelectionKey.OP_ACCEPT
//表示自己对ACCEPT事件感兴趣,相对的还有读、写、连接操作,如果想要监听该通道的读事件(即
//客户端有数据发送上来)则必须为其注册读事件
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
然后就可以循环调用Selector的select()
方法监听各通道的事件了。
while (true) {
if (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey next = iterator.next();
iterator.remove();
//刚刚注册的OP_ACCPET事件在这里得到体现
if (next.isAcceptable()) {
System.out.println("one device connected");
ServerSocketChannel server = (ServerSocketChannel)next.channel();
SocketChannel accept = server.accept();
//接收到一个链接后将其设置为非阻塞
accept.configureBlocking(false);
//并为其注册读事件
accept.register(selector,SelectionKey.OP_READ);
}
if (next.isReadable()) {
SocketChannel channel = (SocketChannel) next.channel();
//将Buffer设置为写模式,这个操作必须进行否则可能在下一步读取数据时发生错误
buffer.clear();
int len = 0;
//注意这里的读并不是说Buffer读而是socket在读,相对于Buffer来说其实是在写,
//即从socket中读出数据并写入Buffer中
while ((len=channel.read(buffer))>0){
//将Buffer设置为读模式准备读出数据
buffer.flip();
System.out.println(new String(buffer.array(),0,len));
}
}
}
}
}
这里读事件有两个必须处理的问题,一个是客户端断开连接,另一个是客户端异常断连(进程非正常终止,断电等)。
客户端断连
客户端断连也会触发读事件,并且读取函数的返回值为-1,所以有如下处理
if(len<0){
channel.close();
}
客户端异常断连
客户端异常断连将会在while ((len=channel.read(buffer))>0){
处触发异常,正确对其捕获并处理即可
上文只处理了读,接下来来看下服务端向客户端写入数据
每个管道可为自身注册写事件,那么只要缓冲去为空那么isWritable())
总会返回true
.并且即使不注册写事件我们依然可以向通道中写入数据,接下来实现该功能。
我们将接受连接部分代码稍作改动,将已经建立的连接添加到一个集合中,后续写入数据时以广播方式向全部已经建立连接的通道写入数据。当然也可已使用map或以数组索引为查询依据维护一个可单独控制的连接池。改动后的接受连接部分为
if (next.isAcceptable()) {
System.out.println("one device connected");
ServerSocketChannel server = (ServerSocketChannel) next.channel();
SocketChannel accept = server.accept();
accept.configureBlocking(false);
accept.register(selector, SelectionKey.OP_READ);
//channels为装有SocketChannel的list
//public List<SocketChannel> channels = new ArrayList<>();
channels.add(accept);
}
然后是向客户端写入数据的代码
public void write(String msg){
channels.forEach(channel->{
buffer.clear();
buffer.put(msg.getBytes());
try {
buffer.flip();
channel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
});
}
为了测试方便使用JavaFX做了一个简单的页面,运行结果如下:
socket.png附录
所有源码
//App.java
import javafx.application.Application;
import javafx.concurrent.Task;
import javafx.fxml.FXMLLoader;
import javafx.scene.Parent;
import javafx.scene.Scene;
import javafx.stage.Stage;
import javafx.stage.WindowEvent;
import java.io.IOException;
public class App extends Application {
public static void main(String[] args) {
launch(args);
}
@Override
public void start(Stage primaryStage) {
try {
Parent load = FXMLLoader.load(getClass().getResource("App.fxml"));
Scene scene = new Scene(load,primaryStage.getWidth(),primaryStage.getHeight());
primaryStage.setScene(scene);
} catch (IOException e) {
e.printStackTrace();
}
primaryStage.show();
primaryStage.setOnCloseRequest(event -> {
System.exit(0);
});
}
}
<!--App.xml-->
<?xml version="1.0" encoding="UTF-8"?>
<?import javafx.geometry.*?>
<?import java.lang.*?>
<?import java.util.*?>
<?import javafx.scene.*?>
<?import javafx.scene.control.*?>
<?import javafx.scene.layout.*?>
<AnchorPane prefHeight="400.0" prefWidth="600.0" xmlns="http://javafx.com/javafx/8" xmlns:fx="http://javafx.com/fxml/1" fx:controller="AppController">
<children>
<VBox layoutX="250.0" layoutY="79.0" prefHeight="200.0" prefWidth="100.0" AnchorPane.bottomAnchor="0.0" AnchorPane.leftAnchor="0.0" AnchorPane.rightAnchor="0.0" AnchorPane.topAnchor="0.0">
<children>
<HBox alignment="CENTER_LEFT" prefHeight="61.0" prefWidth="600.0">
<children>
<TextField fx:id="input" prefHeight="30.0" prefWidth="399.0" />
<Button fx:id="send" mnemonicParsing="false" onAction="#onSend" prefHeight="30.0" prefWidth="137.0" text="发送">
<HBox.margin>
<Insets left="40.0" />
</HBox.margin>
</Button>
</children>
<opaqueInsets>
<Insets />
</opaqueInsets>
</HBox>
<TextArea fx:id="receive" prefHeight="332.0" prefWidth="580.0" />
</children>
<padding>
<Insets left="10.0" right="10.0" />
</padding>
</VBox>
</children>
</AnchorPane>
import javafx.event.ActionEvent;
import javafx.fxml.FXML;
import javafx.scene.control.Button;
import javafx.scene.control.TextArea;
import javafx.scene.control.TextField;
public class AppController implements Controller{
@FXML
Button send;
@FXML
TextArea receive;
@FXML
TextField input;
SocketServer server;
@FXML
public void initialize(){
new Thread(()->{
server = new SocketServer();
try {
server.init();
server.listen();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
Controllers.controllers.put("socket",this);
}
public void onSend(ActionEvent event){
server.write(input.getText());
}
public TextArea getReceive() {
return receive;
}
public Button getSend() {
return send;
}
}
//Controller.java
public interface Controller {
}
//Controllers
import java.util.concurrent.ConcurrentHashMap;
public class Controllers {
public static ConcurrentHashMap<String ,Controller> controllers = new ConcurrentHashMap();
public static <T> T getController(String key,Class<T> clazz){
return (T) controllers.get(key);
}
}
//SocketServer
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class SocketServer {
public Selector selector;
public ByteBuffer buffer = ByteBuffer.allocate(1024);
public List<SocketChannel> channels = new ArrayList<>();
public void init() throws IOException {
selector = Selector.open();
ServerSocketChannel socketChannel = ServerSocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.bind(new InetSocketAddress(53055));
socketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
public void listen() throws Exception {
while (true) {
if (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey next = iterator.next();
iterator.remove();
if (next.isAcceptable()) {
System.out.println("one device connected");
ServerSocketChannel server = (ServerSocketChannel) next.channel();
SocketChannel accept = server.accept();
accept.configureBlocking(false);
accept.register(selector, SelectionKey.OP_READ);
channels.add(accept);
}
if (next.isReadable()) {
SocketChannel channel = (SocketChannel) next.channel();
buffer.clear();
int len = 0;
while ((len = channel.read(buffer)) > 0) {
buffer.flip();
AppController socket = Controllers.getController("socket", AppController.class);
socket.getReceive().appendText(new String(buffer.array(), 0, len));
System.out.println(new String(buffer.array(), 0, len));
}
if (len < 0) {
channel.close();
}
}
}
}
}
}
public void write(String msg){
channels.forEach(channel->{
buffer.clear();
buffer.put(msg.getBytes());
try {
buffer.flip();
channel.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
最后:大幻梦森罗万象狂气断罪眼\ (•◡•) /
网友评论