美文网首页
RocketMQ Runtime ServiceThread的设

RocketMQ Runtime ServiceThread的设

作者: affe | 来源:发表于2020-04-19 08:59 被阅读0次

最近正好在看Java Concurrency In Practice(以下简称JCIP), 里面的很多思想都在rocketmq runtime的源码中有所体现,因此就尝试着从ServiceThread这个类出发,看看我能从这本书里悟到多少东西。

我首先要做的是猜ServiceThread是用来解决什么问题的。在我看来,ServiceThread主要有如下几个特性

  • 当处于stopped 状态的时候,不执行任何任务
  • 当不处于stopped状态的时候,用户调用waitForRunning()可以启动一段定时器,并阻塞一段时间,或者
  • 使用wakeup()立即结束跑完当前的定时器,立即退出阻塞状态

一个典型的使用场景是某个Service 直接集成ServiceThread,run()方法重写如下

    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            this.waitForRunning(10000);
             this.rebalanceImpl.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

这段代码就是告诉线程,每隔十秒钟跑一次rebalanceImpl.doRebalance()(当然这个函数根据业务不同而异)
然后其他地方可以显式调用wakeup(),在还没到十秒的时候后就执行rebalanceImpl.doRebalance(),比如当有worker变化的时候,我们要立刻触发一次doRebalance()

    class WorkerStatusListenerImpl implements ClusterManagementService.WorkerStatusListener {

        /**
         * When alive workers change.
         */
        @Override
        public void onWorkerChange() {
            log.info("Wake up rebalance service");
            RebalanceService.this.wakeup();
        }
    }

ServiceThread 实现了Runnable接口,但是同时也持有一个Thread对象,至于为什么要这么设计我还不是很懂。

成员变量

ServiceThread的主要成员变量如下

    protected final Thread thread;
    protected final CountDownLatch2 waitPoint = new CountDownLatch2(1);
    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
    protected volatile boolean stopped = false;
    private static final long JOIN_TIME = 90 * 1000;
  • Thread thread: 用来跑ServiceThread的run()方法,不过我不知道这是不是一个标准的自定义的thread的实现(因为用到了Composition Over Inheritance 的理念,感觉应该是很标准的写法)

  • CountDownLatch2 waitPoint: 这个CountDownLatch2是RocketMQ Connect自己基于AQS实现的一个同步器,和CountDownLatch不同的是它支持reset(),也就是可以被反复利用。我们知道通常来说CyclicBarrier也能被反复利用,他们区别在哪里呢?一个是用来等待事件完成,一个是用来等待其他线程执行到某一步,等我们深入了解CountDownLatch的具体实现时再进行讲解。在ServiceThread里,这个waitPoint主要用来提供我们之前说的特性里的第2,3点:既能定时执行任务,又能通过其他显式调用方法“提前执行任务”,具体实现之后会讲到。

  • volatile AtomicBoolean hasNotified: 这个变量是用来保证当用户调用waitForRunning()时,如果发现已经有线程调用了wakeup(),但是waitPoint还没有被释放的时候(count还不为0),可以立即退出(因为马上waitPoint就要被释放了)。这是由于,可能出于效率的考虑,wakeup()并不是atomic的,它会首先将hasNotified设置为true,告诉其他人,我马上就要把count减为0了(也就是释放这个CountDownLatch),然后再紧接着把count减为0。这两步并不是原子的。之后在讲核心方法的时候会更详细的解说。

  • volatile boolean stopped: 这个比较简单,它不需要原子性的原因在于,stopped被设置成True之后是允许上层代码继续执行 waitForRuning(),也就是说具体停了之后还跑不跑waitForRunning()是由上层代码决定,stopped只是提供给上层一个“君子协议”一样的标志位:有人告诉我应该stop了,所以你最好check一下是不是stopped了,如果stopped了就不要再调用waitForRunning()了。stopped并不在waitForRunning()里起任何作用。

问题:如果有两个线程同时调用waitForRunning(),第三个线程调用wakeup(),那此时两个线程都被允许继续执行,那么这是expected behaviour嘛

核心方法

最重要的方法就如下所示:

    protected void waitForRunning(long interval) {
        // if hasNotified == True, means either wakeup() or stop() 
        // has been executed by another thread
        if (hasNotified.compareAndSet(true, false)) {
            this.onWaitEnd();
            return;
        }

        //entry to wait
        waitPoint.reset();

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
            // TODO why we swallowed the exceptions here
        } catch (InterruptedException e) {
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }
    }

