多线程之并发类CountDownLatch、CyclicBarr

作者: 爱打乒乓的程序员 | 来源:发表于2019-04-18 21:51 被阅读62次

CountDownLatch

并发类CountDownLatch是什么?

CountDownLatch:一个或多个线程等待其他线程完成操作。

什么情况下使用?

某一个动作需要等待其它线程完成后才会触发。
举个栗子,一个班上50个人,考完试之后需要计算全班同学的总成绩,这种情况使用CountDownLatch并发类就最合适了,每一个人的成绩等于一个线程,需要等待50个线程执行完之后才能执行最后一个动作(计算总成绩)。

如何使用并发类CountDownLatch?

代码示例:

public class CountDownLatchDemo implements Runnable {

    private static final int threadCount = 3;

    private static CountDownLatch countDownLatch = new CountDownLatch(threadCount);

    private static double sum = 0D;

    public String name;

    public CountDownLatchDemo() {
    }

    public CountDownLatchDemo(String name) {
        this.name = name;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        executorService.execute(new CountDownLatchDemo("A同学"));
        executorService.execute(new CountDownLatchDemo("B同学"));
        executorService.execute(new CountDownLatchDemo("C同学"));
        countDownLatch.await();
        System.out.println("所有人的总成绩 " + sum);
    }

    @Override
    public void run() {
        double grade = Math.round(Math.random() * 100);
        System.out.println("计算 " + name + " 成绩,成绩是" + grade);
        sum += grade;
        countDownLatch.countDown();
    }
}

输出结果:

计算 C同学 成绩,成绩是26.0
计算 B同学 成绩,成绩是18.0
计算 A同学 成绩,成绩是53.0
所有人的总成绩 97.0

使用该并发类并不难,熟悉几个主要方法就可以。

  • CountDownLatch(int count): 参数count为计数值
  • await(): 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
  • await(long timeout,TimeUnit unit): 和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
  • countDown(): 将count值减1
  • getCount():获取count值

好了,知道大概怎么使用这个并发类,接下来开始撸源码!

public class CountDownLatch {
    //内部使用Sync继承AQS
    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            //设置计算值
            setState(count);
        }

        //获取计算值
        int getCount() {
            return getState();
        }

        //尝试获取共享锁
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        //尝试释放锁
        protected boolean tryReleaseShared(int releases) {
            for (; ; ) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

    private final Sync sync;

    //CountDownLatch唯一的构造器,参数count值为阻塞线程数
    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

    //阻塞线程,直到count值为0
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    //限定阻塞时间,超出限定时间后,count值还没为0都会执行之后的线程
    public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    //释放锁操作,每调用一次,count值减一
    public void countDown() {
        sync.releaseShared(1);
    }

    //获取count值
    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}

大概流程就是,先调用CountDownLatch构造器设置计算值(count),当主线程(main方法)执行到await()方法的时候就会阻塞主线程,然后执行线程池内的任务,通过执行countDown()方法,将count值不断的减1,直到count值为0的时候就会释放锁,继续执行主线程。

CyclicBarrier

CyclicBarrier:一组线程相互等待,达到一个共同点再继续执行。
比较困惑的一点是,CyclicBarrier和CountDownLatch两个并发类有什么区别?从概念上和例子上来看,貌似都差不多呀。
参考多篇大佬博客,总结出以下个人观点:

源码上区别:
CountDownLatch基于AQS实现的;设置了等待线程数后无法重置;
CyclicBarrier基于Condition和ReentrantLock锁实现;设置等待线程数后可以调用reset()重置;

概念上区别:
CountDownLatch:一个或多个线程等待其他线程执行完成后再执行。(关键词:“其他线程执行完成后”)

例子:线程S需要等待线程A、B、C都执行完成后才会执行。


CyclicBarrier:多个线程相互等待,到达阻塞点(屏障)后等待其他线程,直到所有线程都达到阻塞点后,阻塞解除并唤醒所有等待线程,所有的线程都继续往下执行。(关键词:“阻塞点”)

举个栗子:小A、小B,小C三位朋友约定在广州塔见面之后去喝早茶,分别从三处不同的地方赶往广州塔碰面。但由于小C迟到,小A,小B等待小C,直到小C都到达广州塔后再一同去喝早茶。使用并发类CyclicBarrier解释就是,线程A,线程B被阻塞,直到线程C执行到阻塞点后阻塞解除,唤醒所有线程。线程A、B、C都继续往下执行。


代码示例:

public class CyclicBarrierDemo implements Runnable {

    public static int count = 3;

    public String name;

    public CyclicBarrierDemo(String name) {
        this.name = name;
    }

