美文网首页
多线程和并发(八):JUC相关类

多线程和并发(八):JUC相关类

作者: lilykeke | 来源:发表于2021-09-09 09:23 被阅读0次

    1. CountDownLatch

    1.1 说明

    一种同步辅助工具,允许一个或多个线程等待其他线程执行的一组操作完成。

    给定一个计数值。当每个线程完成后,调用{@link countDown}方法给计数值减一。在当前计数达到零之前,调用{@link#await await}方法的线程将一直阻塞,直到计数值为0后将释放所有等待线程。

    1.2 源码

    public class CountDownLatch {
        /**
         * Synchronization control For CountDownLatch.
         * Uses AQS state to represent count.
         */
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    
        private final Sync sync;
    
        /**
         * 初始化
         */
        public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
        /**
         * 导致当前线程等待,直到count为0
         */
        public void await() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        /**
         * Causes the current thread to wait until the latch has counted down to
         * zero, unless the thread is {@linkplain Thread#interrupt interrupted},
         * or the specified waiting time elapses.
         *
         */
        public boolean await(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
        }
    
        /**
         * Decrements the count of the latch, releasing all waiting threads if
         * the count reaches zero.
         */
        public void countDown() {
            sync.releaseShared(1);
        }
    
        /**
         * Returns the current count.
         *
         * <p>This method is typically used for debugging and testing purposes.
         *
         * @return the current count
         */
        public long getCount() {
            return sync.getCount();
        }
    }
    

    1.3 示例

    public class CountDownLatchDemo {
        public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(15);
            for (int i = 0; i < 15; i++) {
                new Thread(() -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName());
                    countDownLatch.countDown();
    
                }, ""+ i).start();
            }
    
            try {
                countDownLatch.await();
                System.out.println("main thread");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    结果:

    2. CyclicBarrier

    2.1 说明

    一种同步辅助工具,允许一组线程全部等待对方到达一个公共屏障点。CyclicBarrier在涉及固定大小的线程方的程序中非常有用,这些线程偶尔必须相互等待。这个屏障被称为循环的,因为它可以在等待的线程被释放后重新使用。

    CyclicBarrier支持可选的可运行命令,该命令在参与方中的最后一个线程到达之后,但在释放任何线程之前,在每个屏障点运行一次。此屏障操作有助于在任何一方继续之前更新共享状态。

    2.2 源码

    public class CyclicBarrier {
     
        private static class Generation {
            boolean broken = false;
        }
    
        /** The lock for guarding barrier entry */
        private final ReentrantLock lock = new ReentrantLock();
        /** Condition to wait on until tripped */
        private final Condition trip = lock.newCondition();
        //每次拦截的线程数,在构造时进行赋值
        private final int parties;
       
        private final Runnable barrierCommand;
        /** The current generation */
        private Generation generation = new Generation();
    
        /**
         * Number of parties still waiting. Counts down from parties to 0
         * on each generation.  It is reset to parties on each new
         * generation or when broken.
         */
        private int count; //内部计数器与parties相等
    
        /**
         * Updates state on barrier trip and wakes up everyone.
         * Called only while holding lock.
         */
        private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new Generation();
        }
    
        /**
         * Sets current barrier generation as broken and wakes up everyone.
         * Called only while holding lock.
         */
        private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }
    
        /**
         * Main barrier code, covering the various policies.
         */
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
    
                if (Thread.interrupted()) {
                    breakBarrier(); //若线程中断,唤醒所有线程
                    throw new InterruptedException();
                }
    
                int index = --count; //计数减一
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();  //执行指定任务
                        ranAction = true;
                        nextGeneration(); //唤醒所有线程,转到下一代
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // 如果计数器不为0则执行此循环
                for (;;) {
                    try {
                        //根据传入的参数来决定是定时等待还是非定时等待
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            this.parties = parties;
            this.count = parties;
            this.barrierCommand = barrierAction;
        }
    
        public CyclicBarrier(int parties) {
            this(parties, null);
        }
    
        public int getParties() {
            return parties;
        }
    
        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
        public int await(long timeout, TimeUnit unit)
            throws InterruptedException,
                   BrokenBarrierException,
                   TimeoutException {
            return dowait(true, unit.toNanos(timeout));
        }
    
        public boolean isBroken() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return generation.broken;
            } finally {
                lock.unlock();
            }
        }
    
        public void reset() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                breakBarrier();   // break the current generation
                nextGeneration(); // start a new generation
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Returns the number of parties currently waiting at the barrier.
         * This method is primarily useful for debugging and assertions.
         *
         * @return the number of parties currently blocked in {@link #await}
         */
        public int getNumberWaiting() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return parties - count;
            } finally {
                lock.unlock();
            }
        }
    }
    

    应用场景:多个线程计算最后汇总
    CyclicBarrier 和CountDownLatch 区别
    1.CountDownLatch 只能使用一次,CyclicBarrier 的计数器可以使用reset()方法重置。所以CyclicBarrier 可以处理更复杂的逻辑。
    例如,计算发生错误可以重置计数器,并让线程重新执行一次。
    2.CyclicBarrier 还提供其他方法,如getNumberWaiting方法可以获得被阻塞的线程数等

    相关文章

      网友评论

          本文标题:多线程和并发(八):JUC相关类

          本文链接:https://www.haomeiwen.com/subject/cpdejktx.html