美文网首页
基于JavaNIO的服务间的通讯

基于JavaNIO的服务间的通讯

作者: tukangzheng | 来源:发表于2017-10-27 14:12 被阅读0次

    1、服务端的代码如下所示:

    public classTCPServer {

    private static finalIntegerPORT=9090;

    public static voidstart()throwsException{

    /**

    *开启一个ServerSocketChannel

    */

    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

    serverSocketChannel.configureBlocking(false);

    serverSocketChannel.bind(newInetSocketAddress(PORT));

    /**

    *创建一个selector

    */

    Selector selector = Selector.open();

    /**

    *将创建的serverSocketChannel注册到selector的选择器上,指定这个channel只关心OP_ACCEPT事件

    */

    serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

    while(true){

    /**

    * select()操作,默认是阻塞模式的,当没有accept或者read事件道来的时候,将一直阻塞不会继续向下执行

    */

    intreadChannels = selector.select();

    if(readChannels <0){

    continue;

    }

    /**

    *从selector上获取IO事件,可能是accept,也可能是read

    */

    Set selectionKeySet = selector.selectedKeys();

    Iterator iter = selectionKeySet.iterator();

    /**

    *循环遍历SelectionKeys中所有的SelectionKey

    */

    while(iter.hasNext()){

    SelectionKey ket = iter.next();

    if(ket.isAcceptable()){//处理OP_ACCEPT事件

    SocketChannel socketChannel = serverSocketChannel.accept();

    socketChannel.configureBlocking(false);

    socketChannel.register(selector,SelectionKey.OP_READ);

    }else if(ket.isReadable()){//处理OP_READ事件

    SocketChannel socketChannel = (SocketChannel)ket.channel();

    StringBuilder sb =newStringBuilder();

    ByteBuffer buffer = ByteBuffer.allocate(1024);

    intreadBytes =0;

    intret =0;

    /**

    *注意读取数据的时候,ByteBuffer的操作,需要flip(),反转buffer,clear进行指针位置的调整

    */

    while((ret = socketChannel.read(buffer)) >0){

    readBytes += ret;

    buffer.flip();

    sb.append(Charset.forName("UTF-8").decode(buffer).toString());

    buffer.clear();

    }

    if(readBytes ==0){

    System.out.println("handle oppsite close Exception!!");

    socketChannel.close();

    }

    String message = sb.toString();

    System.out.println("Message from client : "+ message);

    if(Constants.CLIENT_CLOSE.equalsIgnoreCase(message.toString().trim())){

    System.out.println("client is going to shutdown!");

    socketChannel.close();

    }else if(Constants.SERVER_CLOSE.equalsIgnoreCase(message.trim())){

    System.out.println("server is going to shutdown!");

    socketChannel.close();

    serverSocketChannel.close();

    selector.close();

    }else{

    String outMessage ="Server response : "+ message;

    socketChannel.write(Charset.forName("UTF-8").encode(outMessage));

    }

    }

    //将selector上当前已经监听到的并且已经处理了的时间标记清除

    iter.remove();

    }

    }

    }

    public static voidmain(String[] args){

    try{

    start();

    }catch(Exception e) {

    e.printStackTrace();

    }

    }

    }

    2、客户端的代码如下所示:

    public classTCPClient {

    private static finalStringHOST="127.0.0.1";

    private static finalIntegerPORT=9090;

    public static voidstart(String message)throwsException{

    /**

    *创建一个SocketChannel和一个Selector,将SocketChannel注册到Selector上面,

    *并注册OP_CONNECT事件,设置SocketChannel为非阻塞模式

    */

    SocketChannel socketChannel = SocketChannel.open();

    socketChannel.configureBlocking(false);

    //连接到指定的地址

    socketChannel.connect(newInetSocketAddress(HOST,PORT));

    Selector selector = Selector.open();

    socketChannel.register(selector,SelectionKey.OP_CONNECT);

    while(true){

    if(socketChannel.isConnected()){

    socketChannel.write(Charset.forName("UTF-8").encode(message));

    if(message ==null|| message.equalsIgnoreCase(Constants.CLIENT_CLOSE)){

    socketChannel.close();

    selector.close();

    System.out.println("See you,客户端退出系统了");

    System.exit(0);

    }

    }

    // select()进行IO事件选择操作

    intnSelectedKeys = selector.select();

    if(nSelectedKeys >0){

    for(SelectionKey key : selector.selectedKeys()){

    /**

    *判断检测到的Channel是不是可连接的,将对应的Channel注册到选择器上面,指定关心的事件类型为OP_READ

    */

    if(key.isConnectable()){

    SocketChannel connChannel = (SocketChannel) key.channel();

    connChannel.configureBlocking(false);

    connChannel.register(selector,SelectionKey.OP_READ);

    connChannel.finishConnect();

    }

    /**

    *若检测到的IO事件是读事件,则处理相关数据的读相关的业务逻辑

    */

    else if(key.isReadable()){

    SocketChannel readChannel = (SocketChannel) key.channel();

    StringBuilder sb =newStringBuilder();

    ByteBuffer buffer = ByteBuffer.allocate(1024);

    intreadBytes =0;

    intret =0;

    /**

    *注意对ByteBuffer的读操作,需要关心的是flip,clear操作等等

    */

    while((ret = readChannel.read(buffer)) >0){

    readBytes += ret;

    buffer.flip();

    sb.append(Charset.forName("UTF-8").decode(buffer).toString());

    buffer.clear();

    }

    String result = sb.toString();

    System.out.println("Message from server : "+ result);

    if(readBytes ==0){

    System.out.println("handle opposite close Exception");

    readChannel.close();

    }

    }

    }

    /**

    *一次监听的事件处理完之后,需要将已经记录的事件清除掉,准备下一轮的事件标记

    */

    selector.selectedKeys().clear();

    }else{

    System.out.println("handle select timeout Exception");

    socketChannel.close();

    }

    }

    }

    public static voidmain(String[] args){

    try{

    Scanner sc =newScanner(System.in);

    String message = sc.nextLine();

    start(message);

    }catch(Exception e) {

    e.printStackTrace();

    }

    }

    }

    相关文章

      网友评论

          本文标题:基于JavaNIO的服务间的通讯

          本文链接:https://www.haomeiwen.com/subject/oiwrpxtx.html