1、服务端的代码如下所示:
public classTCPServer {
private static finalIntegerPORT=9090;
public static voidstart()throwsException{
/**
*开启一个ServerSocketChannel
*/
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(newInetSocketAddress(PORT));
/**
*创建一个selector
*/
Selector selector = Selector.open();
/**
*将创建的serverSocketChannel注册到selector的选择器上,指定这个channel只关心OP_ACCEPT事件
*/
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
while(true){
/**
* select()操作,默认是阻塞模式的,当没有accept或者read事件道来的时候,将一直阻塞不会继续向下执行
*/
intreadChannels = selector.select();
if(readChannels <0){
continue;
}
/**
*从selector上获取IO事件,可能是accept,也可能是read
*/
Set selectionKeySet = selector.selectedKeys();
Iterator iter = selectionKeySet.iterator();
/**
*循环遍历SelectionKeys中所有的SelectionKey
*/
while(iter.hasNext()){
SelectionKey ket = iter.next();
if(ket.isAcceptable()){//处理OP_ACCEPT事件
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector,SelectionKey.OP_READ);
}else if(ket.isReadable()){//处理OP_READ事件
SocketChannel socketChannel = (SocketChannel)ket.channel();
StringBuilder sb =newStringBuilder();
ByteBuffer buffer = ByteBuffer.allocate(1024);
intreadBytes =0;
intret =0;
/**
*注意读取数据的时候,ByteBuffer的操作,需要flip(),反转buffer,clear进行指针位置的调整
*/
while((ret = socketChannel.read(buffer)) >0){
readBytes += ret;
buffer.flip();
sb.append(Charset.forName("UTF-8").decode(buffer).toString());
buffer.clear();
}
if(readBytes ==0){
System.out.println("handle oppsite close Exception!!");
socketChannel.close();
}
String message = sb.toString();
System.out.println("Message from client : "+ message);
if(Constants.CLIENT_CLOSE.equalsIgnoreCase(message.toString().trim())){
System.out.println("client is going to shutdown!");
socketChannel.close();
}else if(Constants.SERVER_CLOSE.equalsIgnoreCase(message.trim())){
System.out.println("server is going to shutdown!");
socketChannel.close();
serverSocketChannel.close();
selector.close();
}else{
String outMessage ="Server response : "+ message;
socketChannel.write(Charset.forName("UTF-8").encode(outMessage));
}
}
//将selector上当前已经监听到的并且已经处理了的时间标记清除
iter.remove();
}
}
}
public static voidmain(String[] args){
try{
start();
}catch(Exception e) {
e.printStackTrace();
}
}
}
2、客户端的代码如下所示:
public classTCPClient {
private static finalStringHOST="127.0.0.1";
private static finalIntegerPORT=9090;
public static voidstart(String message)throwsException{
/**
*创建一个SocketChannel和一个Selector,将SocketChannel注册到Selector上面,
*并注册OP_CONNECT事件,设置SocketChannel为非阻塞模式
*/
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
//连接到指定的地址
socketChannel.connect(newInetSocketAddress(HOST,PORT));
Selector selector = Selector.open();
socketChannel.register(selector,SelectionKey.OP_CONNECT);
while(true){
if(socketChannel.isConnected()){
socketChannel.write(Charset.forName("UTF-8").encode(message));
if(message ==null|| message.equalsIgnoreCase(Constants.CLIENT_CLOSE)){
socketChannel.close();
selector.close();
System.out.println("See you,客户端退出系统了");
System.exit(0);
}
}
// select()进行IO事件选择操作
intnSelectedKeys = selector.select();
if(nSelectedKeys >0){
for(SelectionKey key : selector.selectedKeys()){
/**
*判断检测到的Channel是不是可连接的,将对应的Channel注册到选择器上面,指定关心的事件类型为OP_READ
*/
if(key.isConnectable()){
SocketChannel connChannel = (SocketChannel) key.channel();
connChannel.configureBlocking(false);
connChannel.register(selector,SelectionKey.OP_READ);
connChannel.finishConnect();
}
/**
*若检测到的IO事件是读事件,则处理相关数据的读相关的业务逻辑
*/
else if(key.isReadable()){
SocketChannel readChannel = (SocketChannel) key.channel();
StringBuilder sb =newStringBuilder();
ByteBuffer buffer = ByteBuffer.allocate(1024);
intreadBytes =0;
intret =0;
/**
*注意对ByteBuffer的读操作,需要关心的是flip,clear操作等等
*/
while((ret = readChannel.read(buffer)) >0){
readBytes += ret;
buffer.flip();
sb.append(Charset.forName("UTF-8").decode(buffer).toString());
buffer.clear();
}
String result = sb.toString();
System.out.println("Message from server : "+ result);
if(readBytes ==0){
System.out.println("handle opposite close Exception");
readChannel.close();
}
}
}
/**
*一次监听的事件处理完之后,需要将已经记录的事件清除掉,准备下一轮的事件标记
*/
selector.selectedKeys().clear();
}else{
System.out.println("handle select timeout Exception");
socketChannel.close();
}
}
}
public static voidmain(String[] args){
try{
Scanner sc =newScanner(System.in);
String message = sc.nextLine();
start(message);
}catch(Exception e) {
e.printStackTrace();
}
}
}
网友评论