美文网首页
JavaNio-Selector

JavaNio-Selector

作者: 大风过岗 | 来源:发表于2021-03-30 13:24 被阅读0次

    一、概览

    在这篇文章中,我们将探索一下JavaNIO的Selector组件。selector提供了一个机制,该机制可以监视一个或多个NIO通道,当这些通道上的某些操作已就绪时,可以及时地识别到。

    利用这种方式,单个线程就可以管理多个通道,因此也就可以管理多个网络连接。这也为编写高可用高扩展的网络服务器提供了技术保障。

    二、 为什么要使用Selector

    在Selector的帮助下,我们可以使用一个而非多个线程来管理多个通道。对于操作系统而言,线程间的上下文切换是十分昂贵的,而且使用的线程过多也极大地占据内存空间。

    因此,使用的线程数越少越好。然而,需要记住的是,现代操作系统和cpu可以很好地处理多任务,因此,多线程的消耗也在随着时间而减少。

    这里,我们将看下我们是如何利用selector来达到一个线程管理多个通道的。

    注意: selector不仅可以帮助你读取数据,他们同样可以监听网络连接,并且提供在多个低速通道间写数据。

    三、设置

    要想使用selector的话,我们并不需要什么特殊的配置。我们所需要的类都在java.nio包下。我们只需要引入我们需要的包就可以啦。
    之后,我们就可以把多个channel注册到此selector对象上了。当这些通道上有IO活动发生时,该selector自会提醒我们的。这也就解释了,我们何以能够在依赖于单线程完成从大量数据源中读写数据。

    注册到selector上的channel必须是SelectableChannel的子类,因为只有这种类型的channel才能设置为非阻塞模式。

    四、创建一个Selector

    调用Selector类的open方法即可创建一个selector实例,此时它是用系统默认的selector提供者来创建一个新的selector的。

    Selector selector = Selector.open();
    
    

    五、注册selectable Channel

    如果你想要让selector去监听某个通道的话,那么你首先需要把这些需要监听的channel都注册到该selector上。我们可以调用channel的register方法来完成注册。

    但是在把某个channel注册到selector上之前,它必须被设置为非阻塞模式(non-blocking mode):

    channel.configureBlocking(false);
    SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
    

    当然了,这也就意味着,我们无法把FileChannel和selector放在一起使用,因为FileChannel无法切换到非阻塞模式。通常我们都是把socket channel和selector放在一起使用。

    channel.register()方法的第一个参数是我们先前创建的Selector对象,第二个参数是我们所关注的该channel上发生的事件。

    我们总共可监听的事件有四种,每一个都是由SelectionKey中的一个常量来表示的:

    . Connect
    - 当某个客户端试图连接服务器时,就会触发该事件。由SelectionKey.OP_CONNECT来表示
    . Accept
    - 当服务器接收某个客户端的连接时,就会触发该事件。由SelectionKey.OP_ACCEPT来表示
    . Read
    - 当服务器已准备好从通道中读取数据时,就会触发该事件。由SelectionKey.OP_READ来表示
    . Write
    - 当服务器已准备好向该通道中写数据时,就会触发该事件。由SelectionKey.OP_WRITE来表示

    方法的返回值是一个SelectionKey对象,这个对象就代表了某个通道注册到selector之后的注册结果。

    六、SelectionKey 对象

    正如我们在上面所看的那样,当我们把某个channel注册到selector上之后,我们就会得到一个SelectionKey对象,该对象中存储了通道注册的信息。

    6.1 事件集

    事件集参数定义了我们希望该selector监听此通道上的哪些事件。它是一个integer值,我们可以通过以下方式获取相关信息。

    首先,我们可以通过SelectionKey的interestOps方法获取到事件集,然后我们把Selectionkey和此值做"与运算",我们就可以得到一个布尔值来表明此事件是不是我们监听的事件。

    ```
      int interestSet = selectionKey.interestOps();
    
      boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
      boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
      boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
      boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;
    
    ```
    

    6.2 就绪集

    就续集定义了某个channel上有哪些事件是准备就绪的。它是一个整型数字。

    我们有一种便捷的方式来获取的某个channel的就绪集:

    ```
      selectionKey.isAcceptable();
      selectionKey.isConnectable();
      selectionKey.isReadable();
      selectionKey.isWriteable();
    ```
    

    6.3 通道

    要想根据SelectionKey访问channel的话,有一个非常简单的方法:

    Channel channel = key.channel();
    
    

    6.4 Selector

    从SelectionKey对象上获取对应的Selector对象也非常简单:

    Selector selector = key.selector();
    

    6.5 附属对象

    我们可以向SelectionKey上附属一个对象,因为有时我们或许给某个channel分配一个自定义ID或者附属某个java对象,以便于可以跟踪该channel的行为情况:
    我们可以使用SelectionKey的attach()方法来很方便地做到这一点:

    key.attach(object);
    
    Object object = key.attachment();
    
    

    还有一种方式,就是在channel注册的时候,我们把需要附属的对象作为参数传递给register方法:

    SelectionKey key = channel.register(
      selector, SelectionKey.OP_ACCEPT, object);
    
    

    七、channel key选择

    到目前为止,我们已经看到了如何创建一个selector对象,如何向selector上注册channel,以及观察SelectionKey的属性信息。

    这才进行到一半,现在我们需要持续地挑选就绪事件:

    int channels = selector.select();
    

    此方法是一个阻塞方法,它会一直阻塞住直到有就绪事件到来。方法的返回值代表的是处于就绪状态channel个数。

    紧接着,我们通常会获取需要处理的selectionKey

     Set<SelectionKey>  selectionKeys = selector.selectedKeys();
    

    后面,我们只需要遍历这个集合,获取到对应的channel,并执行相应的处理动作即可。

    八、完整示例

    我们这里有个简单的服务端和客户端的简单示例,用以演示一下,使用selector是如何编写网络程序的。

    8.1 服务端程序

    public class EchoServer {
    
        private static final String POISION_PILL = "POISON_PILL";
    
    
        public static void main(String[] args) throws IOException {
    
            Selector selector   = Selector.open();
    
            ServerSocketChannel  serverSocket = ServerSocketChannel.open();
            serverSocket.bind(new InetSocketAddress("localhost",5454));
            serverSocket.configureBlocking(false);
            SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            ByteBuffer  buffer = ByteBuffer.allocate(256);
    
            while (true){
    
                int select = selector.select();// selecting the ready set,this method blocks until at least one channel is ready for an operation
    
                Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 获取可用于处理的selected keys
                //遍历这些已准备就绪的事件,并处理
                Iterator<SelectionKey> iter = selectedKeys.iterator();
                while (iter.hasNext()){
    
                    SelectionKey key = iter.next();
                    //逐一对这些已就绪事件进行处理
    
                    //如果是接收网络连接的事件
                    if(key.isAcceptable()){
                        register(selector,serverSocket);
                    }
    
                    //如果是可读事件
                    if(key.isReadable()){
                        answerWithEcho(buffer,key);
                    }
                    iter.remove(); //由此可以看出,使用迭代器进行遍历,在遍历时,可以执行移出动作
                }
            }
    
        }
    
    
    
        private static void register(Selector selector, ServerSocketChannel serverSocket) throws IOException {
    
            //如果发现,该key是网络套接字接收事件,则接收此客户端连接,并把此客户端连接注册到selector上
            SocketChannel  client = serverSocket.accept();
            client.configureBlocking(false);
            client.register(selector,SelectionKey.OP_READ);
        }
    
        private static void answerWithEcho(ByteBuffer buffer, SelectionKey key) throws IOException {
    
            SocketChannel   client = (SocketChannel) key.channel();  //根据key取出对应的channel
            client.read(buffer); //把客户端发来的数据读进buffer中,可以和之前的相比就可看出
                                 //之前是一个字节一个字节的以流的形式读取,现在是批量地读进buffer中
    
    
            String content = new String(buffer.array());
            if(content.trim().equals(POISION_PILL)){
                //如果收到了"POISION_PILL"字符串的话,则关闭此客户端
                client.close();
                System.out.println("Not accepting client messages anymore");
            }else {
                buffer.flip(); //
                client.write(buffer);
                buffer.clear(); //清空buffer
            }
    
        }
    
    
        public static Process  start() throws IOException,InterruptedException {
    
            String javaHome = System.getProperty("java.home");
            String  javaBin = javaHome + File.separator + "bin"  + File.separator + "java";
            String classPath= System.getProperty("java.class.path");
            String className= EchoServer.class.getCanonicalName();
    
            ProcessBuilder  builder = new ProcessBuilder(javaBin,"-cp", classPath,className);
    
            return builder.start();
        }
    
    }
    
    
    

    8.2 客户端程序

    public class EchoClient {
    
    
        private static SocketChannel  client;
        private static ByteBuffer buffer;
        private static EchoClient instance;
    
    
    
        public static  EchoClient start(){
    
            if(instance == null){
                instance = new EchoClient();
            }
            return instance;
        }
    
        public static void stop()throws IOException{
            client.close();
            buffer = null;
        }
    
    
        private EchoClient(){
    
            try{
    
                client = SocketChannel.open(new InetSocketAddress("localhost",5454));
                buffer = ByteBuffer.allocate(256);
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    
    
        public String sendMessage(String msg){
    
            buffer = ByteBuffer.wrap(msg.getBytes());
            String response = null;
    
            try{
                client.write(buffer);
                buffer.clear();
                client.read(buffer);
                response = new String(buffer.array()).trim();
                buffer.clear();
            }catch (IOException e){
                e.printStackTrace();
            }
            return response;
        }
    
    }
    
    
    

    8.3 测试程序

    public class EchoTest {
    
        Process  server;
    
        EchoClient  client;
    
    
        @Before
        public void setup()throws IOException,InterruptedException {
            server = EchoServer.start();
            client = EchoClient.start();
        }
    
    
    
        @Test
        public void givenServerClient_whenServerEchosMessage_thenCorrect() {
    
            String resp1 = client.sendMessage("hello");
            String resp2 = client.sendMessage("world");
            System.out.println(resp1);
            System.out.println(resp2);
        }
    
    
        @Test
        public void  whenWakeUpCalledOnSelector_thenBlokedThreadReturns() throws Exception {
    
            Pipe pipe = Pipe.open();
            Selector selector = Selector.open();
            SelectableChannel channel = pipe.source();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
    
            List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());
    
            CountDownLatch latch = new  CountDownLatch(1);
    
            new Thread(()->{
    
                invocationStepsTracker.add(">> Count down");
                latch.countDown();
    
                invocationStepsTracker.add(">> Count down");
                latch.countDown();
    
                try {
    
                    invocationStepsTracker.add(">> Start select");
                    selector.select();
                    invocationStepsTracker.add(">> End select");
                } catch (IOException e) {
                    e.printStackTrace();
                }
    
            }).start();
    
            invocationStepsTracker.add(">> Start await");
            latch.await();
            invocationStepsTracker.add(">> End await");
            invocationStepsTracker.add(">> Wakeup thread");
            selector.wakeup();
            // clean up
            channel.close();
    
            System.out.println("============输出StepTracker中的内容================");
            System.out.println(JSONObject.toJSONString(invocationStepsTracker));
        }
    
        @After
        public void tearDown()throws IOException {
            server.destroy();
            EchoClient.stop();
        }
    }
    
    
    

    九、Selector.wakeup()

    前面,我们已经讲到了,当我们调用selector.select()方法时,此方法会把当前线程阻塞住直到被监听的channel上有就绪事件
    发生。但是我们可以在其他线程中调用selector.wakeup()方法来唤醒被此selector阻塞的线程。

    调用selector.wakeup()方法产生的结果是:不管是否有通道处于就绪状态,被阻塞的线程都会立即返回,而非继续等待。

    我们可以使用CountDownLatch来演示一下,并跟踪一下代码的执行情况:

    @Test
       public void  whenWakeUpCalledOnSelector_thenBlokedThreadReturns() throws Exception {
    
           Pipe pipe = Pipe.open();
           Selector selector = Selector.open();
           SelectableChannel channel = pipe.source();
           channel.configureBlocking(false);
           channel.register(selector, SelectionKey.OP_READ);
    
           List<String> invocationStepsTracker = Collections.synchronizedList(new ArrayList<>());
    
           CountDownLatch latch = new  CountDownLatch(1);
    
           new Thread(()->{
    
               invocationStepsTracker.add(">> Count down");
               latch.countDown();
    
               invocationStepsTracker.add(">> Count down");
               latch.countDown();
    
               try {
    
                   invocationStepsTracker.add(">> Start select");
                   selector.select();
                   invocationStepsTracker.add(">> End select");
               } catch (IOException e) {
                   e.printStackTrace();
               }
    
           }).start();
    
           invocationStepsTracker.add(">> Start await");
           latch.await();
           invocationStepsTracker.add(">> End await");
           invocationStepsTracker.add(">> Wakeup thread");
           selector.wakeup();
           // clean up
           channel.close();
    
           System.out.println("============输出StepTracker中的内容================");
           System.out.println(JSONObject.toJSONString(invocationStepsTracker));
       }
    
    

    参考文献

    1. baeldung-JavaNIO-selector

    2. Oracle文档Selector

    相关文章

      网友评论

          本文标题:JavaNio-Selector

          本文链接:https://www.haomeiwen.com/subject/xfnwhltx.html