概述
根据NIO源码阅读(1)-SelectorProvider一文的分析,Linux下SelectorProvider的实现类为EpollSelectorProvider,下面具体看看它的实现;
EPollSelectorProvider
public abstract class SelectorProviderImpl
extends SelectorProvider
{
public DatagramChannel openDatagramChannel() throws IOException {
return new DatagramChannelImpl(this);
}
public DatagramChannel openDatagramChannel(ProtocolFamily family) throws IOException {
return new DatagramChannelImpl(this, family);
}
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}
public abstract AbstractSelector openSelector() throws IOException;
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
public SocketChannel openSocketChannel() throws IOException {
return new SocketChannelImpl(this);
}
}
public class EPollSelectorProvider
extends SelectorProviderImpl
{
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}
EPollSelectorProvider实现比较简单,主要实现了openSelector方法,返回EPollSelectorImpl对象;
EPollSelectorImpl
EpollSelectorImpl继承自SelectorImpl,比较重要的几个成员变量如下:
//epoll的select方法会堵塞线程,那么如何唤醒堵塞线程呢?
//其实很简单,当有读写事件发生时,select方法会返回;
//因此EpollSelectorImpl注册了一对管道fd0,fd1,并将fd0注册到selector;
//当要中断select方法时,往fd1中写入数字1会导致fd0有可读内容,select方法会返回;
protected int fd0;
protected int fd1;
//Epoll的具体实现,关键方法采用JNI实现
EPollArrayWrapper pollWrapper;
//触发事件但还未处理完毕的SelectionKey,比如有可读数据,但未读完;
protected Set<SelectionKey> selectedKeys;
//将selectedKeys暴露给外部访问,可以删除元素,但不可添加元素
private Set<SelectionKey> publicSelectedKeys;
//注册到Selector的所有SelectionKey
protected HashSet<SelectionKey> keys;
//将keys暴露给外部访问,采用不可变对象,防止被修改
private Set<SelectionKey> publicKeys;
此处要注意:在处理selectedKeys()方法返回的SelectionKey集合时,对于处理完毕的SelectionKey,需要手工从集合中清除,否则下次调用selectedKeys()方法时,还会返回该SelectionKey对象;
构造函数
EPollSelectorImpl(SelectorProvider sp) {
super(sp);
long pipeFds = IOUtil.makePipe(false);//管道
fd0 = (int) (pipeFds >>> 32);//读
fd1 = (int) pipeFds;//写
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<Integer,SelectionKeyImpl>();
}
- 首先通过IOUtil.makePipe(false)返回了一个非堵塞的管道(pipe),底层是通过Linux的pipe系统调用实现的;返回两个int常量,分别指向管道的读、写文件描述符;关于管道的作用,前面已经有解释,此处不展开;
- 接着定义了EPollArrayWrapper变量,并调用initInterrup方法,将fd0注册到epoll;
- 最后定义了一个map类型的变量fdToKey,将channel的文件描述符ID和SelectionKey建立映射关系;
select方法
Selector.select方法最终调用的是doSelect方法:
protected int doSelect(long timeout)
throws IOException
{
if (closed)
throw new ClosedSelectorException();
//处理待取消的SelectionKey(调用SelectionKey.cancel()方法取消)
processDeregisterQueue();
try {
//见java.nio.channels.spi.AbstractSelector类,
//通过blockedOn方法将前线程的blocker变量设为Interruptible类实例,
//当线程中断时,会通过Interruptible类的interrupt间接调用wakeup方法,
//使得pollWrapper.poll方法返回
begin();
//调用epoll底层实现
pollWrapper.poll(timeout);
} finally {
//见java.nio.channels.spi.AbstractSelector类,将当前线程的blocker变量设为null
end();
}
//处理待取消的SelectionKey(调用SelectionKey.cancel()方法取消)
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
//清除epoll事件
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();//将中断标志设为false
IOUtil.drain(fd0);//将fd0的数据全部读完
interruptTriggered = false;
}
}
return numKeysUpdated;
}
//AbstractSelectionKey
public final void cancel() {
synchronized (this) {
if (valid) {
valid = false;
((AbstractSelector)selector()).cancel(this);
}
}
}
- 如果selector已经关闭,抛出异常;
- 调用processDeregisterQueue方法,将cancel的selectionKey从selector中删除,底层会调用epoll_ctl方法移除被epoll所监听的文件描述符;
- begin和end方法主要是为了处理线程中断,将线程的中断转化为Selector的wakeup方法,避免线程堵塞在IO操作上;
从上面的源码可以知道,SelectionKey.cancel方法会将valid标志设为false,但不会立即将SelectionKey删除,而是在下一次select方法调用时进行处理;
register方法
register方法是在父类SelectorImpl中定义的:
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment){
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
//k.interestOps(ops)=>channel.translateAndSetInterestOps(ops, this)方法将ops转换为事件=>EPollSelectorImpl.putEventOps>=pollWrapper.setInterest(ch.getFDVal(), ops)
//绕来绕去,最终的逻辑还是在EPollArrayWrapper中
k.interestOps(ops);
return k;
}
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
fdToKey.put(fd, ski);
pollWrapper.add(fd);
keys.add(ski);
}
//SocketChannelImpl.java
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= PollArrayWrapper.POLLIN;//读事件
if ((ops & SelectionKey.OP_WRITE) != 0)
newOps |= PollArrayWrapper.POLLOUT;//写事件
if ((ops & SelectionKey.OP_CONNECT) != 0)
newOps |= PollArrayWrapper.POLLCONN;//连接事件
sk.selector.putEventOps(sk, newOps);
}
//EPollArrayWrapper.java
void setInterest(int fd, int mask) {
synchronized (updateLock) {
int oldCapacity = updateDescriptors.length;
if (updateCount == oldCapacity) {//数组满了,需要扩容
int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;//容量增加64
int[] newDescriptors = new int[newCapacity];//新建数组
System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);//将老数组元素挪到新数组
updateDescriptors = newDescriptors;
}
updateDescriptors[updateCount++] = fd;//记录发生变化的fd
//
byte b = (byte)mask;
assert (b == mask) && (b != KILLED);
setUpdateEvents(fd, b, false);//记录fd发生变化的事件
}
}
register方法将channel的文件描述符ID和SelectionKey添加到fdToKey,并调用pollWrapper.add方法和ke.interestOps(ops)方法添加更新事件,但直到调用pollWrapper的poll方法,才会真正进行处理,将channel注册到epoll;
EPollArrayWrapper
EPollArrayWrapper封装了底层的Epoll操作,Linux的Epoll接口定义请参考/usr/include/sys/epoll.h;
初始化
由于JDK是基于Linux kernel 2.4进行构建的,因此此处函数名称并不是写死的,而是采用dlsym系统调用从进程加载的so库中进行查找后,返回函数指针实现的;
epoll_create_func = (epoll_create_t) dlsym(RTLD_DEFAULT, "epoll_create");
epoll_ctl_func = (epoll_ctl_t) dlsym(RTLD_DEFAULT, "epoll_ctl");
epoll_wait_func = (epoll_wait_t) dlsym(RTLD_DEFAULT, "epoll_wait");
epoll_create
-
函数定义
int epoll_create(int size);
-
函数描述
打开epoll文件描述符,如果为-1,则说明出错了;从Linux 2.6.8开始,int参数被忽略,但必须传入大于0的数字。 -
JNI实现:
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
int epfd = (*epoll_create_func)(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
epoll_ctl
-
函数定义
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t; struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ };
-
函数说明
增加或者移除被epoll所监听的文件描述符;
-
op选项可以为下面三者之一:
- EPOLL_CTL_ADD(1),为fd注册event事件
- EPOLL_CTL_MOD(3),对fd的event事件进行修改
- EPOLL_CTL_DEL(2),删除fd已注册的event事件
-
events可以是以下几个宏的集合:
- EPOLLIN= 0x001:表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
- EPOLLPRI=0x002:表示对应的文件描述符有紧急的数据可读;
- EPOLLOUT=0x004:表示对应的文件描述符可以写;
- EPOLLERR=0x008:表示对应的文件描述符发生错误;
- EPOLLHUP=0x010:表示对应的文件描述符被挂断;
- EPOLLET=1<<31: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(LevelTriggered)来说的;
- EPOLLONESHOT=1<<30:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里;
- JNI实现
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
jint opcode, jint fd, jint events)
{
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
RESTARTABLE((*epoll_ctl_func)(epfd, (int)opcode, (int)fd, &event), res);
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
上面的RESTARTABLE实际上是个宏,主要是为了处理EINTR错误,此时需要重试;另外从上面的源码可以知道JDK并没有实现边缘触发,关于边缘触发和水平触发的差异简单列举如下,边缘触发的性能更高,但编程难度也更高,netty就重新实现了Epoll机制,采用边缘触发方式;另外像nginx等也采用的是边缘触发:
-
LT是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的。
-
ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核会通知你一次,并且除非你做了某些操作导致那个文件描述符不再为就绪状态了,否则不会再次发送通知。
可以看到,本来内核在被DMA中断,捕获到IO设备来数据后,只需要查找这个数据属于哪个文件描述符,进而通知线程里等待的函数即可,但是,LT要求内核在通知阶段还要继续再扫描一次刚才所建立的内核fd和io对应的那个数组,因为应用程序可能没有真正去读上次通知有数据后的那些fd,这种沟通方式效率是很低下的,只是方便编程而已;
epoll_wait
-
函数定义:
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
-
函数描述:
用来等待发生在监听描述符上的事件。它会一直阻塞直到事件发生,另外注意的是,由于epoll_wait同步等待,有可能被信号中断,此时需要重新进行读或写操作;
-
JNI实现
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
jlong address, jint numfds,
jlong timeout, jint epfd)
{
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) { /* Indefinite or no wait */
RESTARTABLE((*epoll_wait_func)(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}
RESTARTABLE是宏的名字,它和iepoll都是为了处理EINTR错误而对epoll_wait进行了封装;
如果进程在一个慢系统调用(slow system call)中阻塞时,当捕获到某个信号且相应信号处理函数返回时,这个系统调用被中断,调用返回错误,设置errno为EINTR
慢系统调用(slow system call):此术语适用于那些可能永远阻塞的系统调用。永远阻塞的系统调用是指调用有可能永远无法返回,多数网络支持函数都属于这一类。如:若没有客户连接到服务器上,那么服务器的accept调用就不会返回
网友评论