美文网首页
JAVANIO -选择器02 Selector原理和使用

JAVANIO -选择器02 Selector原理和使用

作者: 贪睡的企鹅 | 来源:发表于2019-07-23 13:39 被阅读0次

    Selector 概述

    Selector 一般称为选择器,用来作为SelectableChannel通道的多路复用器。SelectableChannel类型通道可以被注册到多路复用器,通过多路复用器监听感兴趣的事件,这样就可以通过Selector实现单个线程可以管理多个SelectableChannel通道,从而管理多个网络连接。

    image

    监听事件

    当SelectableChannel通道注册到Selector多路复用器时需要指定感兴趣的事件。

    //socketChannel注册到选择器中监听读取到达事件
    socketChannel.register(selector, SelectionKey.OP_READ);
    

    事件类型定义在SelectionKey类的静态常量中,使用二进制位运算得到的整数。

    public abstract class SelectionKey {
    
    ...省略代码
    //一个通道已准备好读取
    public static final int OP_READ = 1 << 0;
    
    //一个通道已准备好写入
    public static final int OP_WRITE = 1 << 2;
    
    //与远程服务器建立连接。 
    public static final int OP_CONNECT = 1 << 3;
    
    // ServerSocketChannel接受连接。 
    public static final int OP_ACCEPT = 1 << 4;
    

    如果SelectableChannel通道对多个选择键感兴趣,可以使用‘|’位运算后在注册到选择器

    int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; 
    socketChannel.register(selector, interestSet);
    

    不同SelectableChannel子类(通道)支持的SelectionKey(选择键)不同。
    可以通过validOps函数获取其支持的监听事件

        //ServerSocketChannel 支持的选择键
        public final int validOps() {
            return SelectionKey.OP_ACCEPT;
        }
        
        //socketChannel 支持的选择键
        public final int validOps() {
            return (SelectionKey.OP_READ
                    | SelectionKey.OP_WRITE
                    | SelectionKey.OP_CONNECT);
        }
    

    SelectionKey

    一个Selector中可以注册多通道,不同通道在选择器中被封装成为SelectionKey对象。

    public class SelectionKeyImpl extends AbstractSelectionKey {
        /** 通道 **/
        final SelChImpl channel;
        /** 多路复用器**/
        public final SelectorImpl selector;
        private int index;
        /** 感兴趣的事件 **/
        private volatile int interestOps;
        /** 就绪的事件**/
        private int readyOps;
    

    从SelectionKeyImpl定义可以看出相同通道注册多次感兴趣选择键对应到Selector中SelectionKey对象是同一个。

    SocketChannel ch1 = serverSocketChannel.accept();
    socketChannel.configureBlocking(false);
    serverSocketChannel.close();
    SelectionKey register3 = socketChannel.register(selector, SelectionKey.OP_READ);
    SelectionKey register4 = socketChannel.register(selector, SelectionKey.OP_WRITE);
    
    SocketChannel ch2 = serverSocketChannel2.accept();
    socketChannel.configureBlocking(false);
    SelectionKey register3 = socketChannel.register(selector, SelectionKey.OP_READ);
    
    image

    事件管理机制

    Seclect内部存在三个集合来管理SelectionKey。监听事件通道集合(publicKeys),通道事件就绪集合(publicSelectedKeys),取消监听通道(cancelKeys)

    • 通过SelectableChannel.register方法可以将Channel通道封装成SelectionKey对象添加到Seclect内部publicKeys集合中。
    • 通过Seclect.keys方法获取集合publicKeys集合,但无法手动修改。
    • 如果某个通道的事件到达,会将通道对应SelectionKey添加到publicSelectedKeys集合中。用户线程观测SelectionKey集合中SelectionKey对已到达的事件作处理。
    • 通过selectedKeys方法获取publicSelectedKeys集合,每次通道处理需要手动删除。避免重复处理。
    • 如果通道事件到达,用户未处理就将SelectionKey从publicSelectedKeys集合中删除。那么下一次调用select方法时会重新将从publicSelectedKeys集合添加到publicSelectedKeys集合中。
    • 如果通道事件到达,用户处理后会在下一次调用select方法时将通道对应的SelectionKey从publicKeys集合删除
    image

    案例

    • 通过SelectableChannel.register方法可以将Channel通道封装成SelectionKey对象添加到keysSet集合中。并通过Seclect。keys方法获取集合publicKeys
        /**
         * 通过register,可以将Channel通道封装成SelectionKey对象添加到keysSet集合中
         */
        @Test
        public void test_keysSet_add() throws Exception {
    
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel1.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel2.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
    
            /** 获取publicKeys集合 **/
            Set<SelectionKey> selectionKeys_ = selector.keys();
            /** 获取注册到选择器中Channel通道的数量**/
            System.out.println("selector.keys.size()" + selector.keys());
    
            /** 遍历SelectionKey**/
            Iterator<SelectionKey> iterator_ = selectionKeys_.iterator();
            while (iterator_.hasNext()) {
                SelectionKey key = iterator_.next();
                System.out.println(key);
            }
        }
    
    
    register1sun.nio.ch.SelectionKeyImpl@7f560810
    register2sun.nio.ch.SelectionKeyImpl@69d9c55
    selector.keys.size()[sun.nio.ch.SelectionKeyImpl@7f560810, sun.nio.ch.SelectionKeyImpl@69d9c55]
    sun.nio.ch.SelectionKeyImpl@7f560810
    sun.nio.ch.SelectionKeyImpl@69d9c55    
    
    • 同一个通道注册多次事件,其注册到keysSet中SelectionKey是同一个对象
        @Test
        public void test_keysSet_add2_server() throws Exception {
    
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel1.bind(new InetSocketAddress("localhost", 7777));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel1.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel2.bind(new InetSocketAddress("localhost", 8888));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel2.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
    
    
            boolean is_Run=true;
            while (is_Run) {
    
                /** 阻塞等待事件到达**/
                selector.select();
    
                /** 获取到达事件SelectionKey集合**/
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                /** 遍历SelectionKey**/
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    /** 判断是否是OP_ACCEPT事件**/
                    if(key.isAcceptable()){
                        /** 从SelectionKey获取对应通道ServerSocketChannel**/
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                        /** 获取SocketChannel**/
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        serverSocketChannel.close();
                        SelectionKey register3 = socketChannel.register(selector, SelectionKey.OP_READ);
                        SelectionKey register4 = socketChannel.register(selector, SelectionKey.OP_WRITE);
                        // 同一个通道注册多次事件,返回的都是同一个 SelectionKey
                        System.out.println(register3==register4);
                    }
                }
            }
        }
    
    
        @Test
        public void test_keysSet_add2_client() throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
            SocketChannel socketChannel2 = SocketChannel.open();
            try {
                socketChannel.configureBlocking(true);
                socketChannel2.configureBlocking(true);
                socketChannel.connect(new InetSocketAddress("localhost", 8888));
                socketChannel2.connect(new InetSocketAddress("localhost", 7777));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
       
    true
    true    
    

    无法手动对publicKeys集合做修改,如修改会抛出UnsupportedOperationException异常

        @Test
        public void test_keysSet_del1() throws Exception {
    
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel1.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel2.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
    
            /** 获取publicKeys集合 **/
            Set<SelectionKey> selectionKeys_ = selector.keys();
            //手动删除会抛出异常
            selectionKeys_.remove(register2);
        }
    
    • 如果注册在选择器中通道对应SelectionKey被关闭,选择器会在publicKeys集合中标记这个SelectionKey,并在下次调用selector.select()方法后从publicKeys集合中删除
        @Test
        public void test_keysSet_del3_server() throws Exception {
    
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel1.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel2.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
    
            // 如果注册在选择器中通道对应SelectionKey被关闭,select()方法执行后将关闭通道SelectionKey从集合中删除
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(15);
                        Set<SelectionKey> selectionKeys = selector.keys();
                        System.out.println(selectionKeys);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            boolean is_Run=true;
            while (is_Run) {
                // 如果注册在选择器中通道对应SelectionKey被关闭,select()方法执行前publicKeys集合
                Set<SelectionKey> selectionKeys2 = selector.keys();
                System.out.println(selectionKeys2);
                /** 阻塞等待事件到达**/
                selector.select();
    
                /** 获取到达事件SelectionKey集合**/
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                /** 遍历SelectionKey**/
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    /** 判断是否是OP_ACCEPT事件**/
                    if(key.isAcceptable()){
                        /** 从SelectionKey获取对应通道ServerSocketChannel**/
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                        /** 获取SocketChannel**/
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        //如果注册在选择器中通道对应SelectionKey被关闭,选择器会在publicKeys集合中标记这个SelectionKey,并在下次selector.select()方法后删除
                        key.cancel();
                        System.out.println("selector.keys pre_del()");
                    }
                }
            }
        }
    
        @Test
        public void test_keysSet_del3_client() throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
            try {
                socketChannel.configureBlocking(true);
                socketChannel.connect(new InetSocketAddress("localhost", 8888));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    [sun.nio.ch.SelectionKeyImpl@69d9c55, sun.nio.ch.SelectionKeyImpl@13a57a3b]
    selector.keys pre_del()
    [sun.nio.ch.SelectionKeyImpl@69d9c55, sun.nio.ch.SelectionKeyImpl@13a57a3b]
    [sun.nio.ch.SelectionKeyImpl@13a57a3b]    
    
    • 如果注册在选择器中通道被关闭时,选择器会在publicKeys集合中标记这个通道对应的SelectionKey,并在下次调用selector.select()方法后从publicKeys集合中删除
     @Test
        public void test_keysSet_del2_server() throws Exception {
    
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel1.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel2.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
    
            /** 用来查看通道被关闭后并调用selector.select()方法后keys集合会将关闭通道SelectionKey从集合中删除 **/
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        TimeUnit.SECONDS.sleep(15);
                        Set<SelectionKey> selectionKeys = selector.keys();
                        System.out.println(selectionKeys);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
    
            boolean is_Run=true;
            while (is_Run) {
                // 在通道关闭后,select()方法执行前答应下当前keys集合
                Set<SelectionKey> selectionKeys2 = selector.keys();
                System.out.println(selectionKeys2);
                /** 阻塞等待事件到达**/
                selector.select();
    
                /** 获取到达事件SelectionKey集合**/
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                /** 遍历SelectionKey**/
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    /** 判断是否是OP_ACCEPT事件**/
                    if(key.isAcceptable()){
                        /** 从SelectionKey获取对应通道ServerSocketChannel**/
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                        /** 获取SocketChannel**/
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        //选择器中通道被关闭后,选择器会在keysSet集合中标记,并在下次selector.select()方法后删除
                        serverSocketChannel.close();
                        System.out.println("selector.keys pre_del()");
                    }
                }
            }
        }
    
        @Test
        public void test_keysSet_del2_client() throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
            try {
                socketChannel.configureBlocking(true);
                socketChannel.connect(new InetSocketAddress("localhost", 8888));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    [sun.nio.ch.SelectionKeyImpl@69d9c55, sun.nio.ch.SelectionKeyImpl@13a57a3b]
    selector.keys pre_del()
    [sun.nio.ch.SelectionKeyImpl@69d9c55, sun.nio.ch.SelectionKeyImpl@13a57a3b]
    [sun.nio.ch.SelectionKeyImpl@13a57a3b]    
    
    • 在处理完成通道事件后需要手动从publicSelectedKeys集合中删除否则导致重复操作发生
     @Test
        public void test_publicSelectedKeys2_server() throws Exception {
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel1.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
    
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel2.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
    
            Set<SelectionKey> selectionKeys=new HashSet<SelectionKey>();
            boolean is_Run=true;
            while (is_Run) {
    
                /** 阻塞等待事件到达**/
                selector.select();
                /** 获取到达事件SelectionKey集合**/
                selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                /** 遍历SelectionKey**/
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    //在处理完成通道事件后需要手动从publicSelectedKeys集合中删除否则导致重复操作发送
                    iterator.remove();
                    /** 判断是否是OP_ACCEPT事件**/
                    if(key.isAcceptable()){
                        /** 从SelectionKey获取对应通道ServerSocketChannel**/
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                        SocketChannel socketChannel = serverSocketChannel.accept();
    
                        if(socketChannel==null){
                            System.out.println("重复连接");
                        }
                        InetSocketAddress localAddress =   (InetSocketAddress) serverSocketChannel.getLocalAddress();
                        System.out.println(localAddress.getPort()+"被连接了");
                        System.out.println("isAcceptable");
                    }
                }
            }
        }
    
        @Test
        public void test_publicSelectedKeys2_client1() throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
            try {
                socketChannel.configureBlocking(true);
                socketChannel.connect(new InetSocketAddress("localhost", 8888));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Test
        public void test_publicSelectedKeys2_client2() throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
            try {
                socketChannel.configureBlocking(true);
                socketChannel.connect(new InetSocketAddress("localhost", 7777));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        
    1 注释掉iterator.remove();,顺序执行test_publicSelectedKeys2_server,test_publicSelectedKeys2_client1,test_publicSelectedKeys2_client2  
    
    8888被连接了
    isAcceptable
    重复连接
    8888被连接了
    isAcceptable
    7777被连接了
    isAcceptable
    
    2 打开注释iterator.remove();顺序执行test_publicSelectedKeys2_server,test_publicSelectedKeys2_client1,test_publicSelectedKeys2_client
    
    8888被连接了
    isAcceptable
    7777被连接了
    isAcceptable
    
    
    

    不同的事件处理

    OP_READ

    • 如果通道注册了OP_READ事件,当服务端或客户端收到对方数据请求时,会将通道对应SelectionKey添加到publicSelectedKeys集合,用户线程遍历需SelectedKeys集合获取通道,调用read方法读取数据。读取数据完毕,就可以将SelectionKey从publicSelectedKeys集合中删除。

    • 如果不处理直接将通道对应SelectionKey从publicSelectedKeys集合中删除。那么下一次调用select方法时会重新将从publicSelectedKeys集合添加到publicSelectedKeys集合中。

    @Test
        public void test_publicSelectedKeys_server2() throws Exception {
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel1.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
            Set<SelectionKey> selectionKeys=new HashSet<SelectionKey>();
            boolean is_Run=true;
            while (is_Run) {
    
                /** 阻塞等待事件到达**/
                System.out.println("selector.selectedKeys:"+selectionKeys);
                selector.select();
                System.out.println("selector.selectedKeys:"+selectionKeys);
                /** 获取到达事件SelectionKey集合**/
                selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                /** 遍历SelectionKey**/
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    /** 判断是否是OP_ACCEPT事件**/
                    if(key.isAcceptable()){
                        /** 从SelectionKey获取对应通道ServerSocketChannel**/
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector,SelectionKey.OP_READ);
                        System.out.println("isAcceptable");
                    }else if(key.isReadable()){
                        SocketChannel socketChannel = (SocketChannel)key.channel();
                        //注释代码未打开,没有对请求连接事件处理,每次调用selector.selectedKeys()会将SelectionKey重新添加到publicSelectedKeys集合中
                        //ByteBuffer allocate = ByteBuffer.allocate(50);
                        //socketChannel.read(allocate);
                        //System.out.println(new String(allocate.array()));
                    }
                }
            }
        }
    
        @Test
        public void test_publicSelectedKeys_client2() throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
            try {
                socketChannel.configureBlocking(true);
                socketChannel.connect(new InetSocketAddress("localhost", 8888));
                socketChannel.write(ByteBuffer.wrap("hello server".getBytes()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

    OP_WRITE

    如果通道注册了OP_READ事件,选择器会自动将通道对应SelectionKey添加到publicSelectedKeys集合,,用户线程遍历需SelectedKeys集合获取通道,调用write方法发送数据。发送数据完毕,就可以将SelectionKey从publicSelectedKeys集合中删除。不同下次调用select方法时会重新将从publicSelectedKeys集合添加到publicSelectedKeys集合中,也就是无法停止,这时我们只能通过设置SelectionKey.interestOps()重新设置事件,将OP_WRITE事件去掉,那么下次下次调用select方法时就不会重新添加到publicSelectedKeys集合中。

        @Test
        public void test_publicSelectedKeys_client3() throws Exception {
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
    
            SocketChannel socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("localhost", 7777));
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
    
            boolean is_Run = true;
            while (is_Run) {
                /** 阻塞等待事件到达**/
                selector.select();
    
                /** 获取到达事件SelectionKey集合**/
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                /** 遍历SelectionKey**/
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isConnectable()) {
                        socketChannel = (SocketChannel) key.channel();
                        while (!socketChannel.finishConnect()){
                        }
                        /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_READ(当客户端请求数据时事件到达被添加到selectedKeys集合中) **/
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    }
                    else if (key.isReadable()) {
                        socketChannel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(100);
                        int read = socketChannel.read(byteBuffer);
                        System.out.println(new String(byteBuffer.array()));
                    }
                }
            }
        }
    

    OP_ACCEPT

    • 如果通道注册了OP_ACCEPT事件,当客户端收向服务端请求连接时,会将通道对应SelectionKey添加到publicSelectedKeys集合,用户线程遍历需SelectedKeys集合获取通道,调用serverSocketChannel.accept()处理后就可以将SelectionKey从publicSelectedKeys集合中删除。

    • 如果不处理直接将通道对应SelectionKey从publicSelectedKeys集合中删除。那么下一次调用select方法时会重新将从publicSelectedKeys集合添加到publicSelectedKeys集合中。

        @Test
        public void test_publicSelectedKeys_server1() throws Exception {
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel1.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
            Set<SelectionKey> selectionKeys=new HashSet<SelectionKey>();
            boolean is_Run=true;
            while (is_Run) {
    
                /** 阻塞等待事件到达**/
                System.out.println("selector.selectedKeys:"+selectionKeys);
                selector.select();
                System.out.println("selector.selectedKeys:"+selectionKeys);
                /** 获取到达事件SelectionKey集合**/
                selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                /** 遍历SelectionKey**/
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    /** 判断是否是OP_ACCEPT事件**/
                    if(key.isAcceptable()){
                        /** 从SelectionKey获取对应通道ServerSocketChannel**/
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                        //注释代码未打开,没有对请求连接事件处理,每次调用selector.selectedKeys()会将SelectionKey重新添加到publicSelectedKeys集合中
                        //serverSocketChannel.accept();
                        System.out.println("isAcceptable");
                    }
                }
            }
        }
    
        @Test
        public void test_publicSelectedKeys_client1() throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
            try {
                socketChannel.configureBlocking(true);
                socketChannel.connect(new InetSocketAddress("localhost", 8888));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

    select

    select()方法提供给用户线程判断是否存在就绪通道,如果不存在就绪通道则会阻塞当前线程,直到某个通道的事件到达。

    处理流程

    image

    需要注意如下特性

    • 线程因为select()导致的阻塞可以被interrupt()中断而释放

    • 线程因为select()导致的阻塞可以被close()中断而释放

    • 线程因为select()导致的阻塞可以被wakeup()中断而释放

    案例
    • select()阻塞被interrupt()唤醒
     /**
         * interrupt()中断函数可以用来释放所有selector.select()阻塞
         */
        @Test
        public void test_select() throws Exception {
    
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
            Thread currentThread = Thread.currentThread();
    
            /** 开启一个线程5s后调用close()方法关闭选择器**/
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    try {
                        TimeUnit.SECONDS.sleep(5);
                        currentThread.interrupt();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            thread.start();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));
    
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel.configureBlocking(false);
    
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            /** 阻塞等待事件到达**/
            //可以被中断
            selector.select();
    
            /** 获取到达事件SelectionKey集合**/
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
            /** 遍历SelectionKey**/
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                /** 判断是否是OP_ACCEPT事件**/
                if(key.isAcceptable()){
                    /** 从SelectionKey获取对应通道ServerSocketChannel**/
                    ServerSocketChannel socketChannel = (ServerSocketChannel)key.channel();
                    /** 获取SocketChannel**/
                    SocketChannel accept = socketChannel.accept();
                    accept.close();
                }
            }
            serverSocketChannel.close();
            System.out.println("over");
        }
    //over    
    
    • select()阻塞可以被close()中断
     @Test
        public void test_select3() throws Exception {
    
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
            Thread currentThread = Thread.currentThread();
    
            /** 开启一个线程5s后调用close()方法关闭选择器**/
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    try {
                        TimeUnit.SECONDS.sleep(5);
                        selector.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            thread.start();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));
    
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel.configureBlocking(false);
    
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            /** 阻塞等待事件到达**/
            //可以被中断
            selector.select();
    
            serverSocketChannel.close();
            System.out.println("over");
        }
    
    • select()阻塞被wakeup()唤醒
    @Test
        public void test_select1() throws Exception {
    
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
            Thread currentThread = Thread.currentThread();
    
            /** 开启一个线程5s后调用wakeup()方法**/
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    try {
                        TimeUnit.SECONDS.sleep(5);
                        selector.wakeup();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            thread.start();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));
    
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel.configureBlocking(false);
    
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            /** 阻塞等待事件到达**/
            //可以被中断
            selector.select();
    
    
            /** 获取到达事件SelectionKey集合**/
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
            /** 遍历SelectionKey**/
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                /** 判断是否是OP_ACCEPT事件**/
                if(key.isAcceptable()){
                    /** 从SelectionKey获取对应通道ServerSocketChannel**/
                    ServerSocketChannel socketChannel = (ServerSocketChannel)key.channel();
                    /** 获取SocketChannel**/
                    SocketChannel accept = socketChannel.accept();
                    accept.close();
                }
            }
            serverSocketChannel.close();
            System.out.println("over");
        }
    
    • selector.select()返回的是publicSelectedKeys变化值,如一个通道监听了连接OP_ACCEPT事件并阻塞,当客户端连接事件到达,线程从阻塞中唤醒返回1,表明有一个通道添加到了publicSelectedKeys集合中。如果事件被处理后并没有从publicSelectedKeys集合中删除,客户端在次发起连接事件到达,线程在次从阻塞中唤醒返回值0,因为当前通道已经存在与publicSelectedKeys集合中
     @Test
        public void test_select4_server() throws Exception {
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel1 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel1.bind(new InetSocketAddress("localhost", 8888));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel1.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register1 = serverSocketChannel1.register(selector, SelectionKey.OP_ACCEPT);
    
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel2 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel2.bind(new InetSocketAddress("localhost", 7777));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel2.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register2 = serverSocketChannel2.register(selector, SelectionKey.OP_ACCEPT);
    
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel3 = ServerSocketChannel.open();
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel3.bind(new InetSocketAddress("localhost", 6666));
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel3.configureBlocking(false);
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            SelectionKey register3 = serverSocketChannel3.register(selector, SelectionKey.OP_ACCEPT);
    
            Set<SelectionKey> selectionKeys=new HashSet<SelectionKey>();
            boolean is_Run=true;
            while (is_Run) {
    
                /** 阻塞等待事件到达**/
                int updatekey = selector.select();
                /** 获取到达事件SelectionKey集合**/
                selectionKeys = selector.selectedKeys();
                System.out.println("updatekey:"+updatekey);
    
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
                /** 遍历SelectionKey**/
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    //在处理完成通道事件后需要手动从publicSelectedKeys集合中删除否则导致重复操作发送
                    //iterator.remove();
                    /** 判断是否是OP_ACCEPT事件**/
                    if(key.isAcceptable()){
                        /** 从SelectionKey获取对应通道ServerSocketChannel**/
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
                        SocketChannel socketChannel = serverSocketChannel.accept();
                    }
                }
            }
        }
    
        @Test
        public void test_select4_client1() throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
            try {
                socketChannel.configureBlocking(true);
                socketChannel.connect(new InetSocketAddress("localhost", 6666));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Test
        public void test_select4_client2() throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
            SocketChannel socketChannel1 = SocketChannel.open();
            try {
                socketChannel.configureBlocking(true);
                socketChannel1.configureBlocking(true);
                socketChannel.connect(new InetSocketAddress("localhost", 7777));
                socketChannel1.connect(new InetSocketAddress("localhost", 8888));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }   
    

    close

     /**
         * 调用close()关闭选择器会导致
         *
         * 1 释放所有selector.select()阻塞
         *
         * 2 除了再次调用close()和wakeup()方法外,调用selector()的其他方法均出现异常。
         *
         * 3 如果选择器已经关闭则在次调用此方法将不起作用
         */
        @Test
        public void test_close() throws Exception {
    
            /** 实例化一个选择器对象 **/
            Selector selector = Selector.open();
    
            /** 开启一个线程5s后调用close()方法关闭选择器**/
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    try {
                        TimeUnit.SECONDS.sleep(5);
                        selector.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            thread.start();
    
            /** 创建服务器套接字通道 ServerSocketChannel **/
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    
            /** 绑定监听 InetSocketAddress **/
            serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));
    
            /** 设置为非阻塞IO模型 **/
            serverSocketChannel.configureBlocking(false);
    
            /** 将serverSocketChannel通道注册到selector选择器中,并设置感兴趣的事件OP_ACCEPT**/
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            /** 阻塞等待事件到达**/
            selector.select();
    
            /** 获取到达事件SelectionKey集合**/
            //此出抛出异常
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
    
            /** 遍历SelectionKey**/
            while (iterator.hasNext()){
                SelectionKey key = iterator.next();
                /** 判断是否是OP_ACCEPT事件**/
                if(key.isAcceptable()){
                    /** 从SelectionKey获取对应通道ServerSocketChannel**/
                    ServerSocketChannel socketChannel = (ServerSocketChannel)key.channel();
                    /** 获取SocketChannel**/
                    SocketChannel accept = socketChannel.accept();
                    accept.close();
                }
            }
            serverSocketChannel.close();
        }
    

    相关文章

      网友评论

          本文标题:JAVANIO -选择器02 Selector原理和使用

          本文链接:https://www.haomeiwen.com/subject/msmolctx.html