多路复用

作者: 毛小力 | 来源:发表于2018-02-27 14:34 被阅读0次

    继承树

    • SelectionKey继承树:
    java.nio.channels.SelectionKey
        -> java.nio.channels.spi.AbstractSelectionKey
            -> sun.nio.ch.SelectionKeyImpl
    
    • Selector继承树:
    java.nio.channels.Selector
        -> java.nio.channels.spi.AbstractSelector
            -> sun.nio.ch.SelectorImpl
                -> sun.nio.ch.EPollSelectorImpl
    
    • SelectorProvider继承树:
    java.nio.channels.spi.SelectorProvider
        -> sun.nio.ch.SelectorProviderImpl
            -> sun.nio.ch.EPollSelectorProvider
    

    SelectionKey

    SelectionKey(选择键)表示SelectableChannel(可选择通道)在Selector(选择器)注册的令牌。

    通道向选择器注册时会创建一个选择键。调用选择键的cancel方法,或关闭通道,或关闭选择器,选择键会失效,可调用isValid方法探测有效状态。

    选择键包含两个操作集:

    1. 兴趣集:选择器下一次选择时,检测哪些操作的准备就绪信息。
    2. 就绪集:选择器检测到哪些操作准备就绪。

    操作可以是读(read)、写(write)、连接(connect)、接受(accept)。

    // 选择键创建时的通道和选择器
    public abstract SelectableChannel channel();
    public abstract Selector selector();
    
    // 取消注册
    public abstract void cancel();
    
    // 探测选择键有效状态
    public abstract boolean isValid();
    
    
    // 选择键的兴趣集
    public abstract int interestOps();
    // 设置兴趣集,返回自身
    public abstract SelectionKey interestOps(int ops);
    // 选择键的就绪集
    public abstract int readyOps();
    
    // 读操作
    public static final int OP_READ = 1 << 0;
    public final boolean isReadable();
    
    // 写操作
    public static final int OP_WRITE = 1 << 2;
    public final boolean isWritable();
    
    // 连接操作
    public static final int OP_CONNECT = 1 << 3;
    public final boolean isConnectable();
    
    // 接受操作
    public static final int OP_ACCEPT = 1 << 4;
    public final boolean isAcceptable();
    

    Selector

    SelectableChannel的多路复用器(选择器)。

    创建选择器:

    // provider: sun.nio.ch.EPollSelectorProvider
    // selector: sun.nio.ch.EPollSelectorImpl
    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }
    

    Selector维护三个选择键(SelectionKey)集:

    • 注册键集:向此选择器注册的通道的选择键。不可直接修改。
    • 已选择键集:选择器上一次选择时,兴趣集中有操作就绪的选择键。为注册键集的子集。不可添加,可通过集合方法删除选择键。
    • 已取消键集:已取消但通道尚未注销的选择键。为注册键集的子集。不可直接访问。

    选择器创建时,三个选择键集均为空;
    通道向选择器注册时,生成的选择键加入注册键集;
    选择过程中,键加入已选择键集;
    取消选择键(cancel键或关闭通道),键加入已取消键集;
    下次选择时,已取消键集对应的通道被注销,选择键从所有键集中删除。

    // 注册键集
    public abstract Set<SelectionKey> keys();
    // 已选择键集
    public abstract Set<SelectionKey> selectedKeys();
    // 无获取已取消键集方法
    

    选择操作:
    查询底层操作系统,更新通道的准备状态,以执行选择操作开始时由兴趣集标示的任何操作。

    // 阻塞
    // 返回时机:有通道选中,或选择器wakeup方法被调用,或当前线程被中断
    // 返回:就绪集被更新的选择键数量
    public abstract int select() throws IOException;
    
    // select()的超时版本
    // timeout:正,超时阻塞;0,无限阻塞;非负
    public abstract int select(long timeout) throws IOException;
    
    // select()的非阻塞版本
    public abstract int selectNow() throws IOException;
    
    // 使尚未返回的第一个选择操作立即返回
    // 若当前无选择操作,则下一次选择操作立即返回,除非是selectNow()
    // 在连续两次选择操作之间多次调用只相当于一次
    public abstract Selector wakeup();
    

    Linux epoll

    epoll是Linux下多路复用IO接口select/poll的增强版本,能显著提高程序在大量并发连接中只有少量活跃的情况下系统CPU的利用率。

    epoll有两种工作方式:

    • LT(level triggered,水平触发):默认工作方式,同时支持阻塞和非阻塞。只要fd就绪(有数据时),内核就会通知fd就绪。传统的select/poll就是此工作方式。
    • ET(edge triggered,边缘触发):高速工作方式,仅支持非阻塞。当fd从未就绪变为就绪时(状态变化),内核会通知fd就绪,并且不会再次通知。

    epoll的使用:

    1. 创建epoll描述符
    int epoll_create (int size);
    
    epoll实现由hash表改为红黑树后,size已无意义
    
    1. 注册监控事件
    int epoll_ctl (int epfd, int op, int fd, struct epoll_event *event);
    
    epfd:epoll描述符
    op:操作类型
        EPOLL_CTL_ADD:添加注册事件
        EPOLL_CTL_MOD:修改注册事件
        EPOLL_CTL_DEL:删除注册事件
    fd:要监听的描述符
    event:要监听的事件
        EPOLLIN:读
        EPOLLOUT:写
        EPOLLRDHUP:关闭了连接
        EPOLLET:设置ET工作方式
        EPOLLONESHOT:设置one-shot工作方式
    
    1. epoll等待
    int epoll_wait (int epfd, struct epoll_event* events, int maxevents, int timeout);
    
    epfd:epoll描述符
    events:就绪事件,将事件从内核复制到events数组中
    maxevents:最多监听多少个事件
    timeout:超时毫秒,-1为无限阻塞,0则立即返回
    
    1. 例子
    #include <stdio.h>
    #include <unistd.h>
    #include <sys/epoll.h>
    int main(void) {
        int epfd = epoll_create(1); // 创建epoll描述符
        
        struct epoll_event ev; // 监听事件
        ev.data.fd = STDIN_FILENO; // 监听标准输入
        ev.events = EPOLLIN|EPOLLET; // 监听读,设置ET
        epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev); // 注册epoll事件
        
        struct epoll_event events[5]; // 就绪事件
        for (;;) {
            int nfds = epoll_wait(epfd, events, 5, -1);
            for (int i = 0; i < nfds; i++) {
                if (events[i].data.fd == STDIN_FILENO)
                    printf("epoll read event");
            }
        }
    }
    

    Java Epoll

    Java利用Linux Epoll实现多路复用。多路复用器类为sun.nio.ch.EPollSelectorImpl,epoll封装类为sun.nio.ch.EPollArrayWrapper。

    class EPollSelectorImpl extends SelectorImpl {
        // epoll对象
        EPollArrayWrapper pollWrapper;
        // 通道fd->选择键
        private Map<Integer,SelectionKeyImpl> fdToKey;
        
        // 保存要监听的描述符和事件
        // 描述符值小时:eventsLow:描述符为下标,事件为值
        // 描述符值大时:eventsHigh:描述符->事件
        private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
        private Map<Integer,Byte> eventsHigh;
        // 特殊的事件值:表示忽略要更新的监听描述符和事件
        private static final byte  KILLED = (byte)-1;
    
        // 创建选择器实例
        EPollSelectorImpl(SelectorProvider sp) throws IOException {
            ...
            pollWrapper = new EPollArrayWrapper();
            ...
            fdToKey = new HashMap<>();
        }
    }
    
    class EPollArrayWrapper {
        // epoll描述符
        private final int epfd;
        // epoll_wait函数返回的epoll_event数组
        private final AllocatedNativeObject pollArray;
        // epoll_event数组基址
        private final long pollArrayAddress;
        // epoll更新的
        int updated;
    
        // 创建epoll封装类实例
        EPollArrayWrapper() throws IOException {
            // 创建epoll描述符
            epfd = epollCreate();
    
            // 创建epoll_wait函数epoll_event数组
            int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
            pollArray = new AllocatedNativeObject(allocationSize, true);
            pollArrayAddress = pollArray.address();
            
            ...
        }
    }
    

    向选择器注册通道:

    // AbstractSelectableChannel
    // sel:选择器;ops:兴趣集;att:附件
    SelectionKey register(Selector sel, int ops, Object att) {
        synchronized (regLock) {
            // 若已注册,则更新兴趣集和附件
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.interestOps(ops);
                k.attach(att);
            }
    
            // 若未注册,则向选择器进行注册
            if (k == null) {
                synchronized (keyLock) {
                    k = ((AbstractSelector)sel).register(this, ops, att); // 
                    addKey(k);
                }
            }
            return k;
        }
    }
    
    // SelectorImpl
    SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
        // 创建选择键
        SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
        k.attach(attachment);
        // 注册通道
        synchronized (publicKeys) {
            implRegister(k); // 
        }
        // 设置兴趣集
        k.interestOps(ops);
        return k;
    }
    
    // EPollSelectorImpl
    void implRegister(SelectionKeyImpl ski) {
        // 多路复用通道
        SelChImpl ch = ski.channel;
        // 通道描述符
        int fd = Integer.valueOf(ch.getFDVal());
        // 通道描述符->通道选择键
        fdToKey.put(fd, ski);
        // 向epoll实例添加通道描述符
        pollWrapper.add(fd); //
        // 添加到注册键集
        keys.add(ski);
    }
    void add(int fd) {
        synchronized (updateLock) {
            setUpdateEvents(fd, (byte)0, true);
        }
    }
    // fd:要监听的描述符;events:要监听的事件;
    void setUpdateEvents(int fd, byte events, boolean force) {
        if (fd < MAX_UPDATE_ARRAY_SIZE) {
            if ((eventsLow[fd] != KILLED) || force) {
                eventsLow[fd] = events;
            }
        } else {
            Integer key = Integer.valueOf(fd);
            if (!isEventsHighKilled(key) || force) {
                eventsHigh.put(key, Byte.valueOf(events));
            }
        }
    }
    
    // 设置兴趣操作集
    // SelectionKeyImpl
    SelectionKey interestOps(int ops) {
        return nioInterestOps(ops);
    }
    SelectionKey nioInterestOps(int ops) {
        channel.translateAndSetInterestOps(ops, this);
        interestOps = ops;
        return this;
    }
    // ServerSocketChannelImpl
    void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
        int newOps = 0;
        //翻译成Linux epoll监听事件
        if ((ops & SelectionKey.OP_ACCEPT) != 0)
            newOps |= PollArrayWrapper.POLLIN;
        // 注册监听事件
        sk.selector.putEventOps(sk, newOps);
    }
    // EPollSelectorImpl
    void putEventOps(SelectionKeyImpl ski, int ops) {
        SelChImpl ch = ski.channel;
        // 向epoll注册要监听的描述符和事件
        pollWrapper.setInterest(ch.getFDVal(), ops);
    }
    // EPollArrayWrapper
    void setInterest(int fd, int mask) {
        synchronized (updateLock) {
            // 设置监听描述符
            updateDescriptors[updateCount++] = fd;
            // 设置监听事件
            byte b = (byte)mask;
            assert (b == mask) && (b != KILLED);
            setUpdateEvents(fd, b, false);
        }
    }
    
    
    

    选择操作:

    // SelectorImpl
    int select() {
        return select(0);
    }
    int select(long timeout) {
        return lockAndDoSelect((timeout == 0) ? -1 : timeout);
    }
    public int selectNow() {
        return lockAndDoSelect(0);
    }
    int lockAndDoSelect(long timeout) {
        synchronized (this) {
            synchronized (publicKeys) {
                synchronized (publicSelectedKeys) {
                    return doSelect(timeout); //
                }
            }
        }
    }
    
    // EPollSelectorImpl
    // timeout:正,超时阻塞;-1,无限阻塞;0,立即返回;与epoll_wait.timeout一致
    int doSelect(long timeout) {
        // 注销已取消键集的通道
        processDeregisterQueue();
        try {
            // 设置中断器,以便在阻塞时能够响应中断
            // 中断器将调用wakeup()使选择操作返回
            begin();
            // epoll_wait
            pollWrapper.poll(timeout);
        } finally {
            end();
        }
        
        // 再次处理已取消键集
        processDeregisterQueue();
    
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }
    
    // EPollArrayWrapper
    int poll(long timeout) {
        // 更新注册的通道和事件
        updateRegistrations();
        // epoll_wait
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        for (int i=0; i<updated; i++) {
            if (getDescriptor(i) == incomingInterruptFD) {
                interruptedIndex = i;
                interrupted = true;
                break;
            }
        }
        return updated;
    }
    // 更新通道注册
    void updateRegistrations() {
        synchronized (updateLock) {
            int j = 0;
            while (j < updateCount) {
                int fd = updateDescriptors[j];
                short events = getUpdateEvents(fd);
                boolean isRegistered = registered.get(fd);
                int opcode = 0;
    
                if (events != KILLED) {
                    if (isRegistered) {
                        opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                    } else {
                        opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                    }
                    if (opcode != 0) {
                        // 注册监听描述符和事件
                        epollCtl(epfd, opcode, fd, events);
                        if (opcode == EPOLL_CTL_ADD) {
                            registered.set(fd);
                        } else if (opcode == EPOLL_CTL_DEL) {
                            registered.clear(fd);
                        }
                    }
                }
                j++;
            }
            updateCount = 0;
        }
    }
    
    private int updateSelectedKeys() {
        int entries = pollWrapper.updated;
        int numKeysUpdated = 0;
        for (int i=0; i<entries; i++) {
            // 获取就绪描述符
            int nextFD = pollWrapper.getDescriptor(i);
            // 获取就绪描述符的选择键
            SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
            if (ski != null) {
                // 获取就绪事件
                int rOps = pollWrapper.getEventOps(i);
                if (selectedKeys.contains(ski)) {
                    // 设置选择键的就绪操作集
                    if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                        numKeysUpdated++;
                    }
                } else {
                    ski.channel.translateAndSetReadyOps(rOps, ski);
                    if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                        selectedKeys.add(ski);
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }
    
    // 对应操作系统的epoll:
    // 创建epoll实例
    private native int epollCreate();
    // epoll事件注册
    private native void epollCtl(int epfd, int opcode, int fd, int events);
    // epoll等待
    private native int epollWait(long pollAddress, int numfds, long timeout, int epfd);
    

    相关文章

      网友评论

        本文标题:多路复用

        本文链接:https://www.haomeiwen.com/subject/lhfdtftx.html