美文网首页
并发工具类-CountDownLatch(基于AQS共享模式)

并发工具类-CountDownLatch(基于AQS共享模式)

作者: 王侦 | 来源:发表于2019-07-15 16:43 被阅读0次

1.使用示例

public class UseCountDownLatch {
    static CountDownLatch latch = new CountDownLatch(6);

    //初始化线程(只有一步,有4个)
    private static class InitThread implements Runnable{

        @Override
        public void run() {
            System.out.println("Thread_"+Thread.currentThread().getId()
                    +" ready init work......");
            latch.countDown();//初始化线程完成工作了,countDown方法只扣减一次;
            for(int i =0;i<2;i++) {
                System.out.println("Thread_"+Thread.currentThread().getId()
                        +" ........continue do its work");
            }
        }
    }

    //业务线程
    private static class BusiThread implements Runnable{

        @Override
        public void run() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            for(int i =0;i<3;i++) {
                System.out.println("BusiThread_"+Thread.currentThread().getId()
                        +" do business-----");
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        //单独的初始化线程,初始化分为2步,需要扣减两次
        new Thread(new Runnable() {
            @Override
            public void run() {
                SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId()
                        +" ready init work step 1st......");
                latch.countDown();//每完成一步初始化工作,扣减一次
                System.out.println("begin step 2nd.......");
                SleepTools.ms(1);
                System.out.println("Thread_"+Thread.currentThread().getId()
                        +" ready init work step 2nd......");
                latch.countDown();//每完成一步初始化工作,扣减一次
            }
        }).start();
        new Thread(new BusiThread()).start();
        for(int i=0;i< 4;i++){
            Thread thread = new Thread(new InitThread());
            thread.start();
        }

        latch.await();
        System.out.println("Main do ites work........");
    }
}

结果:

Thread_16 ready init work......
Thread_17 ready init work......
Thread_16 ........continue do its work
Thread_16 ........continue do its work
Thread_17 ........continue do its work
Thread_17 ........continue do its work
Thread_15 ready init work......
Thread_18 ready init work......
Thread_18 ........continue do its work
Thread_18 ........continue do its work
Thread_15 ........continue do its work
Thread_15 ........continue do its work
Thread_13 ready init work step 1st......
begin step 2nd.......
Thread_13 ready init work step 2nd......
BusiThread_14 do business-----
BusiThread_14 do business-----
BusiThread_14 do business-----
Main do ites work........

结果分析:BusiThread()和Main线程都调用了latch.await(),因此这两个线程要等四个InitThread()和main方法中的初始化线程完成所有初始化工作之后,才能从await处继续向后执行。

2.官方文档

A synchronization aid that allows one or more threads to wait until 
a set of operations being performed in other threads completes.

A CountDownLatch is initialized with a given count. The await 
methods block until the current count reaches zero due to 
invocations of the countDown() method, after which all waiting 
threads are released and any subsequent invocations of await 
return immediately. This is a one-shot phenomenon -- the count 
cannot be reset. If you need a version that resets the count, 
consider using a CyclicBarrier.

A CountDownLatch is a versatile synchronization tool and can be 
used for a number of purposes. A CountDownLatch initialized with 
a count of one serves as a simple on/off latch, or gate: all threads 
invoking await wait at the gate until it is opened by a thread 
invoking countDown(). A CountDownLatch initialized to N can be 
used to make one thread wait until N threads have completed 
some action, or some action has been completed N times.

A useful property of a CountDownLatch is that it doesn't require 
that threads calling countDown wait for the count to reach zero 
before proceeding, it simply prevents any thread from proceeding 
past an await until all threads could pass.

一种同步辅助工具,允许多个线程等待另外一些线程执行一组操作完成。

使用给定的count初始化CountDownLatch,await方法会阻塞直到count变为0(通过调用countDown()可以让count减小),此时所有等待线程将会被释放并且任何后续的await调用将立即返回。只能调用一次,count不能被重置。如果希望可以重置count,考虑使用CyclicBarrier。

初始化为N的CountDownLatch可使一个线程等待直到其他N个线程完成某个操作,或者某个操作完成N次。

不要求调用countDown的线程等待直到count变为0。

3.构造器和相关属性

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

4.await和countDown方法

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared是由CountDownLatch.Sync实现的。tryAcquireShared,如果getState()不为0,也即count不为0,则获取锁失败,此时线程将加入同步队列并阻塞。如果count已经为0,则直接获取成功。

这里不会对count进行任何更新操作,只是判断是否为0.

    public void countDown() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared是由CountDownLatch.Sync实现的:

  • 如果count已经为0,则直接返回false,表明此时已经不用再调用countDown()
  • 否则将count减一,如果count减为0,则返回true。返回true时,会调用doReleaseShared(),此时会唤醒所有等待在同步队列中的线程。
    /**
     * Release action for shared mode -- signals successor and ensures
     * propagation. (Note: For exclusive mode, release just amounts
     * to calling unparkSuccessor of head if it needs signal.)
     */
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

相关文章

网友评论

      本文标题:并发工具类-CountDownLatch(基于AQS共享模式)

      本文链接:https://www.haomeiwen.com/subject/hmvnkctx.html