美文网首页
NIO系列-03-NIO

NIO系列-03-NIO

作者: hylexus | 来源:发表于2016-09-28 23:19 被阅读16次

    声明

    该系列文章由书籍《Netty权威指南》第二版整理而来。只为记录学习笔记。
    若认为内容侵权请及时通知本人删除相关内容。

    [TOC]

    时间服务器--NIO

    服务端代码

    服务端主程序

    public class TimeServer {
        public static void main(String[] args) {
            int port = 1234;
    
            try {
                TimeServerDispatcher timeServer;
                timeServer = new TimeServerDispatcher(port);
                new Thread(timeServer).start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    服务端处理类

    /**
     * Single-threaded NIO event loop of the time server.
     *
     * <p>One {@link Selector} multiplexes the listening {@link ServerSocketChannel}
     * and every accepted {@link SocketChannel}. Each time a client sends data, the
     * request text is printed and the current time, formatted as
     * {@code yyyy-MM-dd HH:mm:ss}, is written back.
     *
     * <p>Limitation: a request is assumed to arrive in a single read and a response
     * to fit into a single write; partial reads and partial writes are not
     * reassembled.
     */
    public class TimeServerDispatcher implements Runnable {

        private final Selector selector;

        private final ServerSocketChannel ssc;

        // Written by stop() from any thread, read by the event loop.
        private volatile boolean stop;

        // Only used from the event-loop thread (SimpleDateFormat is not thread-safe).
        private final DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * Opens the selector and a non-blocking server channel, binds it to
         * {@code port} and registers it for accept events.
         *
         * @param port TCP port to listen on
         * @throws IOException if the selector or channel cannot be opened, configured or bound
         */
        public TimeServerDispatcher(int port) throws IOException {
            selector = Selector.open();
            ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ssc.socket().bind(new InetSocketAddress(port), 10000);
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is listening on port : " + port);
        }

        /**
         * Asks the event loop to terminate. The loop checks the flag once per
         * iteration, i.e. at the latest after the current 1000 ms select times out.
         */
        public void stop() {
            this.stop = true;
        }

        /**
         * Runs the select loop until {@link #stop()} is called, then closes the
         * selector and the listening channel.
         */
        @Override
        public void run() {
            while (!this.stop) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    // Snapshot and clear the selected-key set before dispatching. Removing
                    // elements while iterating the set itself throws
                    // ConcurrentModificationException as soon as more than one key is ready
                    // and leaves stale keys behind for the next round.
                    SelectionKey[] ready = selectedKeys.toArray(new SelectionKey[0]);
                    selectedKeys.clear();
                    for (SelectionKey k : ready) {
                        try {
                            doProcessRequest(k);
                        } catch (Exception e) {
                            e.printStackTrace();
                            closeKey(k);
                        }
                    }
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }

            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            // Closing a selector only deregisters its channels; the listening
            // socket has to be closed explicitly.
            try {
                ssc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Cancels {@code key} and closes its channel; a failure while closing is
         * reported but does not interrupt the handling of the remaining keys.
         */
        private void closeKey(SelectionKey key) {
            key.cancel();
            if (key.channel() != null) {
                try {
                    key.channel().close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Handles one ready key: accepts a new connection, or reads a request and
         * answers it with the current time.
         *
         * @param key a key taken from the selected-key set
         * @throws IOException if accepting, reading or writing fails
         */
        private void doProcessRequest(SelectionKey key) throws IOException {

            if (!key.isValid())
                return;

            if (key.isAcceptable()) {
                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                SocketChannel accepted = server.accept();
                // In non-blocking mode accept() returns null when no connection is pending.
                if (accepted != null) {
                    try {
                        accepted.configureBlocking(false);
                        accepted.register(selector, SelectionKey.OP_READ);
                    } catch (IOException e) {
                        // Do not leak the half-initialised client channel.
                        accepted.close();
                        throw e;
                    }
                }
            }
            if (key.isReadable()) {
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int len = sc.read(readBuffer);
                if (len > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String reqMsg = new String(bytes, "UTF-8");
                    System.out.println("request msg is : " + reqMsg);
                    String respMsg = this.df.format(new Date());
                    doResponse(sc, respMsg);
                } else if (len < 0) {
                    // End of stream: the peer closed the connection.
                    key.cancel();
                    sc.close();
                }
                // len == 0: nothing was read this time; wait for the next event.
            }
        }

        /**
         * Writes {@code respMsg}, encoded as UTF-8 (the same charset used to decode
         * requests), to the client with a single {@code write} call.
         *
         * @param sc      channel of the client to answer
         * @param respMsg text to send; {@code null} sends nothing
         * @throws IOException if the write fails
         */
        private void doResponse(SocketChannel sc, String respMsg) throws IOException {
            if (respMsg == null)
                return;

            byte[] bytes = respMsg.getBytes("UTF-8");
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            // A single non-blocking write may send only part of the buffer; the
            // remainder is not retried here.
            sc.write(writeBuffer);
        }
    }
    
    

    客户端代码

    客户端主程序

    /**
     * Entry point of the NIO time client.
     *
     * <p>Builds a {@code TimeClientHandler} for 127.0.0.1 on the fixed port 1234
     * and runs it on its own thread.
     */
    public class TimeClient {

        public static void main(String[] args) {
            final int serverPort = 1234;
            // The handler is a Runnable; the connection is driven from this worker thread.
            Runnable handler = new TimeClientHandler("127.0.0.1", serverPort);
            Thread worker = new Thread(handler);
            worker.start();
        }
    }
    
    

    客户端处理程序

    /**
     * NIO time client: connects to the server without blocking, sends the request
     * {@code "Hi Server !"}, prints the first response it receives and terminates.
     *
     * <p>The event loop also terminates when the connection cannot be established,
     * when the server closes the connection, or when handling an event fails, so
     * the thread never spins on a dead channel.
     */
    public class TimeClientHandler implements Runnable {

        private String host;
        private int port;

        private Selector selector;
        private SocketChannel socketChannel;

        // Set once the client is done (response received, connection lost or failed).
        private volatile boolean stop;

        /**
         * Opens the selector and a non-blocking socket channel. A failure is
         * reported on stderr and marks the handler as stopped, so that
         * {@link #run()} only releases whatever was opened.
         *
         * @param host server host name or address
         * @param port server TCP port
         */
        public TimeClientHandler(String host, int port) {
            this.host = host;
            this.port = port;
            try {
                selector = Selector.open();
                socketChannel = SocketChannel.open();
                socketChannel.configureBlocking(false);
            } catch (IOException e) {
                e.printStackTrace();
                // Without a selector and channel there is nothing to run.
                this.stop = true;
            }
        }

        /**
         * Starts the connection, runs the select loop until the client is done and
         * finally closes the selector and the socket channel.
         */
        @Override
        public void run() {
            if (!this.stop) {
                try {
                    if (socketChannel.connect(new InetSocketAddress(host, port))) {
                        // Connected immediately: wait for the response and send the request.
                        socketChannel.register(selector, SelectionKey.OP_READ);

                        doSendReqMsg(socketChannel, "Hi Server !");
                    } else {
                        // Connection still in progress: wait for the connect event.
                        socketChannel.register(selector, SelectionKey.OP_CONNECT);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    // No usable connection; do not enter the loop.
                    this.stop = true;
                }
            }

            while (!this.stop) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    // Snapshot and clear the selected-key set before dispatching;
                    // removing from the set while iterating it is not safe.
                    SelectionKey[] ready = selectedKeys.toArray(new SelectionKey[0]);
                    selectedKeys.clear();
                    for (SelectionKey k : ready) {
                        try {
                            doProcessResponse(k);
                        } catch (Exception e) {
                            e.printStackTrace();
                            k.cancel();
                            if (k.channel() != null)
                                k.channel().close();
                            // The only channel of this client is gone.
                            this.stop = true;
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // Closing the selector does not close the channel registered with it.
            if (socketChannel != null) {
                try {
                    socketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Handles one ready key: completes a pending connection and sends the
         * request, or reads and prints the response.
         *
         * @param key a key taken from the selected-key set
         * @throws IOException if connecting, reading or writing fails
         */
        private void doProcessResponse(SelectionKey key) throws IOException {

            if (!key.isValid())
                return;

            SocketChannel sc = (SocketChannel) key.channel();

            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    doSendReqMsg(sc, "Hi Server !");
                } else {
                    throw new RuntimeException("连接失败");
                }
            }
            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes = new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String respMsg = new String(bytes, "UTF-8");
                    System.out.println("time : " + respMsg);
                    this.stop = true;
                } else if (readBytes < 0) {
                    // End of stream: the server closed the connection, no response will arrive.
                    key.cancel();
                    sc.close();
                    this.stop = true;
                }
                // readBytes == 0: nothing was read this time; wait for the next event.
            }
        }

        /**
         * Writes {@code reqMsg}, encoded as UTF-8 (the same charset used to decode
         * the response), to the server with a single {@code write} call.
         *
         * @param sc     connected channel to write to
         * @param reqMsg request text to send
         * @throws IOException if the write fails
         */
        private void doSendReqMsg(SocketChannel sc, String reqMsg) throws IOException {
            byte[] req = reqMsg.getBytes("UTF-8");
            ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
            writeBuffer.put(req);
            writeBuffer.flip();
            // A single non-blocking write may send only part of the buffer; the
            // remainder is not retried here.
            sc.write(writeBuffer);
        }

    }
    
    

    总结

    这种模型有如下特点:

    • 客户端的连接操作是异步的
    • SocketChannel的读写操作是异步的
    • 一个Selector可以处理成千上万个客户端连接

    但是:

    • 代码复杂
    • 这里的实现可能出现“读半包”、“写半包”的情况

    参考资料: 《Netty权威指南》第二版

    相关文章

      网友评论

          本文标题:NIO系列-03-NIO

          本文链接:https://www.haomeiwen.com/subject/nhskyttx.html