美文网首页
IO模型代码实例

IO模型代码实例

作者: 南园故剑00 | 来源:发表于2020-06-08 18:44 被阅读0次
    package com.gupao.edu.vip.bio;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * @description: BIO 服务端源码
     *
     * nc 实现任意TCP/UDP端口的侦听,nc可以作为server以TCP或UDP方式侦听指定端口
     *  nc 192.168.74.1 7777  通过nc去访问192.168.74.1主机的7777端口
     *
     * @date : 2020/1/3 10:57
     * @author: zwz
     */
    @Slf4j
    public class ServerDemo {
    
        //默认的端口号
        private static final int DEFAULT_PORT = 7777;
    
        //单例的serverSocket
        private static ServerSocket serverSocket;
    
        public static void main(String[] args) throws IOException {
            start();
        }
    
        public static void start() throws IOException {
            start(DEFAULT_PORT);
        }
    
        public synchronized static void start(int port) throws IOException {
            if (serverSocket != null) {
                return;
            }
    
            try {
                serverSocket = new ServerSocket(port);
    
                System.out.println("step1: new ServerSocket(port)");
                log.info("服务端已启动,端口号:" + port);
                System.out.println(("服务端已启动,端口号:" + port));
                //自旋
                while (true) {
    
                    //只能接受一次,while true也没卵用
                    Socket socket = serverSocket.accept();
    
                    //这里阻塞
                    System.out.println("step2: socket " + socket.getPort());
    
                    InputStream inputStream = socket.getInputStream();
                    BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
                    System.out.println("step3 " + reader.readLine());
                }
            } finally {
                if (serverSocket != null) {
                    log.info("服务端已关闭");
                    System.out.println(("服务端已关闭"));
                    serverSocket.close();
                    serverSocket = null;
                }
            }
        }
    }
    
    package com.gupao.edu.vip.nio.channel;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.LinkedList;
    import java.util.Random;
    
    /**
     * @description: 通过JDK实现的NIO
     * @date : 2020/1/3 23:30
     * @author: zwz
     */
    public class ServiceSocketChannelNIODemo {
    
        public static void main(String[] args) {
    
            InetSocketAddress localAddress = new InetSocketAddress(8571);
            LinkedList<SocketChannel> clients = new LinkedList<>();
    
            Charset utf8 = StandardCharsets.UTF_8;
            ServerSocketChannel ssc = null;
            Random random = new Random();
            try {
                //创建服务器通道
                ssc = ServerSocketChannel.open();
    
                //配置通道为非阻塞
                ssc.configureBlocking(false);
    
                //设置监听服务器的端口,设置最大连接缓冲数为100
                ssc.bind(localAddress, 100);
    
                while (true) {
                    Thread.sleep(1000);
                    //不会阻塞
                    SocketChannel client = ssc.accept();
                    if (client == null) {
                        System.out.println("null ...");
                    } else {
                        client.configureBlocking(false);
                        int port = client.socket().getPort();
                        System.out.println("clinet port: " + port);
                        clients.add(client);
                    }
                    System.out.println("没有阻塞");
    
                    //缓冲区可以在堆里,也可以在堆外
                    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);
    
                    //串行化
    
                    //资源浪费
                    //每个连接都是一条路,每条路都要看一眼
                    for (SocketChannel c : clients) {
                        //不会阻塞
                        int num = c.read(byteBuffer);
                        if (num > 0) {
    
                            /*
                             * 在写模式下调用flip() 之后,buffer从写模式变为读模式
                             * 在调用flip()之后,读/写指针指到缓冲区头部,并设置了最多只能读出之前写入的数据长度(而不是整个缓存的容量大小)
                             *
                             *     public final Buffer flip() {
                                    limit = position;
                                    position = 0;
                                    mark = -1;
                                    return this;
                                }
                             */
                            byteBuffer.flip();
    
                            byte[] bytes = new byte[byteBuffer.limit()];
                            byteBuffer.get(bytes);
                            String b = new String(bytes);
    
    //                        CharBuffer cb = utf8.decode(byteBuffer);
    //                        String b = new String(cb.array());
                            System.out.println(c.socket().getPort() + " : " + b);
                            byteBuffer.clear();
                        }
                    }
                }
    
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    package com.gupao.edu.vip.nio.channel;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.Iterator;
    import java.util.Random;
    import java.util.Set;
    
    /**
     * @description: JDK 多路复用单线程
     * ServerSocketChannel和SocketChannel 注册到同一个selector(一个线程对应一个selector)上
     * 多路就是多个 channel
     * @date : 2020/1/3 23:30
     * @author: zwz
     */
    public class ServiceSocketMuliSingleThreaDemo {
    
        private static InetSocketAddress localAddress;
    
        public static void main(String[] args) {
    
            localAddress = new InetSocketAddress(8087);
            Charset utf8 = StandardCharsets.UTF_8;
            ServerSocketChannel ssc = null;
            Selector selector = null;
            Random random = new Random();
            try {
                //创建选择器
                selector = Selector.open();
    
                //创建服务器通道
                ssc = ServerSocketChannel.open();
    
                //配置通道为非阻塞
                ssc.configureBlocking(false);
    
                //设置监听服务器的端口,设置最大连接缓冲数为100
                ssc.bind(localAddress, 100);
    
                //服务器通道只能对tcp链接事件感兴趣.ssc注册到selector上
                SelectionKey register = ssc.register(selector, SelectionKey.OP_ACCEPT);
    
                System.out.println("注册后selector.keys的数量:" + selector.keys().size()); // 1
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
            System.out.println("server start with address" + localAddress);
    
            //服务器线程被中断后退出
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    //询问内核有没有事件
                    int n = selector != null ? selector.select(0) : 0;
                    if (n == 0) {
                        continue;
                    }
    
                    //从多路复用器取出有效的key
                    Set<SelectionKey> keySet = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = keySet.iterator();
                    SelectionKey key = null;
    
                    while (iterator.hasNext()) {
                        key = iterator.next();
                        //防止下次select方法返回已处理过的通道
                        iterator.remove();
    
                        //若发现异常,说明客户端连接出现问题,但服务器要保证正常
                        try {
                            //ssc通道只能对链接事件感兴趣
                            if (key.isAcceptable()) {  //是否可以连接。有新的客户端连接
                                ServerSocketChannel ssc1 = (ServerSocketChannel) key.channel();
                                //accept方法会返回一个普通通道,每个通道在内核中都对应一个socket缓冲区
                                SocketChannel sc = ssc1.accept();
                                sc.configureBlocking(false);
    
                                //向选择器注册这个通道和普通通道感兴趣的事件,同时提供这个新通道相关的缓冲区
                                int interestSet = SelectionKey.OP_READ;
                                ByteBuffer byteBuffer = ByteBuffer.allocate(8192);
                                //将channel和buffer一对一绑定
                                sc.register(selector, interestSet, byteBuffer);
                                System.out.println("客户端连接后 注册后selector.keys的数量:" + selector.keys().size());  // 2,3,4
    
                                System.out.println("---------------------");
                                System.out.println("accept from " + sc.getRemoteAddress());
                                System.out.println("---------------------");
                            }
    
                            //普通通道感兴趣读事件且有数据可读
                            if (key.isReadable()) {
                                System.out.println("一般数据到达-------------");
                                //通过selectionKey获取对应的通道
                                SocketChannel sc = (SocketChannel) key.channel();
                                //通过selectionKey获取通道对应的缓冲区
                                ByteBuffer buffers = (ByteBuffer) key.attachment();
                                buffers.clear();
                                int read = 0;
                                try {
                                    while (true) {
                                        read = sc.read(buffers);
                                        if (read > 0) {
                                            buffers.flip();
                                            byte[] bytes = new byte[buffers.limit()];
                                            buffers.get(bytes);
                                            String b = new String(bytes);
    //                                            CharBuffer cb = utf8.decode(buffers);
                                            System.out.println("读取的数据是 " + b);
                                            while (buffers.hasRemaining()) {
                                                sc.write(buffers);
                                            }
    
                                            buffers.clear();
                                        } else if (read == 0) {
                                            break;
                                        }
                                        //                 System.out.println("不加 -1 ,疯涨输出----------");
                                        else { // -1 客户端close wait 死循环CPU 100%
                                            sc.close();
                                            break;
                                        }
                                    }
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
    
                        } catch (IOException e) {
                            System.out.println("server encounter client error");
                            //若客户端连接出现异常,从selector中移除这个key
                            key.cancel();
                            key.channel().close();
                        }
                    }
    
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (selector != null) {
                        selector.close();
                    }
                } catch (IOException e) {
                    System.out.println("selector close failed");
                }
            }
        }
    
    }
    
    package com.gupao.edu.vip.nio.channel;
    
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @description: JDK 多路复用多线程
     * @date : 2020/1/3 23:30
     * @author: zwz
     */
    public class ServiceSocketMuliThreadsDemo {
    
    
        public static void main(String[] args) {
            ServiceSocketMuliThreadsDemo service = new ServiceSocketMuliThreadsDemo();
            service.initServer();
    
            NioThread t1 = new NioThread(service.selector1, 2);
            NioThread t2 = new NioThread(service.selector2);
            NioThread t3 = new NioThread(service.selector3);
    
            t1.start();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            t2.start();
            t3.start();
    
            System.out.println("服务器启动了--------------");
    
            try {
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private ServerSocketChannel server = null;
        private Selector selector1 = null;
        private Selector selector2 = null;
        private Selector selector3 = null;
    
        private InetSocketAddress localAddress = new InetSocketAddress(9999);
    
        public void initServer() {
            try {
                server = ServerSocketChannel.open();
                server.configureBlocking(false);
                server.bind(localAddress);
    
                selector1 = Selector.open();
                selector2 = Selector.open();
                selector3 = Selector.open();
    
                //这里将ServerSocketChannel 注册到 selector1 上
                server.register(selector1, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        static class NioThread extends Thread {
            Selector selector = null;
            static int selectors = 0;
            int id = 0;
            boolean isBoss = false;
    
            //静态变量:类变量。不论创建多少对象只初始化一次。
            static BlockingQueue<SocketChannel>[] queue;
    
            static AtomicInteger idx = new AtomicInteger();
    
            NioThread(Selector sel, int n) {
                this.isBoss = true;
                this.selector = sel;
                selectors = n;
                int id = 0;
                queue = new LinkedBlockingQueue[selectors];
                for (int i = 0; i < n; i++) {
                    queue[i] = new LinkedBlockingQueue<>();
                }
                System.out.println("BOSS 启动");
            }
    
            NioThread(Selector sel) {
                this.selector = sel;
                id = idx.getAndIncrement() % selectors;
                System.out.println("WORKER:" + id + " 启动");
            }
    
            @Override
            public void run() {
                try {
                    while (true) {
                        while (selector.select(10) > 0) {  //阻塞10ms
                            Set<SelectionKey> selectionKeys = selector.selectedKeys();
                            Iterator<SelectionKey> iterator = selectionKeys.iterator();
                            while (iterator.hasNext()) {
                                SelectionKey key = iterator.next();
                                iterator.remove();
                                if (key.isAcceptable()) {
                                    acceptHandler(key);
                                } else if (key.isReadable()) {
                                    readHandler(key);
                                }
                            }
                        }
                        //boss不参与。只有worker根据id分配
                        if (!isBoss && !queue[id].isEmpty()) {
                            ByteBuffer buffer = ByteBuffer.allocate(8192);
                            SocketChannel client = queue[id].take();
                            //注册在自己的 selector 上
                            client.register(selector, SelectionKey.OP_READ, buffer);
                            System.out.println("----------------------------");
                            System.out.println("新客户端:" + client.socket().getPort() + " 分配到worker:" + id);
                            System.out.println("----------------------------");
                        }
                    }
                } catch (IOException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            private void readHandler(SelectionKey key) throws IOException {
                System.out.println("一般数据到达-------------");
                //通过selectionKey获取对应的通道
                SocketChannel sc = (SocketChannel) key.channel();
                //通过selectionKey获取通道对应的缓冲区
                ByteBuffer buffers = (ByteBuffer) key.attachment();
                buffers.clear();
                int read = 0;
                try {
                    while (true) {
                        read = sc.read(buffers);
                        if (read > 0) {
                            buffers.flip();
                            byte[] bytes = new byte[buffers.limit()];
                            buffers.get(bytes);
                            String b = new String(bytes);
    //                                            CharBuffer cb = utf8.decode(buffers);
                            System.out.println("读取的数据是 " + b);
                            while (buffers.hasRemaining()) {
                                sc.write(buffers);
                            }
    
                            buffers.clear();
                        } else if (read == 0) {
                            break;
                        }
                        //                 System.out.println("不加 -1 ,疯涨输出----------");
                        else { // -1 客户端close wait 死循环CPU 100%
                            sc.close();
                            break;
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            private void acceptHandler(SelectionKey key) throws IOException {
                System.out.println("可读-------------");
                //通过selectionKey获取对应的通道
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel client = ssc.accept();
                client.configureBlocking(false);
                int num = idx.getAndIncrement() % selectors;  //0,1
                queue[num].add(client);
            }
        }
    
    }
    

    相关文章

      网友评论

          本文标题:IO模型代码实例

          本文链接:https://www.haomeiwen.com/subject/kycdtktx.html