最近正好在看Java Concurrency In Practice(以下简称JCIP), 里面的很多思想都在rocketmq runtime的源码中有所体现,因此就尝试着从ServiceThread这个类出发,看看我能从这本书里悟到多少东西。
我首先要做的是猜ServiceThread是用来解决什么问题的。在我看来,ServiceThread主要有如下几个特性
- 当处于stopped 状态的时候,不执行任何任务
- 当不处于stopped状态的时候,用户调用
waitForRunning()
可以启动一段定时器,并阻塞一段时间,或者 - 使用
wakeup()
立即结束跑完当前的定时器,立即退出阻塞状态
一个典型的使用场景是某个Service 直接集成ServiceThread,run()方法重写如下
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
this.waitForRunning(10000);
this.rebalanceImpl.doRebalance();
}
log.info(this.getServiceName() + " service end");
}
这段代码就是告诉线程,每隔十秒钟跑一次rebalanceImpl.doRebalance()(当然这个函数根据业务不同而异)
然后其他地方可以显式调用wakeup(),在还没到十秒的时候后就执行rebalanceImpl.doRebalance(),比如当有worker变化的时候,我们要立刻触发一次doRebalance()
class WorkerStatusListenerImpl implements ClusterManagementService.WorkerStatusListener {
/**
* When alive workers change.
*/
@Override
public void onWorkerChange() {
log.info("Wake up rebalance service");
RebalanceService.this.wakeup();
}
}
ServiceThread 实现了Runnable接口,但是同时也持有一个Thread对象,至于为什么要这么设计我还不是很懂。
成员变量
ServiceThread的主要成员变量如下
protected final Thread thread;
protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
protected volatile boolean stopped = false;
private static final long JOIN_TIME = 90 * 1000;
-
Thread thread
: 用来跑ServiceThread的run()方法,不过我不知道这是不是一个标准的自定义的thread的实现(因为用到了Composition Over Inheritance 的理念,感觉应该是很标准的写法) -
CountDownLatch2 waitPoint
: 这个CountDownLatch2是RocketMQ Connect自己基于AQS实现的一个同步器,和CountDownLatch不同的是它支持reset(),也就是可以被反复利用。我们知道通常来说CyclicBarrier也能被反复利用,他们区别在哪里呢?一个是用来等待事件完成,一个是用来等待其他线程执行到某一步,等我们深入了解CountDownLatch的具体实现时再进行讲解。在ServiceThread里,这个waitPoint主要用来提供我们之前说的特性里的第2,3点:既能定时执行任务,又能通过其他显式调用方法“提前执行任务”,具体实现之后会讲到。 -
volatile AtomicBoolean hasNotified
: 这个变量是用来保证当用户调用waitForRunning()时,如果发现已经有线程调用了wakeup(),但是waitPoint还没有被释放的时候(count还不为0),可以立即退出(因为马上waitPoint就要被释放了)。这是由于,可能出于效率的考虑,wakeup()并不是atomic的,它会首先将hasNotified设置为true,告诉其他人,我马上就要把count减为0了(也就是释放这个CountDownLatch),然后再紧接着把count减为0。这两步并不是原子的。之后在讲核心方法的时候会更详细的解说。 -
volatile boolean stopped
: 这个比较简单,它不需要原子性的原因在于,stopped被设置成True之后是允许上层代码继续执行 waitForRuning(),也就是说具体停了之后还跑不跑waitForRunning()是由上层代码决定,stopped只是提供给上层一个“君子协议”一样的标志位:有人告诉我应该stop了,所以你最好check一下是不是stopped了,如果stopped了就不要再调用waitForRunning()了。stopped并不在waitForRunning()里起任何作用。
问题:如果有两个线程同时调用waitForRunning(),第三个线程调用wakeup(),那此时两个线程都被允许继续执行,那么这是expected behaviour嘛
核心方法
最重要的方法就如下所示:
protected void waitForRunning(long interval) {
// if hasNotified == True, means either wakeup() or stop()
// has been executed by another thread
if (hasNotified.compareAndSet(true, false)) {
this.onWaitEnd();
return;
}
//entry to wait
waitPoint.reset();
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
// TODO why we swallowed the exceptions here
} catch (InterruptedException e) {
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
}
那我们先看第一个CAS操作,首先如果hasNotified是false的话,意味着要么没有其他线程执行了waitForRunning(), 要么有其他线程执行了waitForRunnning()只是没有任何的wakeup()。那这时会继续执行到waitPoint.reset(),也就是即使之前有人已经调用过waitForRunning()了,我们现在也给它reset()掉,重新开始计时。如果是True的话,意味着已经有一个wakeup()或者stop()被执行了,于是此次调用把hasNotified设置为false,相当于清空标志位,下一次waitForRunning就可以正常进行了。这里有个值得注意的点在于,假设hasNotified是True,但是wakeup()里的waitPoint.countDown();
仍然没有被执行,所以理论上正在执行waitPoint.await(...)的前一个线程没有被释放,而立刻调用waitForRunning()的线程因为看到hasNotified是true之后,将hasNotified设置为false后就退出了。
一开始我会想,这可能是个不公平的实现,因为最开始调用waitForRunning()的线程需要等到waitPoint.await()才能执行,而当前因为提前return
而继续执行的线程之则完全不用经过waitPoint.await()。当然这样也不会有太大的差别,尤其是考虑到几乎不会有多个线程同时调用waitForRunning()的情况下()
这样做应该没有问题的,因为我们认为waitPoint.countDown()立即就会执行,所以最终waitPoint.await()也会很快被释放:要么因为timeout或者
问题:waitPoint.countDown() 以及 waitPoint.await()之间能保证不插入其他任何指令吗,也就是说,waitPoint.countDown()只要变成0之后,一定会让当时正在等待的waitPoint.await()得到释放吗?我觉得应该要这样才对,否则如果在waitPoint.countDown() 中有另外一个线程调用了waitPoint.reset(),那么await()就不会被立即释放了,之后可以去CountDownLatch2里仔细看看是不是有这样的保证。
public void wakeup() {
// TODO end the waiting before interval happens
if (hasNotified.compareAndSet(false, true)) {
waitPoint.countDown(); // notify
}
}
在wakeup()里,首先使用CAS将hasNotified设置为true,然后调用waitPoint.countDown(),这两步如前所述不是原子性的,但我们证明了即使这样写也没有什么问题。(至少我现在还看不出来)
一些其他的思考
注意这一段
try {
waitPoint.await(interval, TimeUnit.MILLISECONDS);
// TODO why we swallowed the exceptions here
} catch (InterruptedException e) {
} finally {
hasNotified.set(false);
this.onWaitEnd();
}
我们catch住了InterruptedException e但是却没有做任何处理(也就是swallow吞掉了这个Exception),按照JCIP里的说法,interrupt通常的语义是cancellation,只有最顶层的调用代码应该对InterruptedException做处理,而所有不拥有这个Thread的代码要么:继续抛出InterruptedException, 要么将interrupted标志位重新设置为true。
基于这个原则,我们就应该判断,ServiceThread是不是应该来swallow这个InterruptedException。通常情况下waitPoint.await() 会跑在调用者的线程内,而会调用waitForRunning()的线程就是ServiceThread所持有的那个Thread object
public ServiceThread() {
this.thread = new Thread(this, this.getServiceName());
}
而一般来说waitForRunning只会在这个Runnable的run()方法中被调用,因run()方法才是最上层的调用者,看起来说waitForRunning()不应该去swallow这个InterruptedException。当然这只是理论,实际上,我们知道即使这个InterruptedException被吞掉了也无伤大雅:如果被interrupt,那就当成被wakeup(),直接提前结束。
不过现在又有个问题了,什么情况下,waitPoint.await()会被Interrupt呢?是Crtl + C嘛?我们又要去看JVM的Interrupt是怎么传播的,以及为什么我按了Ctrl+C之后java程序就结束了。
当然这里还有一些publish相关的问题,如果waitForRunning只应该在run()中使用的话,是不是waitForRunning就不应该是public的呢?
网友评论