那我们先看第一个CAS操作,首先如果hasNotified是false的话,意味着要么没有其他线程执行了waitForRunning(), 要么有其他线程执行了waitForRunnning()只是没有任何的wakeup()。那这时会继续执行到waitPoint.reset(),也就是即使之前有人已经调用过waitForRunning()了,我们现在也给它reset()掉,重新开始计时。如果是True的话,意味着已经有一个wakeup()或者stop()被执行了,于是此次调用把hasNotified设置为false,相当于清空标志位,下一次waitForRunning就可以正常进行了。这里有个值得注意的点在于,假设hasNotified是True,但是wakeup()里的waitPoint.countDown();仍然没有被执行,所以理论上正在执行waitPoint.await(...)的前一个线程没有被释放,而立刻调用waitForRunning()的线程因为看到hasNotified是true之后,将hasNotified设置为false后就退出了。

一开始我会想,这可能是个不公平的实现,因为最开始调用waitForRunning()的线程需要等到waitPoint.await()才能执行,而当前因为提前return而继续执行的线程之则完全不用经过waitPoint.await()。当然这样也不会有太大的差别,尤其是考虑到几乎不会有多个线程同时调用waitForRunning()的情况下()

这样做应该没有问题的,因为我们认为waitPoint.countDown()立即就会执行,所以最终waitPoint.await()也会很快被释放:要么因为timeout或者

问题:waitPoint.countDown() 以及 waitPoint.await()之间能保证不插入其他任何指令吗,也就是说,waitPoint.countDown()只要变成0之后,一定会让当时正在等待的waitPoint.await()得到释放吗?我觉得应该要这样才对,否则如果在waitPoint.countDown() 中有另外一个线程调用了waitPoint.reset(),那么await()就不会被立即释放了,之后可以去CountDownLatch2里仔细看看是不是有这样的保证。

    public void wakeup() {
        // TODO end the waiting before interval happens
        if (hasNotified.compareAndSet(false, true)) {
            waitPoint.countDown(); // notify
        }
    }

在wakeup()里,首先使用CAS将hasNotified设置为true,然后调用waitPoint.countDown(),这两步如前所述不是原子性的,但我们证明了即使这样写也没有什么问题。(至少我现在还看不出来)

一些其他的思考

注意这一段

        try {
            waitPoint.await(interval, TimeUnit.MILLISECONDS);
            // TODO why we swallowed the exceptions here
        } catch (InterruptedException e) {
        } finally {
            hasNotified.set(false);
            this.onWaitEnd();
        }

我们catch住了InterruptedException e但是却没有做任何处理(也就是swallow吞掉了这个Exception),按照JCIP里的说法,interrupt通常的语义是cancellation,只有最顶层的调用代码应该对InterruptedException做处理,而所有不拥有这个Thread的代码要么:继续抛出InterruptedException, 要么将interrupted标志位重新设置为true。

基于这个原则,我们就应该判断,ServiceThread是不是应该来swallow这个InterruptedException。通常情况下waitPoint.await() 会跑在调用者的线程内,而会调用waitForRunning()的线程就是ServiceThread所持有的那个Thread object

    public ServiceThread() {
        this.thread = new Thread(this, this.getServiceName());
    }

而一般来说waitForRunning只会在这个Runnable的run()方法中被调用,因run()方法才是最上层的调用者,看起来说waitForRunning()不应该去swallow这个InterruptedException。当然这只是理论,实际上,我们知道即使这个InterruptedException被吞掉了也无伤大雅:如果被interrupt,那就当成被wakeup(),直接提前结束。

不过现在又有个问题了,什么情况下,waitPoint.await()会被Interrupt呢?是Crtl + C嘛?我们又要去看JVM的Interrupt是怎么传播的,以及为什么我按了Ctrl+C之后java程序就结束了。

当然这里还有一些publish相关的问题,如果waitForRunning只应该在run()中使用的话,是不是waitForRunning就不应该是public的呢?

相关文章

网友评论

      本文标题:RocketMQ Runtime ServiceThread的设

      本文链接:https://www.haomeiwen.com/subject/lbbevhtx.html