美文网首页
Selector.select()

Selector.select()

作者: 书唐瑞 | 来源:发表于2021-12-07 11:59 被阅读0次

    Netty的底层依然是依赖于JDK的NIO . 开发NIO服务端的代码如下所示

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Set;
    
    public class Server {
    
    
        // 缓冲区的大小
        private static final int BUFFER_SIZE = 1024;
    
        // 缓冲区
        private static ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
    
        // 选择器
        private static Selector selector = null;
    
    
    
        public static void main(String[] args) {
    
    
            ServerSocketChannel serverSocketChannel;
    
            try {
                selector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
    
                serverSocketChannel.socket().bind(new InetSocketAddress(8080), 64);
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
                for (;;) {
    
                    int readyChannels = selector.select();
                    if (readyChannels == 0) {
                        continue;
                    }
    
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    
                    Iterator iterator = selectedKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = (SelectionKey) iterator.next();
                        handleKey(key);
                        iterator.remove();
                    }
    
                }
    
            } catch (Exception ignored) {
    
            }
    
        }
    
        // 处理SelectionKey
        private static void handleKey(SelectionKey key) throws IOException {
            // 是否有连接进来
            if (key.isAcceptable()) {
                ServerSocketChannel server = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = server.accept();
                // SocketChannel通道的可读事件注册到Selector中
                registerChannel(selector, socketChannel, SelectionKey.OP_READ);
                // 连接成功 向Client打个招呼
                if (socketChannel.isConnected()) {
                    buffer.clear();
                    buffer.put("I am Server...".getBytes());
                    buffer.flip();
                    socketChannel.write(buffer);
    
                }
    
            }
            // 通道的可读事件就绪
            if (key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                buffer.clear(); // 清空缓冲区
                // 读取数据
                int len = 0;
                while ((len = socketChannel.read(buffer)) > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        System.out.println("Server读取的数据:" + new String(buffer.array(), 0, len));
                    }
                }
                if (len < 0) {
                    // 非法的SelectionKey 关闭Channel
                    socketChannel.close();
                }
                // SocketChannel通道的可写事件注册到Selector中
                registerChannel(selector, socketChannel, SelectionKey.OP_WRITE);
            }
            // 通道的可写事件就绪
            if (key.isWritable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                buffer.clear(); // 清空缓冲区
                // 准备发送的数据
                String message_from_server = "Hello,Client... " + socketChannel.getLocalAddress();
                buffer.put(message_from_server.getBytes());
                buffer.flip();
                socketChannel.write(buffer);
                System.out.println("Server发送的数据:" + message_from_server);
                // SocketChannel通道的可写事件注册到Selector中
                registerChannel(selector, socketChannel, SelectionKey.OP_READ);
            }
        }
    
    
        private static void registerChannel(Selector selector, SelectableChannel channel, int ops) throws IOException {
            if (channel == null) {
                return;
            }
            channel.configureBlocking(false);
            channel.register(selector, ops);
        }
    }
    
    

    本篇文章就来讲解下selector.select()的功能 .

    个人感觉, 好多功能都是按照三部曲来实现的 1.生产一个冰箱 2.把大象装进冰箱 3.把大象从冰箱取出来

    1.生产一个冰箱

    在调用Selector.open()的时候, 底层会创建各种属性和数据结构,用于存储相关信息.

    // 源码位置 java.nio.channels.Selector#open
    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }
    

    在Windows平台返回 WindowsSelectorImpl 对象 , 在Linux平台返回 EPollSelectorImpl 对象 . 这里以EPollSelectorImpl分析 .

    在Windows平台 , 通过跟踪open()源码的方式, 看不到sun.nio.ch.EPollSelectorImpl 这个类, 可以在Linux平台 或者 直接下载JDK源码 或者互联网搜索EPollSelectorImpl都可以看到这个类 .

    EPollSelectorImpl 继承 SelectorImpl , 在 EPollSelectorImpl 内部有个EPollArrayWrapper类 , EPollArrayWrapper内部就是关于epoll相关的操作 .

    IO多路复用的实现方式有 select, poll, epoll . epoll 主要涉及三个方法: epoll_create, epoll_ctl, epoll_wait

    个人认为, 要想学好Java, 依然要对C语言, 包括一些系统调用了解或熟悉.

    图片.png 图片.png 图片.png

    在实例化EPollSelectorImpl的时候, 创建了 Set<SelectionKey> selectedKeys , Map<Integer,SelectionKeyImpl> fdToKey, byte[] eventsLow 或 Map<Integer,Byte> eventsHigh 等重要属性 .

    我们不必在意这些属性'散落'在哪些类里, 我们更关注的是, 实例化EPollSelectorImpl的时候 会 创建一些集合等属性对象, 用于存储数据. 这就是在生产一个冰箱, 为后面存储数据使用.

    而且还会创建一个堆外内存的pollArray对象, 这个对象用于接收内核返回的可读写的文件描述符. 因为在进行调用epoll_wait的时候, 需要给内核传递一个对象, 内核会将已经准备就绪的文件描述符填充到这个对象.

    通过man epoll_wait查看


    图片.png

    第二个参数 struct epoll_event *events 是一个传出参数, 而pollArray对象就会传到这个参数上 .

    所有与内核交互的对象, 必须是堆外内存的对象 .

    2.把大象装进冰箱

    在我们自己的代码中 会通过 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT) 注册感兴趣的事件 .

    文章后面会有一个函数之间调用的总图

    // 源码位置 EPollSelectorImpl#implRegister
    protected void implRegister(SelectionKeyImpl ski) {
        if (closed)
            throw new ClosedSelectorException();
        SelChImpl ch = ski.channel;
        int fd = Integer.valueOf(ch.getFDVal());
        fdToKey.put(fd, ski);
        pollWrapper.add(fd);
        keys.add(ski);
    }
    

    会将<fd, SelectionKeyImpl>的对应关系存储到 Map<Integer,SelectionKeyImpl> fdToKey 集合中, 假如我们有个6号文件描述符 fd=6, 把它存储到 fdToKey中, 即<6, SelectionKeyImpl>这样的关系 . 当6号文件描述符有数据进来的时候, 调用epoll_wait的时候, 内核就会把6号文件描述符返回给用户态, 我们再根据<6, SelectionKeyImpl>这个关系,就能找到这个SelectionKeyImpl 了 .

    在调用register方法的时候, 不仅会存储<fd, SelectionKeyImpl>的对应关系, 还会将所有的fd存储到 int[] updateDescriptors 中, 也会将 <fd, events>的关系存储到byte[] eventsLow 或 Map<Integer,Byte> eventsHigh中.

    一句话, 在上面我们已经生产了一个冰箱, 在这里, 我们把数据(也就是大象)放进这个冰箱里面.

    3.把大象从冰箱取出来

    selector.select()

    关键代码最终会调用到EPollArrayWrapper 这个类里的方法.

    会将之前上一步的文件描述符和对应的事件, 通过epoll_ctl系统调用, 放到epoll的红黑树上.

    最终会调用到epoll_wait系统调用函数, 如果有文件描述符就绪, 就将对应的文件描述符放到堆外内存的pollArray对象上.

    用户态拿到pollArray对象之后, 通过遍历, 根据fd从fdToKey中将SelectionKeyImpl 放到 Set<SelectionKey> selectedKeys集合中, 用户态在调用selector.selectedKeys()的时候, 就会将selectedKeys集合返回 . 这样我们的业务代码就拿到了 selectedKeys集合, 进行后续操作处理.

    关于函数之间的调用如下图, 具体也可以查看 https://www.yuque.com/infuq/default/wy8fap#cGAYr

    Reactor模型epoll版服务器 - Java语言.png

    相关文章

      网友评论

          本文标题:Selector.select()

          本文链接:https://www.haomeiwen.com/subject/otvoxrtx.html