声明
该系列文章由书籍《Netty权威指南》第二版整理而来。只为记录学习笔记。
若认为内容侵权请及时通知本人删除相关内容。
[TOC]
时间服务器--NIO
服务端代码
服务端主程序
public class TimeServer {
public static void main(String[] args) {
int port = 1234;
try {
TimeServerDispatcher timeServer;
timeServer = new TimeServerDispatcher(port);
new Thread(timeServer).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
服务端处理类
public class TimeServerDispatcher implements Runnable {
private Selector selector;
private ServerSocketChannel ssc;
private volatile boolean stop;
private DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public TimeServerDispatcher(int port) throws IOException {
selector = Selector.open();
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(port), 10000);
ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is listening on port : " + port);
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
while (!this.stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
for (SelectionKey k : selectedKeys) {
selectedKeys.remove(k);
try {
doProcessRequest(k);
} catch (Exception e) {
if (k != null) {
k.cancel();
if (k.channel() != null)
k.channel().close();
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void doProcessRequest(SelectionKey key) throws IOException {
if (!key.isValid())
return;
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int len = sc.read(readBuffer);
if (len > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String reqMsg = new String(bytes, "UTF-8");
System.out.println("request msg is : " + reqMsg);
String respMsg = this.df.format(new Date());
doResponse(sc, respMsg);
} else if (len < 0) {
key.cancel();
sc.close();
} else {
// 0
}
}
}
private void doResponse(SocketChannel sc, String respMsg) throws IOException {
if (respMsg == null)
return;
byte[] bytes = respMsg.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
sc.write(writeBuffer);
}
}
客户端代码
客户端主程序
public class TimeClient {
public static void main(String[] args) {
int port = 1234;
new Thread(new TimeClientHandler("127.0.0.1", port)).start();
}
}
客户端处理程序
public class TimeClientHandler implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
doSendReqMsg(socketChannel, "Hi Server !");
} else
socketChannel.register(selector, SelectionKey.OP_CONNECT);
} catch (IOException e) {
e.printStackTrace();
}
while (!this.stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
for (SelectionKey k : selectedKeys) {
selectedKeys.remove(k);
try {
doProcessResponse(k);
} catch (Exception e) {
if (k != null) {
k.cancel();
if (k.channel() != null)
k.channel().close();
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void doProcessResponse(SelectionKey key) throws IOException {
if (!key.isValid())
return;
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
sc.register(selector, SelectionKey.OP_READ);
doSendReqMsg(sc, "Hi Server !");
} else {
throw new RuntimeException("连接失败");
}
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String respMsg = new String(bytes, "UTF-8");
System.out.println("time : " + respMsg);
this.stop = true;
} else if (readBytes < 0) {
key.cancel();
sc.close();
} else {
// 0
}
}
}
private void doSendReqMsg(SocketChannel sc, String reqMsg) throws IOException {
byte[] req = reqMsg.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
}
}
总结
这种模型有如下特点:
- 客户端的连接操作时异步的
- SocketChannel的读写操作是异步的
- 一个Selector可以处理成千上万个客户端连接
但是:
- 代码复杂
- 这里的实现可能出现 "读半包","写半包"的情况
参考资料: 《Netty权威指南》第二版
网友评论