继承树
- SelectionKey继承树:
java.nio.channels.SelectionKey
-> java.nio.channels.spi.AbstractSelectionKey
-> sun.nio.ch.SelectionKeyImpl
- Selector继承树:
java.nio.channels.Selector
-> java.nio.channels.spi.AbstractSelector
-> sun.nio.ch.SelectorImpl
-> sun.nio.ch.EPollSelectorImpl
- SelectorProvider继承树:
java.nio.channels.spi.SelectorProvider
-> sun.nio.ch.SelectorProviderImpl
-> sun.nio.ch.EPollSelectorProvider
SelectionKey
SelectionKey(选择键)表示SelectableChannel(可选择通道)在Selector(选择器)注册的令牌。
通道向选择器注册时会创建一个选择键。调用选择键的cancel方法,或关闭通道,或关闭选择器,选择键会失效,可调用isValid方法探测有效状态。
选择键包含两个操作集:
- 兴趣集:选择器下一次选择时,检测哪些操作的准备就绪信息。
- 就绪集:选择器检测到哪些操作准备就绪。
操作可以是读(read)、写(write)、连接(connect)、接受(accept)。
// 选择键创建时的通道和选择器
public abstract SelectableChannel channel();
public abstract Selector selector();
// 取消注册
public abstract void cancel();
// 探测选择键有效状态
public abstract boolean isValid();
// 选择键的兴趣集
public abstract int interestOps();
// 设置兴趣集,返回自身
public abstract SelectionKey interestOps(int ops);
// 选择键的就绪集
public abstract int readyOps();
// 读操作
public static final int OP_READ = 1 << 0;
public final boolean isReadable();
// 写操作
public static final int OP_WRITE = 1 << 2;
public final boolean isWritable();
// 连接操作
public static final int OP_CONNECT = 1 << 3;
public final boolean isConnectable();
// 接受操作
public static final int OP_ACCEPT = 1 << 4;
public final boolean isAcceptable();
Selector
SelectableChannel的多路复用器(选择器)。
创建选择器:
// provider: sun.nio.ch.EPollSelectorProvider
// selector: sun.nio.ch.EPollSelectorImpl
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
Selector维护三个选择键(SelectionKey)集:
- 注册键集:向此选择器注册的通道的选择键。不可直接修改。
- 已选择键集:选择器上一次选择时,兴趣集中有操作就绪的选择键。为注册键集的子集。不可添加,可通过集合方法删除选择键。
- 已取消键集:已取消但通道尚未注销的选择键。为注册键集的子集。不可直接访问。
选择器创建时,三个选择键集均为空;
通道向选择器注册时,生成的选择键加入注册键集;
选择过程中,键加入已选择键集;
取消选择键(cancel键或关闭通道),键加入已取消键集;
下次选择时,已取消键集对应的通道被注销,选择键从所有键集中删除。
// 注册键集
public abstract Set<SelectionKey> keys();
// 已选择键集
public abstract Set<SelectionKey> selectedKeys();
// 无获取已取消键集方法
选择操作:
查询底层操作系统,更新通道的准备状态,以执行选择操作开始时由兴趣集标示的任何操作。
// 阻塞
// 返回时机:有通道选中,或选择器wakeup方法被调用,或当前线程被中断
// 返回:就绪集被更新的选择键数量
public abstract int select() throws IOException;
// select()的超时版本
// timeout:正,超时阻塞;0,无限阻塞;非负
public abstract int select(long timeout) throws IOException;
// select()的非阻塞版本
public abstract int selectNow() throws IOException;
// 使尚未返回的第一个选择操作立即返回
// 若当前无选择操作,则下一次选择操作立即返回,除非是selectNow()
// 在连续两次选择操作之间多次调用只相当于一次
public abstract Selector wakeup();
Linux epoll
epoll是Linux下多路复用IO接口select/poll的增强版本,能显著提高程序在大量并发连接中只有少量活跃的情况下系统CPU的利用率。
epoll有两种工作方式:
- LT(level triggered,水平触发):默认工作方式,同时支持阻塞和非阻塞。只要fd就绪(有数据时),内核就会通知fd就绪。传统的select/poll就是此工作方式。
- ET(edge triggered,边缘触发):高速工作方式,仅支持非阻塞。当fd从未就绪变为就绪时(状态变化),内核会通知fd就绪,并且不会再次通知。
epoll的使用:
- 创建epoll描述符
int epoll_create (int size);
epoll实现由hash表改为红黑树后,size已无意义
- 注册监控事件
int epoll_ctl (int epfd, int op, int fd, struct epoll_event *event);
epfd:epoll描述符
op:操作类型
EPOLL_CTL_ADD:添加注册事件
EPOLL_CTL_MOD:修改注册事件
EPOLL_CTL_DEL:删除注册事件
fd:要监听的描述符
event:要监听的事件
EPOLLIN:读
EPOLLOUT:写
EPOLLRDHUP:关闭了连接
EPOLLET:设置ET工作方式
EPOLLONESHOT:设置one-shot工作方式
- epoll等待
int epoll_wait (int epfd, struct epoll_event* events, int maxevents, int timeout);
epfd:epoll描述符
events:就绪事件,将事件从内核复制到events数组中
maxevents:最多监听多少个事件
timeout:超时毫秒,-1为无限阻塞,0则立即返回
- 例子
#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>
int main(void) {
int epfd = epoll_create(1); // 创建epoll描述符
struct epoll_event ev; // 监听事件
ev.data.fd = STDIN_FILENO; // 监听标准输入
ev.events = EPOLLIN|EPOLLET; // 监听读,设置ET
epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev); // 注册epoll事件
struct epoll_event events[5]; // 就绪事件
for (;;) {
int nfds = epoll_wait(epfd, events, 5, -1);
for (int i = 0; i < nfds; i++) {
if (events[i].data.fd == STDIN_FILENO)
printf("epoll read event");
}
}
}
Java Epoll
Java利用Linux Epoll实现多路复用。多路复用器类为sun.nio.ch.EPollSelectorImpl,epoll封装类为sun.nio.ch.EPollArrayWrapper。
class EPollSelectorImpl extends SelectorImpl {
// epoll对象
EPollArrayWrapper pollWrapper;
// 通道fd->选择键
private Map<Integer,SelectionKeyImpl> fdToKey;
// 保存要监听的描述符和事件
// 描述符值小时:eventsLow:描述符为下标,事件为值
// 描述符值大时:eventsHigh:描述符->事件
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
private Map<Integer,Byte> eventsHigh;
// 特殊的事件值:表示忽略要更新的监听描述符和事件
private static final byte KILLED = (byte)-1;
// 创建选择器实例
EPollSelectorImpl(SelectorProvider sp) throws IOException {
...
pollWrapper = new EPollArrayWrapper();
...
fdToKey = new HashMap<>();
}
}
class EPollArrayWrapper {
// epoll描述符
private final int epfd;
// epoll_wait函数返回的epoll_event数组
private final AllocatedNativeObject pollArray;
// epoll_event数组基址
private final long pollArrayAddress;
// epoll更新的
int updated;
// 创建epoll封装类实例
EPollArrayWrapper() throws IOException {
// 创建epoll描述符
epfd = epollCreate();
// 创建epoll_wait函数epoll_event数组
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
...
}
}
向选择器注册通道:
// AbstractSelectableChannel
// sel:选择器;ops:兴趣集;att:附件
SelectionKey register(Selector sel, int ops, Object att) {
synchronized (regLock) {
// 若已注册,则更新兴趣集和附件
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
// 若未注册,则向选择器进行注册
if (k == null) {
synchronized (keyLock) {
k = ((AbstractSelector)sel).register(this, ops, att); //
addKey(k);
}
}
return k;
}
}
// SelectorImpl
SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
// 创建选择键
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
// 注册通道
synchronized (publicKeys) {
implRegister(k); //
}
// 设置兴趣集
k.interestOps(ops);
return k;
}
// EPollSelectorImpl
void implRegister(SelectionKeyImpl ski) {
// 多路复用通道
SelChImpl ch = ski.channel;
// 通道描述符
int fd = Integer.valueOf(ch.getFDVal());
// 通道描述符->通道选择键
fdToKey.put(fd, ski);
// 向epoll实例添加通道描述符
pollWrapper.add(fd); //
// 添加到注册键集
keys.add(ski);
}
void add(int fd) {
synchronized (updateLock) {
setUpdateEvents(fd, (byte)0, true);
}
}
// fd:要监听的描述符;events:要监听的事件;
void setUpdateEvents(int fd, byte events, boolean force) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
// 设置兴趣操作集
// SelectionKeyImpl
SelectionKey interestOps(int ops) {
return nioInterestOps(ops);
}
SelectionKey nioInterestOps(int ops) {
channel.translateAndSetInterestOps(ops, this);
interestOps = ops;
return this;
}
// ServerSocketChannelImpl
void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
//翻译成Linux epoll监听事件
if ((ops & SelectionKey.OP_ACCEPT) != 0)
newOps |= PollArrayWrapper.POLLIN;
// 注册监听事件
sk.selector.putEventOps(sk, newOps);
}
// EPollSelectorImpl
void putEventOps(SelectionKeyImpl ski, int ops) {
SelChImpl ch = ski.channel;
// 向epoll注册要监听的描述符和事件
pollWrapper.setInterest(ch.getFDVal(), ops);
}
// EPollArrayWrapper
void setInterest(int fd, int mask) {
synchronized (updateLock) {
// 设置监听描述符
updateDescriptors[updateCount++] = fd;
// 设置监听事件
byte b = (byte)mask;
assert (b == mask) && (b != KILLED);
setUpdateEvents(fd, b, false);
}
}
选择操作:
// SelectorImpl
int select() {
return select(0);
}
int select(long timeout) {
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}
public int selectNow() {
return lockAndDoSelect(0);
}
int lockAndDoSelect(long timeout) {
synchronized (this) {
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
return doSelect(timeout); //
}
}
}
}
// EPollSelectorImpl
// timeout:正,超时阻塞;-1,无限阻塞;0,立即返回;与epoll_wait.timeout一致
int doSelect(long timeout) {
// 注销已取消键集的通道
processDeregisterQueue();
try {
// 设置中断器,以便在阻塞时能够响应中断
// 中断器将调用wakeup()使选择操作返回
begin();
// epoll_wait
pollWrapper.poll(timeout);
} finally {
end();
}
// 再次处理已取消键集
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
// EPollArrayWrapper
int poll(long timeout) {
// 更新注册的通道和事件
updateRegistrations();
// epoll_wait
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
// 更新通道注册
void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
int fd = updateDescriptors[j];
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 注册监听描述符和事件
epollCtl(epfd, opcode, fd, events);
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
}
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
// 获取就绪描述符
int nextFD = pollWrapper.getDescriptor(i);
// 获取就绪描述符的选择键
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
if (ski != null) {
// 获取就绪事件
int rOps = pollWrapper.getEventOps(i);
if (selectedKeys.contains(ski)) {
// 设置选择键的就绪操作集
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
// 对应操作系统的epoll:
// 创建epoll实例
private native int epollCreate();
// epoll事件注册
private native void epollCtl(int epfd, int opcode, int fd, int events);
// epoll等待
private native int epollWait(long pollAddress, int numfds, long timeout, int epfd);
网友评论