步骤说明
1、配置服务端端口;
2、构建服务端启动线程;
3、初始化线程参数;
4、打开nio选择器selector;
5、打开ServerSocketChannel;
6、设置ServerChannel为非阻塞;
7、给ServerChannel的socket绑定端口、请求队列长度;
8、将ServerChannel注册到选择器上,并且指定操作键为等待接收连接——OP_ACCEPT;
9、间隔轮询,获取所有选择键;
10、封装一个处理每一个选择键的方法;
11、判断键是否有效;
12、判断键的属性;
13、如果是请求接入(isAcceptable),可以通过key获取channel为ServerSocketChannel;
14、这时可以通过ServerChannel.accept获取SocketChannel;
15、同样将SocketChannel设置为非阻塞;
16、同样将SocketChannel注册到selector,并且设置操作键为等待读取内容——OP_READ;
17、如果键的属性是可读取(isReadable),则可以从key中读取channel为SocketChannel;
18、创建Buffer,用Buffer配合SocketChannel读取数据;
19、将响应数据写入Buffer,再通过SocketChannel写出Buffer中的数据;
代码如下
public class NioTimeServer {
public static void main(String[] args) {
// System.exit(1);
int port = 10002;
if (args != null && args.length > 0) {
port = Integer.valueOf(args[0]);
}
// 启动一个线程来 开启一个 nio server服务
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer, "NIO-multiplexeerTimeServer-001").start();
}
}
class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel servChannel;
private volatile boolean stop;
public MultiplexerTimeServer(int port) {
// 初始化 nio server配置
try {
// 初始化 选择器
selector = Selector.open();
// 打开 server端通道
servChannel = ServerSocketChannel.open();
// 设置为非阻塞
servChannel.configureBlocking(false);
// 为server端通道socekt绑定端口 、 请求的传入连接队列的最大长度为1024字节。
servChannel.socket().bind(new InetSocketAddress(port), 1024);
// 用给定的选择器注册此频道,返回选择键
// 将server端通道注册到选择器上、 用于套接字接受操作的操作设置位
SelectionKey register = servChannel.register(selector, SelectionKey.OP_ACCEPT);
// The servChannel.register SelectionKey key is: class
// sun.nio.ch.SelectionKeyImpltruefalse
System.out.println("The servChannel.register SelectionKey key is: "
+ register.getClass() + register.isValid() + register.isAcceptable());
System.err.println("The time server is start in port :" + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
while (!stop) {
try {
// 1秒 轮询 一次 选择器上的 通道
selector.select(1000);
// 获取该选择器上的 所有 可选择的 key
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 遍历key
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
// 可以用异步 线程池 处理该 key
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 处理新接入的请求消息
if (key.isAcceptable()) {// 数据接入
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
// 将socket通道同样注册到选择器 并且告知 key的 操作为 op_read
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {// 数据可读
// read the data
SocketChannel sc = (SocketChannel) key.channel();
// 字节缓冲区 允许 1024字节
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
// 判断 buffer区是否有东西可读
if (readBytes > 0) {
// 翻转此缓冲区
// 将限制设置为当前位置,然后位置设置为零。如果标记已定义,则它是丢弃。
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
// 数据读取
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("The time server receive order : " + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
? new Date(System.currentTimeMillis()).toString()
: "BAD ORDER";
doWrite(sc, currentTime);
} else if(readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else {
; // 读到0字忽略
}
}
}
}
private void doWrite(SocketChannel channel, String response) throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writerBuffer = ByteBuffer.allocate(bytes.length);
writerBuffer.put(bytes);
writerBuffer.flip();
channel.write(writerBuffer);
}
}
网友评论