一.例子
服务端:
TimeServer类:
public class TimeServer {
public static void main(String[] args) {
int port=8080;
MutiplexerTimeServer timeServer=new MutiplexerTimeServer(port);
new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
}
}
MutiplexerTimeServer类:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
public class MutiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel servChannel;
private volatile boolean stop;
public MutiplexerTimeServer(int port) {
try {
//1.创建一个多路复用器Selector
selector = Selector.open();
//2.打开ServerSocketChannel,用于监听客户端的连接,是所有客户端连接的父管道。
servChannel=ServerSocketChannel.open();
//3.把serverSocketChannel设为非阻塞模式
servChannel.configureBlocking(false);
//4.serverSocketChannel的backlog设置为1024.
servChannel.socket().bind(new InetSocketAddress(port),1024);
//5.初始化成功后,将ServerSocketChannel注册到Selector,监听SelectionKey.OP_ACCEPT
//操作位
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The time server is start in port:"+port);
}catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
this.stop=true;
}
@Override
public void run() {
while (!stop){
try{
//6.如果至少有一个channel被 selected 就返回,否则就阻塞1s。返回值是 selected的channel数量。
selector.select(1000);
Set<SelectionKey> selectionKeys=selector.selectedKeys();
Iterator<SelectionKey> it=selectionKeys.iterator();
SelectionKey key=null;
//7.轮询所有的key。
while (it.hasNext()){
key=it.next();
it.remove();
try{
//8.处理轮询到的事件
handleInput(key);
}catch (Exception e){
if(key!=null){
key.cancel();
if(key.channel()!=null){
key.channel().close();
}
}
}
}
}catch (Throwable t){
t.printStackTrace();
}
}
//多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册
//并关闭,所以不需要重复释放资源。
if(selector!=null){
try {
selector.close();
}catch (IOException e){
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
//9.key对应的channel是否准备好连接了。
if(key.isAcceptable()){
ServerSocketChannel ssc=(ServerSocketChannel)key.channel();
SocketChannel sc=ssc.accept();
sc.configureBlocking(false);
//10.将连接准备好的SocketChannel的SelectionKey.OP_READ(读准备)事件注册到Selector
sc.register(selector,SelectionKey.OP_READ);
}//11.key对应的channel是否准备好读了
if(key.isReadable()){
//read the data
SocketChannel sc=(SocketChannel)key.channel();
ByteBuffer readBuffer=ByteBuffer.allocate(1024);
int readBytes=sc.read(readBuffer);
if(readBytes>0){
readBuffer.flip();
byte[] bytes=new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body=new String(bytes,"UTF-8");
System.out.println("The time server receice order:"+body);
String currentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER";
doWrite(sc,currentTime);
}else if(readBytes<0){//read返回-1说明客户端的数据发送完毕
//对端链路关闭
key.cancel();
sc.close();
}else ;
/*
read返回0有3种情况,一是某一时刻socketChannel中当前(注意是当前)没有数据可以读,这时会返回0,
其次是bytebuffer的position等于limit了,即bytebuffer的remaining等于0,这个时候也会返回0,
最后一种情况就是客户端的数据发送完毕了,这个时候客户端想获取服务端的反馈调用了recv函数,若服务端继续read,这个时候就会返回0。
*/
}
}
}
private void doWrite(SocketChannel channel,String response) throws IOException{
if(response!=null&&response.trim().length()>0){
byte[] bytes=response.getBytes();
ByteBuffer writeBuffer=ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
客户端:
TimeClient类:
public class TimeClient {
public static void main(String[] args) {
new Thread(new TimeClientHandle("127.0.0.1",8080),"TimeClient-001").start();
}
}
TimeClientHandle类:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TimeClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandle(String host, int port) {
this.host = "127.0.0.1";
this.port = port;
try {
//1.初始化NIO多路复用器。
selector = Selector.open();
//2.初始化SocketChannel。
socketChannel = SocketChannel.open();
//3.设置SocketChannel为非阻塞io。
socketChannel.configureBlocking(false);
}catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
try {
doConnect();
}catch (IOException e){
e.printStackTrace();
System.exit(1);
}
while (!stop){
try {
selector.select(1000);
Set<SelectionKey> selectionKeys=selector.selectedKeys();
Iterator<SelectionKey> it=selectionKeys.iterator();
SelectionKey key=null;
while (it.hasNext()){
key=it.next();
it.remove();
try{
handleInput(key);
}catch (Exception e){
if(key!=null){
key.cancel();
if(key.channel()!=null){
key.channel().close();
}
}
}
}
}catch (Exception e){
e.printStackTrace();
System.exit(1);
}
}
if(selector!=null){
try {
selector.close();
}catch (IOException e){
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc=(SocketChannel)key.channel();
if(key.isConnectable()){
if(sc.finishConnect()){
sc.register(selector,SelectionKey.OP_READ);
doWrite(sc);
}else {
System.exit(1);
}
}
if(key.isReadable()){
ByteBuffer readBuffer=ByteBuffer.allocate(1024);
int readBytes=sc.read(readBuffer);
if(readBytes>0){
readBuffer.flip();
byte[] bytes=new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body=new String(bytes,"UTF-8");
System.out.println("Now is:"+body);
this.stop=true;
}else if(readBytes<0){//read返回-1说明客户端的数据发送完毕
//对端链路关闭
key.cancel();
sc.close();
}else ;
}
}
}
private void doConnect() throws IOException{
//如果成功连接就注册OP_READ
if(socketChannel.connect(new InetSocketAddress(host,port))){
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else {//没有连接成功则注册OP_CONNECT
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel sc) throws IOException{
byte[] req="QUERY TIME ORDER".getBytes();
System.out.println("req.length:"+req.length);
ByteBuffer writeBuffer=ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if(!writeBuffer.hasRemaining()){
System.out.println("Send order 2 server succeed");
}
}
}
NIO编程的优点:
1.客户端发起的连接,读,写操作都是异步的
Q&A
1.为什么不注册OP_WRITE事件,在轮询到key的时候不判断key.isWritable()呢?
key.isWritable()是表示Socket可写,网络不出现阻塞情况下,一直是可以写的,所认一直为true.一般我们不注册OP_WRITE事件.
网友评论