NIO

作者: 兮兮码字的地方 | 来源:发表于2020-11-14 17:42 被阅读0次

客户端代码

package com.demo.nio;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.SocketChannel;

import java.util.Iterator;

import java.util.Set;

public class ClientHandler implements Runnable {

private SocketChannel socketChannel;

private int port;

private Selector selector;

private String host;

private boolean stop;

public ClientHandler(String host, int port) {

this.host = host;

this.port = port;

try {

selector = Selector.open();

socketChannel = SocketChannel.open();

socketChannel.configureBlocking(false);

} catch (IOException e) {

e.printStackTrace();

System.exit(1);

}

}

public void run() {

try {

doConnect();// 连接

} catch (IOException e) {

e.printStackTrace();

}

while (!stop) {

try {

selector.select(1000);

Set<SelectionKey> selectionKeys = selector.selectedKeys();

Iterator<SelectionKey> it = selectionKeys.iterator();

SelectionKey key = null;

while (it.hasNext()) {

key = it.next();

it.remove();

handleInput(key);

}

} catch (IOException e) {

e.printStackTrace();

}

}

if (selector != null) {

try {

selector.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

private void handleInput(SelectionKey key) throws IOException {

if (key.isValid()) {

SocketChannel sc = (SocketChannel) key.channel();

if (key.isConnectable()) {

if (sc.finishConnect()) {

} else {

System.exit(1);

}

}

if (key.isReadable()) {// 读取消息

ByteBuffer readBuffer = ByteBuffer.allocate(1024);

int read = sc.read(readBuffer);

if (read > 0) {

readBuffer.flip();

byte[] bytes = new byte[readBuffer.remaining()];

readBuffer.get(bytes);

String body = new String(bytes, "utf-8");

System.out.println("现在时间为:" + body);

this.stop = true;

} else if (read < 0) {

key.cancel();

sc.close();

} else {

}

}

}

}

private void doConnect() throws IOException {

if (socketChannel.connect(new InetSocketAddress(host, port))) {

socketChannel.register(selector, SelectionKey.OP_READ);

} else {

socketChannel.register(selector, SelectionKey.OP_CONNECT);

}

}

private void doWrite(SocketChannel socketChannel,String request)throws IOException {

byte[] bytes = request.getBytes();

ByteBuffer writeBuff = ByteBuffer.allocate(bytes.length);

writeBuff.put(bytes);

writeBuff.flip();

socketChannel.write(writeBuff);

if (!writeBuff.hasRemaining()) {

System.out.println("客户端发送命令成功");

}

}

public void sendMsg(String msg) throws Exception{ 

        doWrite(socketChannel, msg); 

    } 

}

package com.demo.nio;

import java.util.Scanner;

public class Client {

private static String DEFAULT_HOST = "127.0.0.1"; 

    private static int DEFAULT_PORT = 8081; 

    private static ClientHandler clientHandler; 

    public static void start(){ 

        start(DEFAULT_HOST,DEFAULT_PORT); 

    } 

    public static synchronized void start(String ip,int port){ 

        clientHandler = new ClientHandler(ip,port); 

        new Thread(clientHandler,"Server").start(); 

    } 

    //向服务器发送消息 

    public static boolean sendMsg(String msg) throws Exception{ 

        clientHandler.sendMsg(msg); 

        return true; 

    }

@SuppressWarnings("resource")

public static void main(String[] args) {

// 运行客户端

Client.start();

try {

while (Client.sendMsg(new Scanner(System.in).nextLine()));

} catch (Exception e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

服务端代码

package com.demo.nio;

import java.io.IOException;

import java.net.InetSocketAddress;

import java.nio.ByteBuffer;

import java.nio.channels.SelectionKey;

import java.nio.channels.Selector;

import java.nio.channels.ServerSocketChannel;

import java.nio.channels.SocketChannel;

import java.util.Date;

import java.util.Iterator;

import java.util.Set;

public class ServerHandler implements Runnable {

    private Selector selector = null;

    private ServerSocketChannel serverChannel = null;

    private boolean stop;

/**

* 初始化多路复用器,绑定监听端口

*

* @param port

*/

    public ServerHandler(int port) {

        try {

            selector = Selector.open();

            serverChannel = ServerSocketChannel.open();

            serverChannel.configureBlocking(false);

            serverChannel.socket().bind(new InetSocketAddress(port), 1024);

            serverChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("服务器监听" + port);

        } catch (IOException e) {

            e.printStackTrace();

            System.exit(1);

        }

    }

    public void stop() {

        this.stop = true;

    }

    public void run() {

        while (!stop) {

            try {

                selector.select(1000);

                Set<SelectionKey> selectionKeys = selector.selectedKeys();

                Iterator<SelectionKey> it = selectionKeys.iterator();

                SelectionKey key = null;

                while (it.hasNext()) {

                    key = it.next();

                    it.remove();

                    try {

                        handleInput(key);

                    } catch (IOException e) {

                        if (key != null) {

                            key.cancel();

                            if (key.channel() != null)

                                key.channel().close();

                        }

                    }

                }

            } catch (IOException e) {

                e.printStackTrace();

            }

        }

        if (selector != null) {

            try {

                selector.close();

            } catch (IOException e) {

                e.printStackTrace();

            }

        }

    }

    /**

    * 处理事件

    * @param key

    * @throws IOException

    */

    private void handleInput(SelectionKey key) throws IOException {

        if (key.isValid()) {

            if (key.isAcceptable()) {

                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();

                SocketChannel sc = ssc.accept();

                sc.configureBlocking(false);

                sc.register(selector, SelectionKey.OP_READ);

            }

            if (key.isReadable()) {

                SocketChannel sc = (SocketChannel) key.channel();

                ByteBuffer readBuff = ByteBuffer.allocate(1024);

                //非阻塞的

                int read = sc.read(readBuff);

                if (read > 0) {

                    readBuff.flip();

                    byte[] bytes = new byte[readBuff.remaining()];

                    readBuff.get(bytes);

                    String body = new String(bytes, "utf-8");

                    System.out.println("服务收到消息:" + body);

                    String currentTime = new Date(System.currentTimeMillis()).toString();

                    doWrite(sc, currentTime);

                } else if (read < 0) {

                    key.cancel();

                    sc.close();

                } else {

                }

            }

        }

    }

    /**

    * 异步发送应答消息

    * @param sc

    * @param content

    * @throws IOException

    */

    private void doWrite(SocketChannel sc, String content) throws IOException {

        if (content != null && content.trim().length() > 0) {

            byte[] bytes = content.getBytes();

            ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);

            byteBuffer.put(bytes);

            byteBuffer.flip();

            sc.write(byteBuffer);

        }

    }

}

package com.demo.nio;

public class Server {

private static int DEFAULT_PORT = 8081;

private static ServerHandler serverHandler;

public static void start() {

start(DEFAULT_PORT);

}

public static synchronized void start(int port) {

if (serverHandler != null) {

serverHandler.stop();

}

serverHandler = new ServerHandler(port);

new Thread(serverHandler, "Server").start();

}

public static void main(String[] args) {

// 运行服务器

Server.start();

}

}

相关文章

网友评论

      本文标题:NIO

      本文链接:https://www.haomeiwen.com/subject/odjwbktx.html