美文网首页
Java IO, NIO, AIO和Netty

Java IO, NIO, AIO和Netty

作者: bertrand319 | 来源:发表于2019-11-24 21:08 被阅读0次

    背景

    最近在回顾Java IO相关的知识,顺带写一篇入门级别的文章。感觉工作以后很少写文章,一直想写点高质量的文章,结果反而一篇都很难写出来。所以这次不写原理,只写实践,随大流,有问题请留言。(后续有时间再补充原理性的东西,从硬件到操作系统到JVM到JDK)

    实现案例

    创建一个server,可以接受多个client端的连接,每接收到一条信息后,向client返回一条表示已收到的应答信息。

    传统IO实现

    传统的IO就是我们所说的BIO(blocking I/O),即同步阻塞IO。

    server端源码如下

    package tech.sohocoder.postman.io;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    
    /**
     * Blocking-I/O (BIO) demo server.
     *
     * <p>Listens on localhost:9000, accepts any number of clients and hands each
     * accepted socket to a pooled worker thread. For every object received the
     * worker replies with {@code "Hi Client " + message}; the message
     * {@code "quit"} ends the conversation.
     */
    public class Server {
    
        private ServerSocket serverSocket;
    
        /**
         * Binds the listening socket and accepts clients until the JVM is stopped
         * or accepting fails.
         *
         * @throws IOException            if binding or accepting fails
         * @throws ClassNotFoundException kept for signature compatibility; it is
         *                                handled inside the per-client worker
         */
        private void start() throws IOException, ClassNotFoundException {
            InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 9000);
            ExecutorService executorService = Executors.newCachedThreadPool(new CaughtExceptionsThreadFactory());
            try (ServerSocket listener = new ServerSocket()) {
                serverSocket = listener;
                listener.bind(inetSocketAddress);
                while (true) {
                    Socket socket = listener.accept();
                    System.out.println("accept socket: " + socket.getRemoteSocketAddress());
                    // execute() rather than submit(): submit() stores a task failure in the
                    // returned Future, so the uncaught-exception handler would never see it.
                    executorService.execute(new SocketHandler(socket));
                }
            } finally {
                // Reached only when accept()/bind() fails; lets idle workers terminate.
                executorService.shutdown();
            }
        }
    
        /** Prints the stack trace of anything that escapes a worker thread. */
        private static class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
            @Override public void uncaughtException(Thread t, Throwable e) {
                e.printStackTrace();
            }
        }
    
        /** Creates worker threads that report uncaught failures through {@link MyUncaughtExceptionHandler}. */
        private static class CaughtExceptionsThreadFactory implements ThreadFactory {
    
            private final ThreadFactory delegate = Executors.defaultThreadFactory();
    
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = delegate.newThread(r);
                thread.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
                return thread;
            }
        }
    
        /** Serves one client connection until it sends {@code "quit"} or disconnects. */
        private static class SocketHandler implements Runnable {
    
            private final Socket socket;
    
            public SocketHandler(Socket socket) {
                this.socket = socket;
            }
    
            @Override
            public void run() {
                // try-with-resources closes the socket (and with it both streams) on every
                // exit path, including exceptions.
                try (Socket client = socket) {
                    while (true) {
                        // The client opens a fresh ObjectOutputStream per message, so a fresh
                        // ObjectInputStream is needed here to consume each stream header.
                        // NOTE(review): Java-native deserialization of network input is unsafe
                        // for untrusted peers; acceptable only for a local demo.
                        ObjectInputStream ois = new ObjectInputStream(client.getInputStream());
                        String message = String.valueOf(ois.readObject());
                        System.out.println("Message Received: " + message);
                        ObjectOutputStream oos = new ObjectOutputStream(client.getOutputStream());
                        //write object to Socket
                        oos.writeObject("Hi Client " + message);
                        oos.flush();
                        if (message.equals("quit")) {
                            break;
                        }
                    }
                } catch (java.io.EOFException e) {
                    // Peer closed the connection without sending "quit".
                    System.out.println("client disconnected: " + socket.getRemoteSocketAddress());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException {
            Server server = new Server();
            server.start();
        }
    }
    
    

    client端源码如下

    package tech.sohocoder.postman.io;
    
    import java.io.*;
    import java.net.Socket;
    
    /**
     * Blocking-I/O (BIO) demo client.
     *
     * <p>Connects to localhost:9000, sends every non-blank line typed on standard
     * input as a serialized {@code String} and prints the server's reply. Typing
     * {@code quit} (or closing standard input) ends the session.
     */
    public class Client {
    
        private Socket socket;
    
        /**
         * Runs the interactive request/reply loop.
         *
         * @throws IOException            if connecting, sending or receiving fails
         * @throws ClassNotFoundException if the reply object cannot be deserialized
         */
        public void start() throws IOException, ClassNotFoundException {
            // try-with-resources guarantees the socket is closed on every exit path.
            try (Socket connection = new Socket("localhost", 9000)) {
                socket = connection;
                System.out.println("socket is connected");
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                for (;;) {
                    final String input = in.readLine();
                    if (input == null) {
                        // End of standard input: readLine() keeps returning null, so
                        // continuing here would spin forever.
                        break;
                    }
                    final String line = input.trim();
                    if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                        continue;
                    }
                    // One object stream per message; the server mirrors this on its side.
                    ObjectOutputStream oos = new ObjectOutputStream(connection.getOutputStream());
                    oos.writeObject(line);
                    oos.flush();
                    ObjectInputStream ois = new ObjectInputStream(connection.getInputStream());
                    System.out.println("Message: " + ois.readObject());
                    if (line.equals("quit")) {
                        break;
                    }
                }
            }
            System.out.println("Bye");
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException {
            Client client = new Client();
            client.start();
        }
    }
    
    

    NIO的阻塞实现

    NIO实际上就是面向缓冲区(Buffer)及通道(Channel)的新型IO(由JSR 51定义,后面JSR 203进行了扩展,有兴趣可以阅读一下这两个JSR),可以支持阻塞和非阻塞两种方式。先实现一下阻塞方式。

    client

    package tech.sohocoder.nio.block;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    
    import static java.lang.System.out;
    
    /**
     * NIO demo client using a {@link SocketChannel} in its default blocking mode.
     *
     * <p>Connects to localhost:9000, sends every non-blank line typed on standard
     * input and prints the server's reply. Typing {@code quit} (or closing
     * standard input) ends the session.
     */
    public class Client {
    
        private SocketChannel socketChannel;
    
        /**
         * Runs the interactive request/reply loop.
         *
         * @throws IOException if connecting, sending or receiving fails
         */
        public void start() throws IOException {
            // try-with-resources closes the channel on every exit path.
            try (SocketChannel channel = SocketChannel.open()) {
                socketChannel = channel;
                SocketAddress socketAddress = new InetSocketAddress("localhost", 9000);
                channel.connect(socketAddress);
    
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                for (;;) {
                    final String input = in.readLine();
                    if (input == null) {
                        // End of standard input: readLine() keeps returning null, so
                        // continuing here would spin forever.
                        break;
                    }
                    final String line = input.trim();
                    if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                        continue;
                    }
    
                    ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
                    while (byteBuffer.hasRemaining()) {
                        channel.write(byteBuffer);
                    }
    
                    if (line.equals("quit")) {
                        out.println("quit!");
                        break;
                    }
    
                    ByteBuffer returnByteBuffer = ByteBuffer.allocate(1024);
                    int readLength = channel.read(returnByteBuffer);
                    if (readLength == -1) {
                        // The server closed the connection; nothing more can be read.
                        out.println("server closed the connection");
                        break;
                    }
                    // Decode only the bytes actually received, not the whole backing array.
                    String message = new String(returnByteBuffer.array(), 0, readLength).trim();
                    out.println("Receive message: " + message);
                }
            }
        }
    
        public static void main(String[] args) throws IOException {
            // Run this class; the original started the Client of another package.
            Client client = new Client();
            client.start();
        }
    }
    
    

    server

    package tech.sohocoder.nio.block;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    
    /**
     * NIO demo server using channels in their default blocking mode.
     *
     * <p>Listens on port 9000 and serves one client at a time on the calling
     * thread: every message received is printed and echoed back until the client
     * sends {@code "quit"} or closes the connection, after which the next client
     * is accepted.
     */
    public class Server {
    
        private ServerSocketChannel serverSocketChannel;
    
        /**
         * Binds the listening channel and serves clients sequentially, forever.
         *
         * @throws IOException if opening, binding or accepting fails
         */
        private void start() throws IOException {
            try (ServerSocketChannel listener = ServerSocketChannel.open()) {
                serverSocketChannel = listener;
                SocketAddress socketAddress = new InetSocketAddress(9000);
                listener.bind(socketAddress);
    
                while (true) {
                    System.out.println("listening...");
                    SocketChannel accepted = listener.accept();
                    // Close the client channel on every exit path; a failure on one
                    // client must not stop the accept loop.
                    try (SocketChannel socketChannel = accepted) {
                        serve(socketChannel);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * Echoes messages back to one client until it sends {@code "quit"} or
         * disconnects.
         *
         * @param socketChannel the connected, blocking client channel
         * @throws IOException if reading or writing fails
         */
        private void serve(SocketChannel socketChannel) throws IOException {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            while (socketChannel.read(byteBuffer) != -1) {
                // Switch the buffer from filling to draining; without flip() the write
                // below would send the unused zero-filled tail instead of the data.
                byteBuffer.flip();
                String receiveStr = new String(byteBuffer.array(), 0, byteBuffer.limit()).trim();
                System.out.println(receiveStr);
                if (receiveStr.equals("quit")) {
                    // The client closes right after sending "quit" and reads no reply.
                    return;
                }
                while (byteBuffer.hasRemaining()) {
                    socketChannel.write(byteBuffer);
                }
                byteBuffer.clear();
            }
        }
    
        public static void main(String[] args) throws IOException {
            Server server = new Server();
            server.start();
        }
    }
    
    

    NIO的非阻塞方式

    NIO如果需要非阻塞,需要使用到selector。selector是在JDK1.4加入,主要是用于支持IO多路复用,Linux下jdk实现就是基于epoll。

    client端代码保持一致。

    server端实际上就是使用一个线程来支持多个连接

    package tech.sohocoder.nio.noblock;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    import static java.lang.System.out;
    
    /**
     * NIO demo server in non-blocking mode: a single thread multiplexes the
     * listening channel and all client channels through one {@link Selector}.
     *
     * <p>Each message received is printed and answered with
     * {@code " receive your message"}; the message {@code "quit"} or a closed
     * peer ends that connection.
     */
    public class Server {
    
        private ServerSocketChannel serverSocketChannel;
    
        private Selector selector;
    
        /**
         * Binds port 9000, registers the listening channel with the selector and
         * runs the event loop forever.
         *
         * @throws IOException          if setup, selecting or accepting fails
         * @throws InterruptedException kept for signature compatibility
         */
        private void start() throws IOException, InterruptedException {
            serverSocketChannel = ServerSocketChannel.open();
            SocketAddress socketAddress = new InetSocketAddress(9000);
            serverSocketChannel.bind(socketAddress);
    
            serverSocketChannel.configureBlocking(false);
    
            int opSelectionKey = serverSocketChannel.validOps();
    
            selector = Selector.open();
    
            SelectionKey selectionKey = serverSocketChannel.register(selector, opSelectionKey);
    
            out.println(selector);
            out.println(selectionKey);
            while(true) {
                out.println("waiting for connected...");
                selector.select();
                Set<SelectionKey> set  = selector.selectedKeys();
                Iterator<SelectionKey> iterator = set.iterator();
                while (iterator.hasNext()) {
                    SelectionKey mySelectionKey = iterator.next();
                    // The selected-key set is never cleared by the selector itself.
                    iterator.remove();
                    if (!mySelectionKey.isValid()) {
                        continue;
                    }
                    if (mySelectionKey.isAcceptable()) {
                        handleAccept();
                    } else if (mySelectionKey.isReadable()) {
                        handleRead(mySelectionKey);
                    }
                }
            }
    
        }
    
        /**
         * Accepts a pending connection and registers it for read events.
         *
         * @throws IOException if accepting or registering fails
         */
        private void handleAccept() throws IOException {
            SocketChannel socketChannel = serverSocketChannel.accept();
            if (socketChannel == null) {
                // A non-blocking accept() returns null when no connection is pending.
                return;
            }
            socketChannel.configureBlocking(false);
            SelectionKey selectionKey1 = socketChannel.register(selector, SelectionKey.OP_READ);
            out.println("socket channel selectionkey: " + selectionKey1);
            out.println("connect from : " + socketChannel.getRemoteAddress());
        }
    
        /**
         * Reads one message from a client and answers it. I/O failures are confined
         * to this connection: the channel is closed and the event loop keeps running.
         *
         * @param key the readable key whose channel is a client {@link SocketChannel}
         */
        private void handleRead(SelectionKey key) {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            try {
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int readLength = socketChannel.read(byteBuffer);
                if (readLength == -1) {
                    // Peer closed the connection. Without closing here the key stays
                    // readable and the loop would spin on it forever.
                    out.println("close connection: " + socketChannel.getRemoteAddress());
                    socketChannel.close();
                    key.cancel();
                    return;
                }
                if (readLength == 0) {
                    return;
                }
                // Decode only the bytes actually received.
                String message = new String(byteBuffer.array(), 0, readLength).trim();
                out.println("Receive message: " + message);
                if(message.equals("quit")) {
                    out.println("close connection: " + socketChannel.getRemoteAddress());
                    socketChannel.close();
                    key.cancel();
                }else {
                    // A single write is enough for this short reply; a larger payload
                    // would need OP_WRITE handling for partial non-blocking writes.
                    ByteBuffer returnByteBuffer = ByteBuffer.wrap(" receive your message".getBytes());
                    socketChannel.write(returnByteBuffer);
                }
            } catch (IOException e) {
                e.printStackTrace();
                key.cancel();
                try {
                    socketChannel.close();
                } catch (IOException ignored) {
                    // Already failing on this connection; nothing more to do.
                }
            }
        }
    
        public static void main(String[] args) throws IOException, InterruptedException {
            Server server = new Server();
            server.start();
        }
    }
    
    

    AIO实现

    上面的IO、NIO的阻塞实现实际上是同步阻塞的方式,NIO的非阻塞实现是同步非阻塞方式。AIO(asynchronous I/O)是异步IO,实现的是异步非阻塞方式,在jdk1.7中引入。

    server端源码如下:

    package tech.sohocoder.aio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousChannelGroup;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    import static java.lang.System.out;
    
    /**
     * AIO demo server built on {@link AsynchronousServerSocketChannel}.
     *
     * <p>Connections are accepted through a {@link CompletionHandler}; each
     * accepted connection is then served on the handler's thread with
     * future-based reads and writes. Every message is printed and answered with
     * {@code "receive your message"}; {@code "quit"} or a closed peer ends the
     * connection.
     */
    public class Server {
    
        private AsynchronousServerSocketChannel asynchronousServerSocketChannel;
    
        /**
         * Binds localhost:9000, installs the accept handler and blocks until the
         * channel group terminates.
         *
         * @throws IOException          if opening or binding fails
         * @throws InterruptedException if interrupted while waiting for termination
         */
        private void start() throws IOException, InterruptedException {
            // worker thread pool
            AsynchronousChannelGroup asynchronousChannelGroup = AsynchronousChannelGroup.withCachedThreadPool(Executors.newCachedThreadPool(), 4);
            asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(asynchronousChannelGroup);
            int port = 9000;
            InetSocketAddress socketAddress = new InetSocketAddress("localhost", port);
            asynchronousServerSocketChannel.bind(socketAddress);
    
            out.println("Starting listening on port " + port);
            // add handler
            asynchronousServerSocketChannel.accept(asynchronousServerSocketChannel, new CompletionHandler<AsynchronousSocketChannel, Object>() {
                @Override
                public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Object o) {
    
                    try {
                        out.println("connect from : " + asynchronousSocketChannel.getRemoteAddress());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    // accept next connection
                    asynchronousServerSocketChannel.accept(asynchronousServerSocketChannel, this);
                    serve(asynchronousSocketChannel);
                }
    
                @Override
                public void failed(Throwable throwable, Object o) {
                    out.println("error to accept: " + throwable.getMessage());
                }
            });
            asynchronousChannelGroup.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
        }
    
        /**
         * Serves one client until it sends {@code "quit"}, disconnects, or an I/O
         * error occurs. The channel is always closed on return, so a failed
         * connection can no longer keep the loop spinning.
         *
         * @param asynchronousSocketChannel the accepted client channel
         */
        private void serve(AsynchronousSocketChannel asynchronousSocketChannel) {
            try {
                while (true) {
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    Future<Integer> future = asynchronousSocketChannel.read(byteBuffer);
                    int readLength = future.get();
                    if (readLength == -1) {
                        // End of stream: the peer closed without sending "quit".
                        out.println("close client: " + asynchronousSocketChannel.getRemoteAddress());
                        break;
                    }
                    // Decode only the bytes actually received.
                    String message = new String(byteBuffer.array(), 0, readLength).trim();
                    out.println("Receive message: " + message);
                    if (message.equals("quit")) {
                        out.println("close client: " + asynchronousSocketChannel.getRemoteAddress());
                        break;
                    }
    
                    ByteBuffer returnByteBuffer = ByteBuffer.wrap("receive your message".getBytes());
                    while (returnByteBuffer.hasRemaining()) {
                        Future<Integer> returnFuture = asynchronousSocketChannel.write(returnByteBuffer);
                        returnFuture.get();
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag for whoever owns this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (ExecutionException | IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    asynchronousSocketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) throws IOException, InterruptedException {
            Server server = new Server();
            server.start();
        }
    }
    
    

    Netty实现

    Netty是java中使用很广泛的库,既可以实现NIO也可以实现AIO,还是针对上面的例子来实现一下

    server端

    package tech.sohocoder.netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    import static java.lang.System.out;
    
    /**
     * Netty demo server: binds port 9000 and installs a string decoder, a string
     * encoder and a {@code ServerHandler} on every accepted channel.
     */
    public class Server {
    
        /**
         * Starts the server and blocks until the listening channel is closed.
         *
         * @throws InterruptedException if interrupted while binding or waiting
         */
        private void start() throws InterruptedException {
            EventLoopGroup bossEventLoopGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerEventLoopGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.group(bossEventLoopGroup, workerEventLoopGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                // add handler into pipeline
                                socketChannel.pipeline()
                                        .addLast(new StringDecoder())
                                        .addLast(new StringEncoder())
                                        .addLast(new ServerHandler());
                            }
                        });
    
                ChannelFuture channelFuture = serverBootstrap.bind(9000).sync();
                out.println("listening...");
                channelFuture.channel().closeFuture().sync();
            }finally {
                // Shut down both groups; leaving the worker group running leaks its
                // event-loop threads.
                workerEventLoopGroup.shutdownGracefully();
                bossEventLoopGroup.shutdownGracefully();
            }
    
        }
    
        public static void main(String[] args) throws InterruptedException {
            Server server = new Server();
            server.start();
        }
    
    }
    

    这里面需要使用到ServerHandler,具体代码如下

    package tech.sohocoder.netty;
    
    import io.netty.channel.ChannelDuplexHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelPromise;
    
    import java.net.SocketAddress;
    
    import static java.lang.System.out;
    
    /**
     * Channel handler for the demo server: logs connection lifecycle events and
     * answers every inbound message with a fixed acknowledgement string.
     */
    public class ServerHandler extends ChannelDuplexHandler {
    
        /** Prints the inbound message and writes the acknowledgement back. */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            final String received = "Receive message: " + msg;
            out.println(received);
            ctx.writeAndFlush("receive your message");
        }
    
        /** Delegates unchanged to the default implementation. */
        @Override
        public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
            super.connect(ctx, remoteAddress, localAddress, promise);
        }
    
        /** Logs the peer address, then lets the event continue through the pipeline. */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            final SocketAddress peer = ctx.channel().remoteAddress();
            out.println("connect from: " + peer.toString());
            super.channelActive(ctx);
        }
    
        /** Logs the peer address of the closed channel, then propagates the event. */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            final SocketAddress peer = ctx.channel().remoteAddress();
            out.println("close connection: " + peer.toString());
            super.channelInactive(ctx);
        }
    
    }
    

    client端也用netty写一下(注意:下面贴出的这段client代码实际上是基于NIO SocketChannel的实现,并没有用到Netty)

    package tech.sohocoder.aio;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    
    import static java.lang.System.out;
    
    /**
     * Demo client using a blocking NIO {@link SocketChannel}.
     *
     * <p>Connects to localhost:9000, sends every non-blank line typed on standard
     * input and prints the server's reply. Typing {@code quit} (or closing
     * standard input) ends the session.
     */
    public class Client {
    
        private SocketChannel socketChannel;
    
        /**
         * Runs the interactive request/reply loop.
         *
         * @throws IOException if connecting, sending or receiving fails
         */
        public void start() throws IOException {
            // try-with-resources closes the channel on every exit path.
            try (SocketChannel channel = SocketChannel.open()) {
                socketChannel = channel;
                SocketAddress socketAddress = new InetSocketAddress("localhost", 9000);
                channel.connect(socketAddress);
                if(channel.isConnected()) {
                    out.println("connect to " + channel.getRemoteAddress());
                }
    
                BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
                for (;;) {
                    final String input = in.readLine();
                    if (input == null) {
                        // End of standard input: readLine() keeps returning null, so
                        // continuing here would spin forever.
                        break;
                    }
                    final String line = input.trim();
                    if (line.isEmpty()) { // skip `enter` or `enter` with spaces.
                        continue;
                    }
    
                    ByteBuffer byteBuffer = ByteBuffer.wrap(line.getBytes());
                    while (byteBuffer.hasRemaining()) {
                        channel.write(byteBuffer);
                    }
    
                    if(line.equals("quit")) {
                        out.println("quit!");
                        break;
                    }
    
                    ByteBuffer returnByteBuffer = ByteBuffer.allocate(1024);
                    int readLength = channel.read(returnByteBuffer);
                    if (readLength == -1) {
                        // The server closed the connection; nothing more can be read.
                        out.println("server closed the connection");
                        break;
                    }
                    // Decode only the bytes actually received, not the whole backing array.
                    String message = new String(returnByteBuffer.array(), 0, readLength).trim();
                    out.println("Receive message: " + message);
                }
            }
        }
    
        public static void main(String[] args) throws IOException {
            Client client = new Client();
            client.start();
        }
    }
    
    

    同样要实现一个ClientHandler

    package tech.sohocoder.netty;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    import static java.lang.System.out;
    
    /**
     * Inbound handler for the demo client: prints every {@code String} message
     * that reaches it.
     */
    public class ClientHandler extends SimpleChannelInboundHandler<String> {
    
        /** Writes the received text to standard output with a fixed prefix. */
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            final String report = "Receive message: " + msg;
            out.println(report);
        }
    }
    
    

    相关文章

      网友评论

          本文标题:Java IO, NIO, AIO和Netty

          本文链接:https://www.haomeiwen.com/subject/ywmhwctx.html