nio实现
我们知道使用java nio时会使用以下代码:
//获取selector
Selector selector = Selector.open();
selector.select(1000);
//获取就绪状态的channel的key
Set<SelectionKey> selectionKeys = selector.selectedKeys();
open()的实现为:
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
这是使用了SelectorProvider去创建一个Selector,看下SelectorProvider的默认实例:
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
public class DefaultSelectorProvider {
public static SelectorProvider create() {
String osname = AccessController.doPrivileged(new GetPropertyAction("os.name"));
if ("SunOS".equals(osname)) {
return new sun.nio.ch.DevPollSelectorProvider();
}
// use EPollSelectorProvider for Linux kernels >= 2.6
if ("Linux".equals(osname)) {
String osversion = AccessController.doPrivileged(
new GetPropertyAction("os.version"));
String[] vers = osversion.split("\\.", 0);
if (vers.length >= 2) {
try {
int major = Integer.parseInt(vers[0]);
int minor = Integer.parseInt(vers[1]);
if (major > 2 || (major == 2 && minor >= 6)) {
return new sun.nio.ch.EPollSelectorProvider();
}
} catch (NumberFormatException x) {
// format not recognized
}
}
}
return new sun.nio.ch.PollSelectorProvider();
}
}
从以上可知:对于Linux,返回的Provder是EPollSelectorProvider,其余操作系统,返回的是PollSelectorProvider。
继续看下EPollSelectorProvider
public class EPollSelectorProvider extends SelectorProviderImpl
{
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
public Channel inheritedChannel() throws IOException {
return InheritedChannel.getChannel();
}
}
EPollSelectorImpl的实现为:
class EPollSelectorImpl extends SelectorImpl{
// The poll object
EPollArrayWrapper pollWrapper;
// Maps from file descriptors to keys
private Map<Integer,SelectionKeyImpl> fdToKey;
/**
* Package private constructor called by factory method in
* the abstract superclass Selector.
*/
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
其中doSelect 方法会在selector.select执行时执行,doSelect方法中最重要的是
pollWrapper.poll(timeout);
关于EPollArrayWrapper:
* The system call to wait for I/O events is epoll_wait(2). It populates an
* array of epoll_event structures that are passed to the call. The data
* member of the epoll_event structure contains the same data as was set
* when the file descriptor was registered to epoll via epoll_ctl(2). In
* this implementation we set data.fd to be the file descriptor that we
* register. That way, we have the file descriptor available when we
* process the events.
EPollArrayWrapper() throws IOException {
// creates the epoll file descriptor
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
int poll(long timeout) throws IOException {
updateRegistrations();
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
private native int epollCreate();
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout,
int epfd) throws IOException;
首先调用epollCtl系统调用,更新fd到epoll实例,然后调用epollWait系统调用,线程在此处阻塞,超时或有fd就绪时会被唤醒,返回值是一个fd的集合,0表示无就绪时间,-1表示report error and abort,否则遍历并处理fd。
EPollArrayWrapper的三个native方法的实现代码可参阅openjdk7/jdk/src/solaris/native/sun/nio/ch/ EPollArrayWrapper.c,可看到这三个native方法正是对上述epoll系列系统调用的包装。
自己实现一个Native方法(linux版) - CSDN博客
#include "jni.h"
#include "jni_util.h"
#include "jvm.h"
#include "jlong.h"
#include <unistd.h>
#include <sys/time.h>
#include <sys/epoll.h>
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
/*
* epoll_create expects a size as a hint to the kernel about how to
* dimension internal structures. We can't predict the size in advance.
*/
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
jint opcode, jint fd, jint events)
{
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
}
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
jlong address, jint numfds,
jlong timeout, jint epfd)
{
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) { /* Indefinite or no wait */
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}
epoll原理
JDK 1.7 NIO Selector在linux平台上的实现类是sun.nio.ch.EPollSelectorImpl,这个类通过linux下的epoll系列系统调用实现NIO。epoll是poll/select系统调用的一个改进版本,能以更高的性能实现IO事件的检测和分发(主要归功于epoll的事件回调机制,下文详述),主要包含以下3个系统调用:
#include
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
epoll_create
创建epoll实例,会创建所需要的红黑树,以及就绪链表,以及代表epoll实例的文件句柄。
在epoll早期的实现中,对于监控文件描述符的组织并不是使用红黑树,而是hash表。目前已经可以动态调整,但是为了兼容老的版本,所以仍然保留,这个size没有意义。
epoll_ctl
可以操作epoll_create创建的epoll,如将socket句柄加入到epoll中让其监控,或把epoll正在监控的某个socket句柄移出epoll。
添加,修改,或者删除 注册到epoll实例中的文件描述符上的监控事件
epoll_wait
在调用时,在给定的timeout时间内,所监控的句柄中有事件发生时,就返回用户态的进程。
当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关。eventpoll结构体如下所示:
struct eventpoll{
....
/*等待队列:epoll正是通过此等待队列实现的事件回调*/
wait_queue_head_t wq;
/*红黑树的根节点:这颗树中存储着所有添加到epoll中的需要监控的事件(用于保存已注册过的文件描述符)*/
struct rb_root rbr;
/*双链表:存放着将要通过epoll_wait返回给用户的满足条件的事件(保存了已就绪的文件描述符)*/
struct list_head rdlist;
....
};
epoll数据结构.jpg
在epoll中,对于每一个事件,都会建立一个epitem结构体,如下所示:
struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}
当调用epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可。如果rdlist不为空,则把发生的事件复制到用户态,同时将事件数量返回给用户。
具体实现参考源码:linux/eventpoll.c at master · torvalds/linux · GitHub
要深刻理解epoll,首先得了解epoll的三大关键要素:mmap、红黑树、链表。
epoll是通过内核与用户空间mmap同一块内存实现的。mmap将用户空间的一块地址和内核空间的一块地址同时映射到相同的一块物理内存地址(不管是用户空间还是内核空间都是虚拟地址,最终要通过地址映射映射到物理地址),使得这块物理内存对内核和对用户均可见,减少用户态和内核态之间的数据交换。内核可以直接看到epoll监听的句柄,效率高。
红黑树将存储epoll所监听的套接字。上面mmap出来的内存如何保存epoll所监听的套接字,必然也得有一套数据结构,epoll在实现上采用红黑树去存储所有套接字,当添加或者删除一个套接字时(epoll_ctl),都在红黑树上去处理,红黑树本身插入和删除性能比较好,时间复杂度O(logN)。
通过epoll_ctl函数添加进来的事件都会被放在红黑树的某个节点内,所以,重复添加是没有用的。当把事件添加进来的时候时候会完成关键的一步,那就是该事件都会与相应的设备(网卡)驱动程序建立回调关系,当相应的事件发生后,就会调用这个回调函数(ep_poll_callback),这个回调函数其实就所把这个事件添加到rdllist这个双向链表中。一旦有事件发生,epoll就会将该事件添加到双向链表中。那么当我们调用epoll_wait时,epoll_wait只需要检查rdlist双向链表中是否有存在注册的事件,效率非常可观。
综上所述,epoll系统调用通过等待队列,其事件检测(epoll_wait系统调用)的时间复杂度为O(n),其中n是“活跃”的文件描述符总数,所谓的活跃,是指在该文件描述符上有频繁的读写操作,而对比poll或select系统调用(其实现代码在fs/select.c中),其时间复杂度也是O(n),但这个n却是注册的文件描述符的总数。因此,当活跃的文件描述符占总的文件描述符比例较小时,例如,在长连接服务器的场景中,虽然同时可能需要维持数十万条长连接,但其中只有少数的连接是活跃的,使用epoll就比较合适。
poll&&epoll实现分析(二)——epoll实现-lvyilong316-ChinaUnix博客
Linux下的I/O复用与epoll详解 - junren - 博客园
linux epoll机制 - CSDN博客
总结
Linux2.6之后支持epoll
windows支持select而不支持epoll
不同系统下nio的实现是不一样的,包括Sunos linux 和windows
select的复杂度为O(N)
select有最大fd限制,默认为1024
修改sys/select.h可以改变select的fd数量限制
epoll的事件模型,无fd数量限制,复杂度O(1),不需要遍历fd
参考:
Java NIO之EPollSelectorImpl详解 – hellojavacases微信公众号网站
深入浅出NIO之Selector实现原理 - 简书
linux epoll 源码浅析(尾篇)码农少华新浪博客
网友评论