解释下Channel中accept,read,write中的begin/end函数做了什么
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (!open)
return;
open = false;
interrupted = target;
try {
//这里关闭了当前的channel
AbstractInterruptibleChannel.this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);//如果当前线程被中断,则优雅的关闭channel
}
protected final void end(boolean completed)
throws AsynchronousCloseException
{
blockedOn(null);
Thread interrupted = this.interrupted;
//如果当前线程被中断,则抛出中断关闭异常,即当前线程被其他线程调用interrupt中断
if (interrupted != null && interrupted == Thread.currentThread()) {
interrupted = null;
throw new ClosedByInterruptException();
}
//如果通讯未完成且channel已经关闭,抛出异步关闭异常,即当前channel被其他线程调用close的时候
if (!completed && !open)
throw new AsynchronousCloseException();
}
单步跟踪blockedOn()函数可以得到
其实等价于
Thread.currentThread().blockedOn(interruptor);
由于blockedOn这个函数是protected的,所以需要绕一大圈
private volatile Interruptible blocker;
void blockedOn(Interruptible b) {
synchronized (blockerLock) {
blocker = b;
}
}
这个blocker又干了啥呢,搜索Thead类全文:
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);//看到了吗,当线程执行interrupt的时候就会去执行上面的interruptor类了
return;
}
}
interrupt0();
}
所以总结下begin的作用是,给当前的thread设置一个blocker使之可以在线程被中断时优雅的关闭channel,并且最后检测一遍currentThread是否被中断
end的作用是取消begin设置的blocker,并对线程中断/channel关闭等状态抛出相应异常
同样在SelectorImpl中也有begin/end对
protected int doSelect(long timeout)
throws IOException
{
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
...
}
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread ignore) {
AbstractSelector.this.wakeup();//这里是唤醒当前selector
}};
}
AbstractInterruptibleChannel.blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
end就不贴了,是一样的
wakeup做了什么?
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {//interruptTriggered默认是false的,当close的时候置为true(具体看代码)
pollWrapper.interrupt();//这里其实调用了native的interrupt(outgoingInterruptFD)方法
//这里中断了write的fd
interruptTriggered = true;
}
}
return this;
}
具体看pollWrapper.interrupt()怎么作用的
EPollSelectorImpl(SelectorProvider sp) {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
pollWrapper = new EPollArrayWrapper();
//其实在selector构造函数的时候就已经初始化interrupt
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<Integer,SelectionKeyImpl>();
}
EPollArrayWrapper:
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;//写通道
incomingInterruptFD = fd0;//读通道
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);//添加fd0兴趣事件为可读(EPOLLIN)
}
查看EPollArrayWrapper.c
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)
{
int fakebuf[1];
fakebuf[0] = 1;
//可以看到这里往fd1写入了一个字节,使写通道fd0变成readable状态,selector因为有事件就绪而中止阻塞(上面的epollCtl已经注册了fd0的read事件)
if (write(fd, fakebuf, 1) < 0) {
JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");
}
}
总结selectorImpl的begin同样也设置了一个blocker,
当this.selector.select()阻塞时,要怎么中断呢,
selector在初始化的时候会先给自己分配2个用于中断的fd,一个读,一个写,然后给读fd注册一个感兴趣的读事件,这样在中断时只要给写fd写入一个字节,那么读事件就会感知到,使得selector被唤醒。
网友评论