    public CyclicBarrierDemo() {
    }

    public static CyclicBarrier cyclicBarrier = new CyclicBarrier(count, new Runnable() {
        @Override
        public void run() {
            System.out.println("大家启程去茶楼,喝茶去噜...");
        }
    });

    public static void main(String[] args) {
        //创建线程池  
        ExecutorService executor = Executors.newFixedThreadPool(count);
        try {
            //执行线程  
            executor.execute(new CyclicBarrierDemo("小A"));
            executor.execute(new CyclicBarrierDemo("小B"));
            executor.execute(new CyclicBarrierDemo("小C"));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }

    @Override
    public void run() {
        try {
            if ("小C".equals(name)) {
                System.out.println(name + "路上塞车");
                Thread.sleep(1000);
            }
            System.out.println(this.name + "到达广州塔... ");
            cyclicBarrier.await();
            System.out.println(this.name + "到达茶楼 ");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

小B到达广州塔... 
小C路上塞车
小A到达广州塔... 
小C到达广州塔... 
大家启程去茶楼,喝茶去噜...
小C到达茶楼 
小B到达茶楼 
小A到达茶楼 

接下来,膜拜Doug Lea写的源码

public class CyclicBarrier {
    //Generation是CyclicBarrier的一个私有内部类,只有一个成员变量broken来标识当前的barrier是否已“损坏”:
    private static class Generation {
        boolean broken = false;
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition(); //通过lock得到的一个状态变量
    private final int parties;  //表示总的等待线程的数量。
    private final Runnable barrierCommand;  //当屏障(barrier)撤销时,需要执行的操作
    private Generation generation = new Generation();  //当前的Generation。每当屏障失效或者开闸之后都会自动替换掉。从而实现重置的功能。

    private int count;//实际中仍在等待的线程数,每当有一个线程到达屏障点,count值就会减一;当一次新的运算开始后,count的值被重置为parties。

    //唤醒等待线程,设置下一个Generation
    private void nextGeneration() {
        // 唤醒所有等待线程
        trip.signalAll();
        // 重置进入屏障的线程数量
        count = parties;
        // 新生一代
        generation = new Generation();
    }

    //将当前屏障置为破坏状态、重置count、并唤醒所有被阻塞的线程。
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
            TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();   //获取锁
        try {
            //保存此时的generation
            final Generation g = generation;
            //判断屏障是否被破坏
            if (g.broken)
                throw new BrokenBarrierException();
            //判断线程是否被中断
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //正在等待进入屏障的线程数量减一
            int index = --count;
            //判断等待进入屏障的线程数量是否为0
            if (index == 0) {
                // 运行的动作标识
                boolean ranAction = false;
                try {
                    // 保存运行动作   
                    final Runnable command = barrierCommand;
                    // 如果动作不为空则运行
                    if (command != null)
                        command.run();
                    // 设置运行的动作状态
                    ranAction = true;
                    // 进入下一代,唤醒所有线程;重置count和generation值.
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            // index不为0,无限循环
            for (; ; ) {
                try {
                    // 如果没有设置定时就一直等待
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)  // 如果设置了定时
                        nanos = trip.awaitNanos(nanos);  // 等待定时的时间后,重新唤醒
                } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
                // 取消阻塞后,重新判断状态
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        //表示必须同时到达屏障(barrier)的线程个数
        this.parties = parties;
        //表示处于等待状态的线程个数
        this.count = parties;
        //表示当等待线程数为0时,会执行的动作
        this.barrierCommand = barrierAction;
    }

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    //返回参与相互等待的线程数
    public int getParties() {
        return parties;
    }

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe);
        }
    }

    public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
            BrokenBarrierException,
            TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

    // 判断此屏障是否处于中断状态。
    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    //将屏障重置为其初始状态。
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();
            nextGeneration();
        } finally {
            lock.unlock();
        }
    }

    //返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言。
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}

其中,使用CyclicBarrier并发类的核心就是dowait方法,结合源码分析,得出以下的流程图:

CyclicBarrier-dowait-flow.png

Semaphor

并发类Semaphor是什么?

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

什么情况下使用Semaphor?

用于限制获取某种资源的线程数量。

怎样使用Semaphor?

public class SemaphoreDemo {

    private static final int permits = 4;

    private static Semaphore semaphore = new Semaphore(permits);

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 12; i++) {
            final int index = i;
            executor.execute(() -> {
                try {
                    semaphore.acquire();
                    show(index);
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

    public static void show(int index) throws InterruptedException {
        Thread.sleep(1000);
        System.out.println(new Date().toString() + " --- " + Thread.currentThread().getName());
    }
}

运行结果:

Tue Feb 12 10:56:30 CST 2019 --- pool-1-thread-1
Tue Feb 12 10:56:30 CST 2019 --- pool-1-thread-2
Tue Feb 12 10:56:30 CST 2019 --- pool-1-thread-6
Tue Feb 12 10:56:30 CST 2019 --- pool-1-thread-5
Tue Feb 12 10:56:31 CST 2019 --- pool-1-thread-7
Tue Feb 12 10:56:31 CST 2019 --- pool-1-thread-4
Tue Feb 12 10:56:31 CST 2019 --- pool-1-thread-8
Tue Feb 12 10:56:31 CST 2019 --- pool-1-thread-3
Tue Feb 12 10:56:32 CST 2019 --- pool-1-thread-9
Tue Feb 12 10:56:32 CST 2019 --- pool-1-thread-12
Tue Feb 12 10:56:32 CST 2019 --- pool-1-thread-10
Tue Feb 12 10:56:32 CST 2019 --- pool-1-thread-11

由运行结果可以看出,每隔一秒就有4个线程执行,可以说明,使用并发类Semaphor可以限制并发线程数量。

使用并发类Semaphor十分简单,只需要掌握几个常用的方法。

  • Semaphore(int permits):创建给定的许可数和使用默认非公平策略。
  • Semaphore(int permits,boolean fair):创建给定的许可数和根据fair判断使用非公平或公平策略。
  • acquire():从信号量获取一个许可,在获取到一个许可前一直将线程阻塞,或者线程被中断。
  • acquire(int n):从信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
  • release():释放一个许可,将其返回给信号量。
  • release(int n):释放给定数目的许可,将其返回到信号量。
  • availablePermits():返回此信号量中当前可用的许可数。

接下来,开始撸源码!

public class Semaphore implements java.io.Serializable {
    private static final long serialVersionUID = -3222578661600680210L;
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        //获取当前许可数
        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (; ; ) {
                //获取当前许可数
                int available = getState();
                //剩余许可数
                int remaining = available - acquires;
                //如果剩余许可数小于0或者通过CAS操作成功设置剩余许可数为当前许可数则返回剩余许可数
                if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        //尝试释放共享锁
        protected final boolean tryReleaseShared(int releases) {
            for (; ; ) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        //通过CAS自旋减少许可数
        final void reducePermits(int reductions) {
            for (; ; ) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        //通过CAS自旋清空许可数
        final int drainPermits() {
            for (; ; ) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (; ; ) {
                //获取共享锁与非公平模式的唯一不同就是,公平模式下会判断是否有线程在等待队列中
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

    //创建给定的许可数和使用默认非公平模式。
    public Semaphore(int permits) {
        sync = new Semaphore.NonfairSync(permits);
    }

    //创建给定的许可数和根据fair判断使用非公平或公平模式。
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new Semaphore.FairSync(permits) : new Semaphore.NonfairSync(permits);
    }

    //从信号量获取一个许可,在获取到一个许可前一直将线程阻塞,或者线程被中断
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    //从信号量获取一个许可,不会中断
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    //尝试获取一个许可
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    //尝试在给定时间内获取一个许可
    public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    //尝试获取给定数量的许可数
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    //尝试在规定时间内获取给定的许可数
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
            throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    //释放一个许可
    public void release() {
        sync.releaseShared(1);
    }

    //释放给定数量的许可
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    //从信号量获取给定许可,不会中断
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    //释放给定数量的许可数
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    //获取当前可用的许可数
    public int availablePermits() {
        return sync.getPermits();
    }

    //清空许可数
    public int drainPermits() {
        return sync.drainPermits();
    }

    //减少许可数
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

    //是否使用公平模式
    public boolean isFair() {
        return sync instanceof Semaphore.FairSync;
    }

    //判断是否有等待线程
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    //获取等待队列线程的数量
    public final int getQueueLength() {
        return sync.getQueueLength();
    }

    //获取等待队列线程集合
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

    public String toString() {
        return super.toString() + "[Permits = " + sync.getPermits() + "]";
    }
}

结合源码,画了一张使用并发类Semaphor非公平模式下调用acquire()方法的流程图:

Semaphor流程图

参考资料:
https://blog.csdn.net/caoxiaohong1005/article/details/80000062
https://www.cnblogs.com/go2sea/p/5625536.html
https://blog.csdn.net/qq_19431333/article/details/70212663
《Java并发编程的艺术》

相关文章

网友评论

    本文标题:多线程之并发类CountDownLatch、CyclicBarr

    本文链接:https://www.haomeiwen.com/subject/gfbewqtx.html