美文网首页
NIO系列-04-AIO

NIO系列-04-AIO

作者: hylexus | 来源:发表于2016-09-28 23:22 被阅读39次

    声明

    该系列文章由书籍《Netty权威指南》第二版整理而来。只为记录学习笔记。
    若认为内容侵权请及时通知本人删除相关内容。

    [TOC]

    时间服务器--AIO

    服务端代码

    服务端主程序

    /**
     * Launcher for the AIO time server: builds a {@code TimeServerHandler} for
     * port 1234 and runs it on a newly started thread.
     */
    public class TimeServer {

        /**
         * Starts the accept loop on its own thread and returns immediately.
         *
         * @param args command-line arguments (not read)
         * @throws IOException declared for callers; nothing in this body throws it
         */
        public static void main(String[] args) throws IOException {
            final Runnable acceptLoop = new TimeServerHandler(1234);
            final Thread serverThread = new Thread(acceptLoop);
            serverThread.start();
        }
    }
    
    

    服务端处理程序

    /**
     * Accept loop of the AIO time server.
     *
     * <p>The constructor opens and binds the listening channel; {@link #run()}
     * registers an asynchronous accept and then blocks on a latch until an
     * accept fails or the thread is interrupted, after which the listening
     * channel is closed.
     */
    public class TimeServerHandler implements Runnable {

        private final int port;

        /** Counted down only when an accept operation fails; releases {@link #run()}. */
        private final CountDownLatch latch = new CountDownLatch(1);

        /** Listening channel, or {@code null} when opening/binding failed in the constructor. */
        private final AsynchronousServerSocketChannel asynchronousServerSocketChannel;

        /**
         * Opens the server channel and binds it to {@code port}.
         *
         * <p>An {@link IOException} is reported on stderr and leaves the handler
         * without a channel; {@link #run()} then returns immediately instead of
         * failing later on a null or unbound channel.
         *
         * @param port TCP port to listen on
         */
        public TimeServerHandler(int port) {
            this.port = port;
            AsynchronousServerSocketChannel channel = null;
            try {
                channel = AsynchronousServerSocketChannel.open();
                channel.bind(new InetSocketAddress(this.port));
                System.out.println("The time server is listening in port : " + this.port);
            } catch (IOException e) {
                e.printStackTrace();
                // Do not keep an opened-but-unbound channel around.
                closeQuietly(channel);
                channel = null;
            }
            this.asynchronousServerSocketChannel = channel;
        }

        /**
         * Starts accepting connections and blocks until accepting fails or the
         * current thread is interrupted; the listening channel is closed on exit.
         */
        @Override
        public void run() {
            if (this.asynchronousServerSocketChannel == null) {
                System.err.println("Time server not started: no channel bound to port " + this.port);
                return;
            }
            try {
                doAccept();
                this.latch.await();
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            } finally {
                closeQuietly(this.asynchronousServerSocketChannel);
            }
        }

        /**
         * Registers an asynchronous accept. Each accepted connection re-registers
         * the accept first, then starts one read handled by
         * {@code TimeServerReadCompletionHandler}.
         */
        private void doAccept() {
            this.asynchronousServerSocketChannel.accept(this,
                    new CompletionHandler<AsynchronousSocketChannel, TimeServerHandler>() {

                        @Override
                        public void completed(AsynchronousSocketChannel result, TimeServerHandler attachment) {
                            // Re-arm the accept before touching the new connection so
                            // further clients can connect.
                            attachment.asynchronousServerSocketChannel.accept(attachment, this);
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            result.read(buffer, buffer, new TimeServerReadCompletionHandler(result));
                        }

                        @Override
                        public void failed(Throwable exc, TimeServerHandler attachment) {
                            exc.printStackTrace();
                            // Accepting is not retried; release run() so it can shut down.
                            attachment.latch.countDown();
                        }
                    });
        }

        /** Closes {@code channel} if it is non-null, reporting (not propagating) close errors. */
        private static void closeQuietly(AsynchronousServerSocketChannel channel) {
            if (channel == null) {
                return;
            }
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
    
    

    服务端读操作完成处理程序

    /**
     * Handles completion of the single read issued for an accepted connection:
     * prints the request, replies with the current time formatted as
     * {@code yyyy-MM-dd HH:mm:ss}, and closes the connection once the reply has
     * been written in full.
     *
     * <p>The connection is closed after the reply because this handler never
     * issues another read on it; leaving it open would only leak the channel.
     */
    public class TimeServerReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

        /** Charset for both decoding the request and encoding the reply. */
        private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

        private final AsynchronousSocketChannel channel;
        private final DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        /**
         * @param channel the accepted connection this handler reads from and replies on
         */
        public TimeServerReadCompletionHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        /**
         * Called when the read finishes.
         *
         * @param result     number of bytes read, or a negative value at end-of-stream
         * @param attachment the buffer the bytes were read into
         */
        @Override
        public void completed(Integer result, ByteBuffer attachment) {
            if (result < 0) {
                // Peer closed the connection without sending a request.
                closeChannel();
                return;
            }
            attachment.flip();
            byte[] body = new byte[attachment.remaining()];
            attachment.get(body);
            String req = new String(body, UTF_8);
            System.out.println("request msg is : " + req);
            doWrite(this.df.format(new Date()));
        }

        /**
         * Writes {@code currentTime} to the peer, re-issuing the write until the
         * buffer is drained, then closes the connection. A blank value is not
         * sent; the connection is just closed.
         */
        private void doWrite(String currentTime) {
            if (currentTime == null || currentTime.trim().isEmpty()) {
                closeChannel();
                return;
            }
            ByteBuffer writeBuffer = ByteBuffer.wrap(currentTime.getBytes(UTF_8));
            channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                    if (buffer.hasRemaining()) {
                        // Partial write: send the rest with this same handler.
                        channel.write(buffer, buffer, this);
                    } else {
                        closeChannel();
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    exc.printStackTrace();
                    closeChannel();
                }
            });
        }

        /** Called when the read fails: reports the cause and closes the connection. */
        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            exc.printStackTrace();
            closeChannel();
        }

        /** Closes the connection, reporting (not propagating) close errors. */
        private void closeChannel() {
            try {
                this.channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
    
    

    客户端代码

    客户端主程序

    /**
     * Launcher for the AIO time client: builds a {@code TimeClientHandler} for
     * 127.0.0.1:1234 and runs it on a newly started thread.
     */
    public class TimeClient {

        /**
         * Starts the client on its own thread and returns immediately.
         *
         * @param args command-line arguments (not read)
         */
        public static void main(String[] args) {
            final Runnable client = new TimeClientHandler("127.0.0.1", 1234);
            final Thread clientThread = new Thread(client);
            clientThread.start();
        }
    }
    
    

    客户端处理类

    /**
     * AIO time client: connects to the server, sends {@code "Hi Server !"},
     * prints the reply, and closes the channel.
     *
     * <p>{@link #run()} blocks on a latch that is released on every terminal
     * path (reply received, end-of-stream, or any connect/write/read failure),
     * so the calling thread cannot be left waiting forever.
     */
    public class TimeClientHandler implements CompletionHandler<Void, TimeClientHandler>, Runnable {

        /** Charset for both encoding the request and decoding the reply. */
        private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

        /** Client channel, or {@code null} when opening failed in the constructor. */
        private final AsynchronousSocketChannel asc;
        private final String host;
        private final int port;

        /** Released once the exchange has finished, successfully or not. */
        private final CountDownLatch latch = new CountDownLatch(1);

        /**
         * Opens the client channel. An {@link IOException} is reported on stderr
         * and leaves the handler without a channel; {@link #run()} then returns
         * immediately.
         *
         * @param host server host name or address
         * @param port server TCP port
         */
        public TimeClientHandler(String host, int port) {
            this.host = host;
            this.port = port;
            AsynchronousSocketChannel channel = null;
            try {
                channel = AsynchronousSocketChannel.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
            this.asc = channel;
        }

        /**
         * Starts the asynchronous connect, waits for the exchange to finish, and
         * closes the channel.
         */
        @Override
        public void run() {
            if (asc == null) {
                System.err.println("Time client not started: channel could not be opened");
                return;
            }
            try {
                asc.connect(new InetSocketAddress(host, port), this, this);
                latch.await();
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread.
                Thread.currentThread().interrupt();
            } finally {
                closeChannel();
            }
        }

        /** Called when the connect succeeds: sends the request, then reads the reply. */
        @Override
        public void completed(Void result, TimeClientHandler attachment) {
            ByteBuffer writeBuffer = ByteBuffer.wrap("Hi Server !".getBytes(UTF_8));
            asc.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                    if (buffer.hasRemaining()) {
                        // Partial write: send the rest with this same handler.
                        asc.write(buffer, buffer, this);
                    } else {
                        readResponse();
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    abort(exc);
                }
            });
        }

        /** Called when the connect fails. */
        @Override
        public void failed(Throwable exc, TimeClientHandler attachment) {
            abort(exc);
        }

        /** Issues one read for the reply, prints it, and releases {@link #run()}. */
        private void readResponse() {
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            asc.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                    try {
                        if (result < 0) {
                            System.err.println("Server closed the connection without replying");
                            return;
                        }
                        buffer.flip();
                        byte[] bytes = new byte[buffer.remaining()];
                        buffer.get(bytes);
                        System.out.println("Now is : " + new String(bytes, UTF_8));
                    } finally {
                        // Always release run(), whatever happened above.
                        latch.countDown();
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    abort(exc);
                }
            });
        }

        /**
         * Common failure path: reports the cause, closes the channel, and releases
         * {@link #run()} even if closing goes wrong.
         */
        private void abort(Throwable exc) {
            exc.printStackTrace();
            try {
                closeChannel();
            } finally {
                latch.countDown();
            }
        }

        /** Closes the channel, reporting (not propagating) close errors. */
        private void closeChannel() {
            try {
                asc.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
    
    

    总结

    这种AIO模型有如下特点:

    • 代码复杂
    • 调试困难

    参考资料: 《Netty权威指南》第二版

    相关文章

      网友评论

          本文标题:NIO系列-04-AIO

          本文链接:https://www.haomeiwen.com/subject/jlskyttx.html