并发十二:CountDownLatch、CyclicBarrie

作者: wangjie2016 | 来源:发表于2018-04-14 15:26 被阅读94次

    J.U.C中提供了三个同步工具CountDownLatch、CyclicBarrier、Semaphore,都是共享锁的特殊应用,用来进行线程的任务协调。

    CountDownLatch

    一个小栗子:

    public class CountDownLatchTest {
        public static void main(String[] args) {
            final CountDownLatch latch = new CountDownLatch(2);
            new Thread() {
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + ":二级表生成");
                        Thread.sleep(10000);
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                };
            }.start();
    
            new Thread() {
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + ":二级表生成");
                        Thread.sleep(10000);
                        latch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                };
            }.start();
    
            new Thread() {
                public void run() {
                    try {
                        System.out.println("等待二级表生成完成");
                        latch.await();
                        System.out.println(Thread.currentThread().getName() + ":汇总统计");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                };
            }.start();
        }
    }
    

    输出:"Thread-0:二级表生成、Thread-1:二级表生成、等待二级表生成完成",然后开始等待,直到Thread-0、Thread-1执行完成,然后"Thread-2:汇总统计"。

    CountDownLatch是一个倒计时式的计数器,允许线程等待其他N个线程先执行完毕,再开始执行。

    CountDownLatch基于AQS,是一个共享锁,await()使当前线程阻塞等待,countDown()计数器递减。

    // CountDownLatch aqs源码:
    ‘’private static final class Sync 
                    extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }
        int getCount() {
            return getState();
        }
        //加锁
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
        //解锁
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    

    count是倒计时的初始数。

    await()调用tryAcquireShared(1)方法获取锁,根据共享锁的实现返回值小于0时线程会被阻塞等待,也就是只有当state==0,才能成功获取锁。

    countDown()调用tryReleaseShared(1)方法进行解锁,当state值为0时,共享锁才算完全释放,会唤醒队列里等待的线程。

    CountDownLatch没有复位操作,当state的值为0时再调用await()就不会阻塞线程了,所以CountDownLatch只能使用一次。

    CyclicBarrier

    一个小栗子:

    public class CyclicBarrierTest {
        public static void main(String[] args) {
            final CyclicBarrier barrier = new CyclicBarrier(3);
            new Thread() {
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + ":计算完成");
                        barrier.await();
                        System.out.println(Thread.currentThread().getName() + ":入库");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                };
            }.start();
    
            new Thread() {
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + ":计算完成");
                        barrier.await();
                        System.out.println(Thread.currentThread().getName() + ":入库");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                };
            }.start();
    
            try {
                Thread.sleep(10000);// 等待
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            
            new Thread() {
                public void run() {
                    try {
                        System.out.println(Thread.currentThread().getName() + ":计算完成");
                        barrier.await();
                        System.out.println(Thread.currentThread().getName() + ":入库");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                };
            }.start();
        }
    }
    

    输出结果:"Thread-0:计算完成、Thread-1:计算完成",等待,直到"Thread-2:计算完成",然后"Thread-0:入库、Thread-1:入库、Thread-2:入库"。

    CyclicBarrier是可循环的同步屏障,将N个线程进行阻塞,直到阻塞线程的数量到达屏障点时屏障被打破,这N个线程才会继续执行。

    CyclicBarrier使用一个重入锁实现,初始化时传入屏障点parties,即要阻塞的线程数量,还可以传入一个Runnable的实现barrierAction,它会在屏障打破时执行。在屏障未打破前调用await()方法的线程都会被阻塞。

    public class CyclicBarrier {
        /** 屏障状态 */
        private static class Generation {
            boolean broken = false;
        }
        /** 重入锁 */
        private final ReentrantLock lock = new ReentrantLock();
        /** condition */
        private final Condition trip = lock.newCondition();
        /** 屏障点 */
        private final int parties;
        /** 到达屏障点执行的类 */
        private final Runnable barrierCommand;
        /** 当前状态 */
        private Generation generation = new Generation();
        /** 计数 */
        private int count;
        ... ...
    }
    

    阻塞放行流程:

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException, TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();//加锁
        try {
            //当前状态
            final Generation g = generation;
            if (g.broken)//被打破 s1
                throw new BrokenBarrierException();
            //中断
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //计数
            int index = --count;
            //到达屏障点 s3
            if (index == 0) { 
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    //先执行barrierCommand
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //唤醒所以阻塞线程
                    //重新实例化generation
                    //复位操作
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }
            for (;;) {// s2
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation) {
                    System.out.println("退出");
                return index;
                }
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }
    

    s1: g.broken==true说明屏障被打破,这时再调用await()会抛出异常。

    如果线程中断则打破屏障并抛出异常,

    计数器count递减并赋给index,index==0进入s2,否则进入s3

    s2:index==0说明被拦截的线程数量已经达到屏障点,如果barrierAction不为空,则直接调用run方法,让其先执行。

    nextGeneration()会唤醒所有在trip上等待的线程、重新赋值count为初始值parties,new 一个Generation赋给generation,这样一来CyclicBarrier就恢复如初了,可以被重新使用,返回0。

    s3: count>0,说明还没有到达屏障点,进入for(;;)循环体,会让线程在条件队列trip上等待,直到屏障被打破。屏障打破时会重新赋值generation,被唤醒的线程会在(g != generation)这个点正常退出循环。

    CyclicBarrier屏障正常打破后进行了复位操作,所以CyclicBarrier可以重复使用。

    Semaphore

    一个小栗子:

    public class SemaphoreTest {
        private static final int tokenCount = 3;
        public static void main(String[] args) {
            final Semaphore tokens = new Semaphore(tokenCount); // 令牌发放者
            for(int i=0;i<10;i++)
                new Request(tokens).start();
        }
    
        static class Request extends Thread {
            private Semaphore tokens;
    
            public Request(Semaphore tokens) {
                this.tokens = tokens;
            }
            @Override
            public void run() {
                try {
                    tokens.acquire();// 申请访问令牌
                    System.out.println(Thread.currentThread().getName()+":访问资源...");
                    Thread.sleep(3000);
                    tokens.release();// 访问完毕归还令牌
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    输出: "Thread-0:访问资源、Thread-1:访问资源、Thread-2:访问资源",等待,然后"Thread-3:访问资源、Thread-5:访问资源、Thread-4:访问资源",不同的电脑顺序可能不一样,能发现都是3个组的访问。

    Semaphore信号,可以限定访问受限资源的线程数量,用来协调访问资源的线程数量,使其处在一个恒定的值。
    网络应用中为了保护服务器不被流量洪峰冲夸,会进行限流,限流会使用令牌桶算法,Semaphore就可以实现令牌桶:访问线程先拿到令牌才能访问,访问完后把令牌归还到桶中以便供其他线程使用,就保证了访问资源的线程数量和令牌数量一至。

    Semaphore是一个共享锁,内部代码布局和ReentrantLock类似,支持公平性设置,如果设置为公平锁,能够使等待最久的线程先获取信号,默认为非公平性的。

    public class Semaphore {
        /** 同步器实例 */
        private final Sync sync;
        /** 父类同步器*/
        abstract static class Sync extends AbstractQueuedSynchronizer {... ...}
        /** 非公平同步器*/
        static final class NonfairSync extends Sync {}
        /** 公平同步器*/
        static final class FairSync extends Sync {}
        /** 构造*/
        public MySemaphore(int permits) {
            sync = new NonfairSync(permits);
        }
        /** 构造*/
        public MySemaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
        ... ... 
    }
    

    加锁解锁流程和前面的写的共享锁一致。
    不同的是信号获取的逻辑中没有对重入的处理,一个线程可以多次获取信号,每次获取都会让总量减1。
    信号的释放时没有对信号的总量进行控制,比如初始的信号是5个,已经获取了5个,释放来了7个,这时可用的信号是7个,也就是说在释放的可以对信号数量进行扩容,如果在使用中需要保持信号数量恒定,一定要保证acquire和release成对出现。

    小结

    1. CountDownLatch和CyclicBarrier都是以计数器的的形式来协调线程同步的,一个显著的区别是CyclicBarrier可重用,CountDownLatch是一次性的。
    2. CountDownLatch和CyclicBarrier还有一个语义层面上的区别是,Count DownLatch是线程等待另外N个线程执行完毕。CyclicBarrier是N个线程相互等待,直到都执行毕。
    3. CountDownLatch强调依赖,CyclicBarrier强调协作。典型的应用场景就是大任务拆解为小任务,然后合并计算结果,比如多线程下载大文件,多个下载线程将自己分配的文件段下载完毕后,合并线程才开始进行文件合并操作。
    4. Semaphore用来限定访问受限资源的线程数量,典型的应用场景是流量控制,比如并发操作数据库,数据库连接池只有10个,必须保证只能有10个线程去获取连接,否则会报错。
    5. Semaphore可以进行简单的服务端限流,比如一个RPC服务器只能支撑200QPS,就可以用Semaphore去限制请求RPC的线程数量。当然对于复杂的服务端限流还得使用更高效令牌桶(Token Bucket)或者漏桶(Leaky Bucket)算法。

    码字不易,转载请保留原文连接https://www.jianshu.com/p/9e0ecc8b1358

    相关文章

      网友评论

        本文标题:并发十二:CountDownLatch、CyclicBarrie

        本文链接:https://www.haomeiwen.com/subject/knakkftx.html