客户端代码
package com.demo.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class ClientHandler implements Runnable {
private SocketChannel socketChannel;
private int port;
private Selector selector;
private String host;
private boolean stop;
public ClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void run() {
try {
doConnect();// 连接
} catch (IOException e) {
e.printStackTrace();
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
handleInput(key);
}
} catch (IOException e) {
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
} else {
System.exit(1);
}
}
if (key.isReadable()) {// 读取消息
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int read = sc.read(readBuffer);
if (read > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "utf-8");
System.out.println("现在时间为:" + body);
this.stop = true;
} else if (read < 0) {
key.cancel();
sc.close();
} else {
}
}
}
}
private void doConnect() throws IOException {
if (socketChannel.connect(new InetSocketAddress(host, port))) {
socketChannel.register(selector, SelectionKey.OP_READ);
} else {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel socketChannel,String request)throws IOException {
byte[] bytes = request.getBytes();
ByteBuffer writeBuff = ByteBuffer.allocate(bytes.length);
writeBuff.put(bytes);
writeBuff.flip();
socketChannel.write(writeBuff);
if (!writeBuff.hasRemaining()) {
System.out.println("客户端发送命令成功");
}
}
public void sendMsg(String msg) throws Exception{
doWrite(socketChannel, msg);
}
}
package com.demo.nio;
import java.util.Scanner;
public class Client {
private static String DEFAULT_HOST = "127.0.0.1";
private static int DEFAULT_PORT = 8081;
private static ClientHandler clientHandler;
public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
}
public static synchronized void start(String ip,int port){
clientHandler = new ClientHandler(ip,port);
new Thread(clientHandler,"Server").start();
}
//向服务器发送消息
public static boolean sendMsg(String msg) throws Exception{
clientHandler.sendMsg(msg);
return true;
}
@SuppressWarnings("resource")
public static void main(String[] args) {
// 运行客户端
Client.start();
try {
while (Client.sendMsg(new Scanner(System.in).nextLine()));
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
服务端代码
package com.demo.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
public class ServerHandler implements Runnable {
private Selector selector = null;
private ServerSocketChannel serverChannel = null;
private boolean stop;
/**
* 初始化多路复用器,绑定监听端口
*
* @param port
*/
public ServerHandler(int port) {
try {
selector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port), 1024);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器监听" + port);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
public void run() {
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (IOException e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 处理事件
* @param key
* @throws IOException
*/
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuff = ByteBuffer.allocate(1024);
//非阻塞的
int read = sc.read(readBuff);
if (read > 0) {
readBuff.flip();
byte[] bytes = new byte[readBuff.remaining()];
readBuff.get(bytes);
String body = new String(bytes, "utf-8");
System.out.println("服务收到消息:" + body);
String currentTime = new Date(System.currentTimeMillis()).toString();
doWrite(sc, currentTime);
} else if (read < 0) {
key.cancel();
sc.close();
} else {
}
}
}
}
/**
* 异步发送应答消息
* @param sc
* @param content
* @throws IOException
*/
private void doWrite(SocketChannel sc, String content) throws IOException {
if (content != null && content.trim().length() > 0) {
byte[] bytes = content.getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
byteBuffer.put(bytes);
byteBuffer.flip();
sc.write(byteBuffer);
}
}
}
package com.demo.nio;
public class Server {
private static int DEFAULT_PORT = 8081;
private static ServerHandler serverHandler;
public static void start() {
start(DEFAULT_PORT);
}
public static synchronized void start(int port) {
if (serverHandler != null) {
serverHandler.stop();
}
serverHandler = new ServerHandler(port);
new Thread(serverHandler, "Server").start();
}
public static void main(String[] args) {
// 运行服务器
Server.start();
}
}
网友评论