美文网首页
CountDownLatch与CyclicBarrier

CountDownLatch与CyclicBarrier

作者: HannahLi_9f1c | 来源:发表于2020-05-13 15:08 被阅读0次

    CountDownLatch

    简介

    CountDownLatch使用AQS同步框架实现了多线程计时器。主线程等待所有线程完成任务之后,主线程再进行下一步动作。比如说吃鸡游戏要等待队友准备好后可以开始玩游戏,马拉松比赛等所有人跑完之后开始排名,都可以用CountDownLatch实现

    使用方式
    1. 用构造器初始化计时器初始值
    2. countDown()函数将当前计数返回,表示当前线程已完成
    3. await()用在主线程中等待所有线程返回。然后进行下一步动作
    CountDownLatch实现马拉松比赛
    1. 代码
    public class CountDownLatchDemo {
        public static Map<Long, Long> map = new TreeMap<Long, Long>();
        public static final int RUNNER = 10;
        private static class Runner implements Runnable{
            CountDownLatch countDownLatch;
            public Runner(CountDownLatch countDownLatch) {
                this.countDownLatch = countDownLatch;
            }
            @Override
            public void run() {
                long id = Thread.currentThread().getId();
                System.out.println("运动员"+id+"开始");
                long start = System.currentTimeMillis();
                int count = 0;
                for(int i = 0; i < 100000; i++) {
                    count++;
                }
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                long end = System.currentTimeMillis();
                System.out.println("运动员"+id+"用时"+(end-start));
                countDownLatch.countDown();
                map.put(id, start-end);
            }
        }
        public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(RUNNER);
            for(int i = 0; i < RUNNER; i++) {
                new Thread(new Runner(countDownLatch)).start();
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("比赛结束,开始排名");
            int count = 0;
            for(long id:map.keySet()) {
                count++;
                System.out.println("第"+count+"名:运动员"+id+"用时"+map.get(id));
            }
        }
    }
    
    
    1. 运行结果


      image.png
    实现原理

    CountDownLatch在内部构造一个静态sync类,sync类实现了AQS接口,AQS是由共享资源和同步双向队列组成,使用了模板方式。在使用AQS时实现独占锁时需要重写tryAcquire和tryRealse方法,在使用AQS实现共享锁时需要重写tryReleaseShared和tryAcquireShared。

    1. sync锁重写的tryAcquireShared,state值会在new CountDownLatch时初始化,然后通过countDown()方法将state值减一。tryAcquireShared()方法表示state值为0时返回1,否则返回-1.
       protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
    1. sync重写的tryReleaseShared方法,就是不断循环通过CAS将把state值减一,state值为0时返回true。
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    
    1. countDown方法做了什么?
    • 通过sync调用父类的releaseShared方法
        public void countDown() {
            sync.releaseShared(1);
        }
    
    • tryReleaseShared是子类重写的,将state值减一,减为0时会调用doRealaseShare方法唤醒同步队列中等待的线程。
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    • doRealeaseShared要保证可以并发释放锁,所以使用了CAS改变状态。从头结点中依次寻找等待中的结点并唤醒。
     private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    
    1. await()方法内部原理
    • 调用AQS中acquireSharedInterruptibly方法,表示可以响应中断。
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
    • tryAcquireShared判断state是否为0,为0返回1,方法返回。否则执行doAcquireSharedInterruptibly方法。
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    
    • 通过addWaiter将共享结点入队,先CAS直接插入尾部,失败的话自旋CAS保证能够入队成功。然后获取前继结点,如果前继为head并且state值为0,将当前结点设为头结点,然后await方法可以返回。shouldParkAfterFailedAcquire是用来将前继结点标记为SIGNAL。
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    实现旅游发签证
    1. 代码
    public class CyclicBarrierDemo {
        public static void main(String [] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new TourGuideTask());
            Executor executor = Executors.newFixedThreadPool(3);
            //登哥最大牌,到的最晚
            executor.execute(new TravelArrive(cyclicBarrier,222,5));
            executor.execute(new TravelArrive(cyclicBarrier,333,3));
            executor.execute(new TravelArrive(cyclicBarrier,444,1));
        }
    
    }
    class TourGuideTask implements Runnable{
    
        @Override
        public void run() {
            System.out.println("分发护照");
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    class TravelArrive implements Runnable{
        private CyclicBarrier cyclicBarrier;
        private int time;
        private int no;
        public TravelArrive(CyclicBarrier cyclicBarrier, int no, int time) {
            this.cyclicBarrier = cyclicBarrier;
            this.time = time;
            this.no = no;
        }
        @Override
        public void run() {
            try {
                Thread.sleep(time*100);
                System.out.println("游客"+no+"已到达");
                cyclicBarrier.await();
                System.out.println("游客"+no+"开始旅行");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }
    
        }
    }
    
    

    CyclicBarrier

    简介

    CyclicBarrier也是基于AQS实现的,但是可以复用计数器,如同栅栏一般。与CountDownLatch不同的是,CyclicBarrier循环使用计数器。在旅游时,导游会先等待游客到达之后统一发送护照和签证,这个场景就可以用CyclicBarrier实现

    1. 运行结果


      image.png

    相关文章

      网友评论

          本文标题:CountDownLatch与CyclicBarrier

          本文链接:https://www.haomeiwen.com/subject/qrvnnhtx.html