美文网首页并发编程Java Concurrency
Java并发编程之并发工具类CountDownLatch,Cyc

Java并发编程之并发工具类CountDownLatch,Cyc

作者: 干天慈雨 | 来源:发表于2021-07-14 13:57 被阅读0次

    1 前言

    在JDK的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和 Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数 据的一种手段。本文会对这些并发工具类进行介绍。

    2 等待多线程完成的CountDownLatch

    2.1 CountDownLatch案例演示

    countdownlatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完毕再执行。从命名可以解读到 countdown 是倒数的意思,类似于我们倒计时的概念。
    countdownlatch 提供了两个方法,一个是 countDown,一个是 await,countdownlatch 初始化的时候需要传入一个整数,在这个整数倒数到 0 之前,调用了 await 方法的程序都必须要等待,然后通过 countDown 来倒数。
    现在有一个需求就是:学校放学了,等学生们全部走完之后再关闭教室的门,如果不用CountDownLatch的话代码如下:

    public class WithoutCountDownLatchDemo {
        public static void main(String[] args) {
            for (int i = 0; i < 6; i++) {
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName() + "同学离开了");
                }).start();
            }
            System.out.println(Thread.currentThread().getName() + "要关门了,此时教室已经没有人了~");
        }
    }
    

    输出结果如下:

    Thread-0同学离开了
    main要关门了,此时教室已经没有人了~
    Thread-1同学离开了
    Thread-2同学离开了
    Thread-3同学离开了
    Thread-4同学离开了
    Thread-5同学离开了
    

    同学还没有走光,但是门却先关了,这样显然是不对的,那么使用了CountDownLatch的话代码是怎样的呢?

    public class CountDownLatchDemo {
        public static void main(String[] args) {
            CountDownLatch countDownLatch = new CountDownLatch(6);
            for (int i = 0; i < 6; i++) {
                new Thread(() -> {
                    System.out.println(Thread.currentThread().getName()+"同学离开了");
                    countDownLatch.countDown();
                }).start();
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"要关门了,此时教室已经没人了~");
        }
    }
    

    运行结果如下:

    Thread-0同学离开了
    Thread-2同学离开了
    Thread-1同学离开了
    Thread-3同学离开了
    Thread-5同学离开了
    Thread-4同学离开了
    main要关门了,此时教室已经没人了~
    

    等到同学们全部走完之后,才开始关门,这样才是正确的!
    从如下代码中:

    public class CountDownLatchDemo2 {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch countDownLatch=new CountDownLatch(3);
            new Thread(() -> {
                System.out.println(""+Thread.currentThread().getName()+"-执行中");
                countDownLatch.countDown();
                System.out.println(""+Thread.currentThread().getName()+"-执行完毕");
            },"t1").start();
            new Thread(()->{
                System.out.println(""+Thread.currentThread().getName()+"-执行中");
                countDownLatch.countDown();
                System.out.println(""+Thread.currentThread().getName()+"-执行完毕");
            },"t2").start();
            new Thread(()->{
                System.out.println(""+Thread.currentThread().getName()+"-执行中");
                countDownLatch.countDown();
                System.out.println(""+Thread.currentThread().getName()+"-执行完毕");
            },"t3").start();
            countDownLatch.await();
            System.out.println("所有线程执行完毕");
        }
    }
    
    

    可以看出有点类似 join 的功能,但是比 join 更加灵活。CountDownLatch 构造函数会接收一个 int 类型的参数作为计数器的初始值,当调用 CountDownLatch 的countDown 方法时,这个计数器就会减一。通过 await 方法去阻塞去阻塞主流程。


    流程图

    2.2 CountDownLatch源码分析

    2.2.1 CountDownLatch类图

    CountDownLatch继承图

    对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。countDown() 方法每次调用都会将 state 减 1,直到state 的值为 0;而 await 是一个阻塞方法,当 state 减 为 0 的时候,await 方法才会返回。await 可以被多个线程调用,大家在这个时候脑子里要有个图:所有调用了await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满(state == 0),将线程从队列中一个个唤醒过来。

    2.2.2 acquireSharedInterruptibly

    countdownlatch 也用到了 AQS,在 CountDownLatch 内部写了一个 Sync 并且继承了 AQS 这个抽象类重写了 AQS中的共享锁方法。首先看到下面这个代码,这块代码主要是判断当前线程是否获取到了共享锁 ; ( 在CountDownLatch 中 , 使 用 的 是 共 享 锁 机 制 ,因为CountDownLatch 并不需要实现互斥的特性)

        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //state 如果不等于 0,说明当前线程需要加入到共享锁队列中
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

    2.2.3 doAcquireSharedInterruptibly

    1. addWaiter 设置为 shared 模式
    2. tryAcquire 和 tryAcquireShared 的返回值不同,因此会多出一个判断过程
    3. 在 判 断 前 驱 节 点 是 头 节 点 后 , 调 用 了setHeadAndPropagate 方法,而不是简单的更新一下头节点。
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            //创建一个共享模式的节点添加到队列中
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        // 就判断尝试获取锁
                        int r = tryAcquireShared(arg);
                        //r>=0 表示获取到了执行权限,这个时候因为 state!=0,所以不会执行这段代码
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    //阻塞线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    2.2.4 图解分析

    加入这个时候有 3 个线程调用了 await 方法,由于这个时候 state 的值还不为 0,所以这三个线程都会加入到 AQS队列中。并且三个线程都处于阻塞状态。


    图解分析

    2.2.5 CountDownLatch.countDown

    由于线程被 await 方法阻塞了,所以只有等到countdown 方法使得 state=0 的时候才会被唤醒,我们来看看 countdown 做了什么

    1. 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true, 否则只是简单的 state = state - 1
    2. 如果 state=0, 则调用 doReleaseShared 唤醒处于 await 状态下的线程
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    CountDownLatch中的tryReleaseShared

            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    

    AQS.doReleaseShared
    共享锁的释放和独占锁的释放有一定的差别,前面唤醒锁的逻辑和独占锁是一样,先判断头结点是不是SIGNAL 状态,如果是,则修改为 0,并且唤醒头结点的下一个节点。
    PROPAGATE: 标识为 PROPAGATE 状态的节点,是共享锁模式下的节点状态,处于这个状态下的节点,会对线程的唤醒进行传播。

        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    // 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                // 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环
                // 通过检查头节点是否改变了,如果改变了就继续循环
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    h == head:说明头节点还没有被刚刚用unparkSuccessor 唤醒的线程(这里可以理解为ThreadB)占有,此时 break 退出循环。
    h != head:头节点被刚刚唤醒的线程(这里可以理解为ThreadB)占有,那么这里重新进入下一轮循环,唤醒下一个节点(这里是 ThreadB )。我们知道,等到ThreadB 被唤醒后,其实是会主动唤醒 ThreadC...
    doAcquireSharedInterruptibly
    一旦 ThreadA 被唤醒,代码又会继续回到doAcquireSharedInterruptibly 中来执行。如果当前 state满足=0 的条件,则会执行 setHeadAndPropagate 方法

        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);
            //创建一个共享模式的节点添加到队列中
            boolean failed = true;
            try {
                for (;;) {//被唤醒的线程进入下一次循环继续判断
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    

    setHeadAndPropagate
    这个方法的主要作用是把被唤醒的节点,设置成 head 节 点。 然后继续唤醒队列中的其他线程。由于现在队列中有 3 个线程处于阻塞状态,一旦 ThreadA被唤醒,并且设置为 head 之后,会继续唤醒后续的ThreadB

       private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            /*
             * Try to signal next queued node if:
             *   Propagation was indicated by caller,
             *     or was recorded (as h.waitStatus either before
             *     or after setHead) by a previous operation
             *     (note: this uses sign-check of waitStatus because
             *      PROPAGATE status may transition to SIGNAL.)
             * and
             *   The next node is waiting in shared mode,
             *     or we don't know, because it appears null
             *
             * The conservatism in both of these checks may cause
             * unnecessary wake-ups, but only when there are multiple
             * racing acquires/releases, so most need signals now or soon
             * anyway.
             */
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    
    

    图解分析

    图解分析

    3 同步屏障CyclicBarrier

    CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会 开门,所有被屏障拦截的线程才会继续运行。

    3.1 CyclicBarrier使用场景

    使用场景:5个工程师一起来公司应聘,招聘方式分为笔试和面试。首先,要等人到齐后,开始笔试;笔试结束之后,再一起参加面试。把5个人看作5个线程,代码如下:
    Main类:

    public class Main {
        public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
            CyclicBarrier barrier = new CyclicBarrier(5);
            for (int i = 0; i < 5; i++) {
                new MyThread("线程-" + (i + 1), barrier).start();
            }
        }
    }
    

    MyThread类:

    public class MyThread extends Thread{
    
        private final CyclicBarrier barrier;
        private final Random random = new Random();
        public MyThread(String name, CyclicBarrier barrier) {
            super(name);
            this.barrier = barrier;
        }
        @Override public void run() {
            try {
                Thread.sleep(random.nextInt(2000));
                System.out.println(Thread.currentThread().getName() + " - 已经到达公司");
                barrier.await();
                Thread.sleep(random.nextInt(2000));
                System.out.println(Thread.currentThread().getName() + " - 已经笔试结束");
                barrier.await();
                Thread.sleep(random.nextInt(2000));
                System.out.println(Thread.currentThread().getName() + " - 已经面试结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            super.run();
        }
    }
    
    

    在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再一起开始笔试;第2个同步点,要等所有应聘者都结束笔试,之后一起进入面试环节。

    3.2 CyclicBarrier实现原理

    CyclicBarrier基于ReentrantLock+Condition实现。

    public class CyclicBarrier { 
        private final ReentrantLock lock = new ReentrantLock(); 
        // 用于线程之间相互唤醒 
        private final Condition trip = lock.newCondition(); 
        // 线程总数 
        private final int parties; 
        private int count; 
        private Generation generation = new Generation(); 
        // ... 
    }
    

    下面详细介绍 CyclicBarrier 的实现原理。先看构造方法:

        public CyclicBarrier(int parties, Runnable barrierAction) {
            if (parties <= 0) throw new IllegalArgumentException();
            // 参与方数量
            this.parties = parties;
            this.count = parties;
            // 当所有线程被唤醒时,执行barrierCommand表示的Runnable。
            this.barrierCommand = barrierAction;
        }
    

    接下来看一下await()方法的实现过程。

        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
    
     private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                final Generation g = generation;
    
                if (g.broken)
                    throw new BrokenBarrierException();
                // 响应中断
                if (Thread.interrupted()) {
                    // 唤醒所有阻塞的线程
                    breakBarrier();
                    throw new InterruptedException();
                }
                // 每个线程调用一次await(),count都要减1
                int index = --count;
                // 当count减到0的时候,此线程唤醒其他所有线程
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    
                // loop until tripped, broken, interrupted, or timed out
                for (;;) {
                    try {
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            // We're about to finish waiting even if we had not
                            // been interrupted, so this interrupt is deemed to
                            // "belong" to subsequent execution.
                            Thread.currentThread().interrupt();
                        }
                    }
    
                    if (g.broken)
                        throw new BrokenBarrierException();
    
                    if (g != generation)
                        return index;
    
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
        private void breakBarrier() {
            generation.broken = true;
            count = parties;
            trip.signalAll();
        }
    
        private void nextGeneration() {
            // signal completion of last generation
            trip.signalAll();
            // set up next generation
            count = parties;
            generation = new Generation();
        }
    

    以上几点的说明:

    1. CyclicBarrier是可以被重用的。以上一节的应聘场景为例,来了5个线程,这5个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这5个线程继续互相等待,到齐后再一起被唤醒。每一轮被称为一个Generation,就是一次同步点。
    2. CyclicBarrier 会响应中断。5 个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上面的breakBarrier()方法。然后count被重置为初始值(parties),重新开始。
    3. 上面的回调方法,barrierAction只会被第5个线程执行1次(在唤醒其他4个线程之前),而不是5个线程每个都执行1次。

    3.3 CyclicBarrier与CountDownLatch 区别

    CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
    CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;

    4 控制并发线程数的Semaphore

    Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以 保证合理的使用公共资源

    4.1 Semaphore的使用场景

    public class SemaphoreDemo {
        public static void main(String[] args) {
            Semaphore semaphore=new Semaphore(3);//此时海底捞有3个空桌
            for (int i = 0; i < 6; i++) {
                new Thread(() -> {
                    try {
                        semaphore.acquire();
                        System.out.println("第"+Thread.currentThread().getName()+"等待者抢到座位。");
                        //假设每桌客人吃饭时间为3S
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println("第"+Thread.currentThread().getName()+"客人吃完饭离开。");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release();
                    }
                },String.valueOf(i)).start();
            }
        }
    }
    
    

    运行结果如下:

    第0等待者抢到座位。
    第1等待者抢到座位。
    第2等待者抢到座位。
    第1客人吃完饭离开。
    第0客人吃完饭离开。
    第2客人吃完饭离开。
    第4等待者抢到座位。
    第5等待者抢到座位。
    第3等待者抢到座位。
    第5客人吃完饭离开。
    第3客人吃完饭离开。
    第4客人吃完饭离开。
    

    4.2 Semaphore源码分析

    假设有n个线程来获取Semaphore里面的10份资源(n > 10),n个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。


    示例

    当初始的资源个数为1的时候,Semaphore退化为排他锁。正因为如此,Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分。Semaphore相关类的继承体系如下图所示:


    继承体系

    创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。
    Semaphore 分公平策略和非公平策略

    4.2.1 FairSync

       /**
         * Fair version
         */
        static final class FairSync extends Sync {
            private static final long serialVersionUID = 2014338818796000944L;
    
            FairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
        }
    

    4.2.2 NofairSync

    通过对比发现公平和非公平的区别就在于是否多了一个hasQueuedPredecessors 的判断

       /**
         * NonFair version
         */
        static final class NonfairSync extends Sync {
            private static final long serialVersionUID = -2694183684443567898L;
    
            NonfairSync(int permits) {
                super(permits);
            }
    
            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
        }
    
        final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    

    后面的代码和 CountDownLatch 的是完全一样,都是基于共享锁的实现。

    5 线程间交换数据的Exchanger

    Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交 换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也 执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产 出来的数据传递给对方。

    5.1 使用场景

    Exchanger用于线程之间交换数据,代码如下:

    public class ExchangerDemo {
        private static final Random random = new Random();
    
        public static void main(String[] args) {
            // 建一个多线程共用的exchange对象
            // 把exchange对象传给3个线程对象。每个线程在自己的run方法中调用exchange,把自 己的数据作为参数
            // 传递进去,返回值是另外一个线程调用exchange传进去的参数
            Exchanger<String> exchanger = new Exchanger<>();
    
            new Thread("线程1") {
                @Override
                public void run() {
                    while (true) {
                        try {
                            // 如果没有其他线程调用exchange,线程阻塞,直到有其他线程调 用exchange为止。
                            String otherData = exchanger.exchange("交换数据1");
                            System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
                            Thread.sleep(random.nextInt(2000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
    
    
            new Thread("线程2") {
                @Override
                public void run() {
                    while (true) {
                        try {
                            String otherData = exchanger.exchange("交换数据2");
                            System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
                            Thread.sleep(random.nextInt(2000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
    
            new Thread("线程3") {
                @Override
                public void run() {
                    while (true) {
                        try {
                            String otherData = exchanger.exchange("交换数据3");
                            System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
                            Thread.sleep(random.nextInt(2000));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
        }
    }
    
    

    在上面的例子中,3个线程并发地调用exchange(...),会两两交互数据,如1/2、1/3和2/3。

    5.2 Exchanger实现原理

    Exchanger的核心机制和Lock一样,也是CAS+park/unpark。

    5.2.1 Exchanger内部代码

    首先,在Exchanger内部,有两个内部类:Participant和Node,代码如下:

    public class Exchanger<V> {
        //...
        // 添加了Contended注解,表示伪共享与缓存行填充
        @sun.misc.Contended static final class Node {
            int index;              // Arena index
            int bound;              // Last recorded value of Exchanger.bound
            int collides;           // 本次绑定中,CAS操作失败次数
            int hash;               // 自旋伪随机
            Object item;            // 本线程要交换的数据
            volatile Object match;  //  对方线程交换来的数据
            // 当前线程
            volatile Thread parked; // 当前线程阻塞的时候设置该属性,不阻塞为null。
        }
    
        /** The corresponding thread local class */
        static final class Participant extends ThreadLocal<java.util.concurrent.Exchanger.Node> {
            public java.util.concurrent.Exchanger.Node initialValue() { return new java.util.concurrent.Exchanger.Node(); }
        }
        //...
    }
    

    每个线程在调用exchange(...)方法交换数据的时候,会先创建一个Node对象。
    这个Node对象就是对该线程的包装,里面包含了3个重要字段:第一个是该线程要交互的数据,第二个是对方线程交换来的数据,最后一个是该线程自身。
    一个Node只能支持2个线程之间交换数据,要实现多个线程并行地交换数据,需要多个Node,因此在Exchanger里面定义了Node数组:

        /**
         * Elimination array; null until enabled (within slotExchange).
         * Element accesses use emulation of volatile gets and CAS.
         */
        private volatile Node[] arena;
    

    5.2.2 exchange(V x)实现分析

    明白了大致思路,下面来看exchange(V x)方法的详细实现:

        @SuppressWarnings("unchecked")
        public V exchange(V x) throws InterruptedException {
            Object v;
            Object item = (x == null) ? NULL_ITEM : x; // translate null args
            if ((arena != null ||
                 (v = slotExchange(item, false, 0L)) == null) &&
                ((Thread.interrupted() || // disambiguates null return
                  (v = arenaExchange(item, false, 0L)) == null)))
                throw new InterruptedException();
            return (v == NULL_ITEM) ? null : (V)v;
        }
    

    上面方法中,如果arena不是null,表示启用了arena方式交换数据。如果arena不是null,并且线程被中断,则抛异常
    如果arena不是null,并且arenaExchange的返回值为null,则抛异常。对方线程交换来的null值是封装为NULL_ITEM对象的,而不是null。
    如果slotExchange的返回值是null,并且线程被中断,则抛异常。
    如果slotExchange的返回值是null,并且areaExchange的返回值是null,则抛异常。

    slotExchange的实现:

        /**
         * Exchange function used until arenas enabled. See above for explanation.
         * 如果不启用arenas,则使用该方法进行线程间数据交换。
         * @param item 需要交换的数据
         * @param timed 是否是计时等待,true表示是计时等待
         * @param ns  如果是计时等待,该值表示最大等待的时长。
         * @return 对方线程交换来的数据;如果等待超时或线程中断,或者启用了arena,则返回 null。
         */
        private final Object slotExchange(Object item, boolean timed, long ns) {
            // participant在初始化的时候设置初始值为new Node()
            // 获取本线程要交换的数据节点
            Node p = participant.get();
            // 获取当前线程
            Thread t = Thread.currentThread();
            // 如果线程被中断,则返回null。
            if (t.isInterrupted()) // preserve interrupt status so caller can recheck
                return null;
    
            for (Node q;;) {
                // 如果slot非空,表明有其他线程在等待该线程交换数据
                if ((q = slot) != null) {
                    // CAS操作,将当前线程的slot由slot设置为null
                    // 如果操作成功,则执行if中的语句
                    if (U.compareAndSwapObject(this, SLOT, q, null)) {
                        // 获取对方线程交换来的数据
                        Object v = q.item;
                        // 设置要交换的数据
                        q.match = item;
                        // 获取q中阻塞的线程对象
                        Thread w = q.parked;
                        if (w != null)
                            // 如果对方阻塞的线程非空,则唤醒阻塞的线程
                            U.unpark(w);
                        return v;
                    }
                    // create arena on contention, but continue until slot null
                    // 创建arena用于处理多个线程需要交换数据的场合,防止slot冲突
                    if (NCPU > 1 && bound == 0 &&
                        U.compareAndSwapInt(this, BOUND, 0, SEQ))
                        arena = new Node[(FULL + 2) << ASHIFT];
                }
                // 如果arena不是null,需要调用者调用arenaExchange方法接着获取对方线程交 换来的数据
                else if (arena != null)
                    return null; // caller must reroute to arenaExchange
                else {
                    // 如果slot为null,表示对方没有线程等待该线程交换数据
                    // 设置要交换的本方数据
                    p.item = item;
                    // 设置当前线程要交换的数据到slot
                    // CAS操作,如果设置失败,则进入下一轮for循环
                    if (U.compareAndSwapObject(this, SLOT, null, p))
                        break;
                    p.item = null;
                }
            }
    
            // await release
            // 没有对方线程等待交换数据,将当前线程要交换的数据放到slot中,是一个Node对象
            // 然后阻塞,等待唤醒
            int h = p.hash;
            // 如果是计时等待交换,则计算超时时间;否则设置为0。
            long end = timed ? System.nanoTime() + ns : 0L;
            // 如果CPU核心数大于1,则使用SPINS数,自旋;否则为1,没必要自旋。
            int spins = (NCPU > 1) ? SPINS : 1;
            // 记录对方线程交换来的数据
            Object v;
            // 如果p.match==null,表示还没有线程交换来数据
            while ((v = p.match) == null) {
                // 如果自旋次数大于0,计算hash随机数
                if (spins > 0) {
                    // 生成随机数,用于自旋次数控制
                    h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                    if (h == 0)
                        h = SPINS | (int)t.getId();
                    else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                        Thread.yield();
                }
                // p是ThreadLocal记录的当前线程的Node。
                // 如果slot不是p表示slot是别的线程放进去的
                else if (slot != p)
                    spins = SPINS;
                else if (!t.isInterrupted() && arena == null &&
                         (!timed || (ns = end - System.nanoTime()) > 0L)) {
                    U.putObject(t, BLOCKER, this);
                    p.parked = t;
                    if (slot == p)
                        U.park(false, ns);
                    p.parked = null;
                    U.putObject(t, BLOCKER, null);
                }
                else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                    // 没有被中断但是超时了,返回TIMED_OUT,否则返回null
                    v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                    break;
                }
            }
            // match设置为null值 CAS
            U.putOrderedObject(p, MATCH, null);
            p.item = null;
            p.hash = h;
            // 返回获取的对方线程交换来的数据
            return v;
        }
    

    arenaExchange的实现:

       /**
         * Exchange function when arenas enabled. See above for explanation.
         * 当启用arenas的时候,使用该方法进行线程间的数据交换。
         * @param item 本线程要交换的非null数据。
         * @param timed 如果需要计时等待,则设置为true。
         * @param ns 表示计时等待的最大时长。
         * @return 对方线程交换来的数据。如果线程被中断,或者等待超时,则返回null。
         */
        private final Object arenaExchange(Object item, boolean timed, long ns) {
            Node[] a = arena;
            Node p = participant.get();
            // 访问下标为i处的slot数据
            for (int i = p.index;;) {                      // access slot at i
                int b, m, c; long j;                       // j is raw array offset
                Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
                // 如果q不是null,则将数组的第j个元素由q设置为null
                if (q != null && U.compareAndSwapObject(a, j, q, null)) {
                    // 获取对方线程交换来的数据
                    Object v = q.item;                     // release
                    // 设置本方线程交换的数据
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        // 如果对方线程非空,则唤醒对方线程
                        U.unpark(w);
                    return v;
                }
                // 如果自旋次数没达到边界,且q为null
                else if (i <= (m = (b = bound) & MMASK) && q == null) {
                    // 提供本方数据
                    p.item = item;                         // offer
                    // 将arena的第j个元素由null设置为p
                    if (U.compareAndSwapObject(a, j, null, p)) {
                        long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
                        Thread t = Thread.currentThread(); // wait
                        // 自旋等待
                        for (int h = p.hash, spins = SPINS;;) {
                            // 获取对方交换来的数据
                            Object v = p.match;
                            // 如果对方交换来的数据非空
                            if (v != null) {
                                // 将p设置为null,CAS操作
                                U.putOrderedObject(p, MATCH, null);
                                // 清空
                                p.item = null;             // clear for next use
                                p.hash = h;
                                // 返回交换来的数据
                                return v;
                            }
                            // 产生随机数,用于限制自旋次数
                            else if (spins > 0) {
                                h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
                                if (h == 0)                // initialize hash
                                    h = SPINS | (int)t.getId();
                                else if (h < 0 &&          // approx 50% true
                                         (--spins & ((SPINS >>> 1) - 1)) == 0)
                                    Thread.yield();        // two yields per wait
                            }
                            // 如果arena的第j个元素不是p
                            else if (U.getObjectVolatile(a, j) != p)
                                spins = SPINS;       // releaser hasn't set match yet
                            else if (!t.isInterrupted() && m == 0 &&
                                     (!timed ||
                                      (ns = end - System.nanoTime()) > 0L)) {
                                U.putObject(t, BLOCKER, this); // emulate LockSupport
                                p.parked = t;              // minimize window
                                if (U.getObjectVolatile(a, j) == p)
                                    U.park(false, ns);
                                p.parked = null;
                                U.putObject(t, BLOCKER, null);
                            }
                            // arena的第j个元素是p并且CAS设置arena的第j个元素由p设置 为null成功
                            else if (U.getObjectVolatile(a, j) == p &&
                                     U.compareAndSwapObject(a, j, p, null)) {
                                if (m != 0)                // try to shrink
                                    U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
                                p.item = null;
                                p.hash = h;
                                i = p.index >>>= 1;        // descend
                                // 如果线程被中断,则返回null值
                                if (Thread.interrupted())
                                    return null;
                                if (timed && m == 0 && ns <= 0L)
                                    // 如果超时,返回TIMED_OUT。
                                    return TIMED_OUT;
                                break;                     // expired; restart
                            }
                        }
                    }
                    else
                        p.item = null;                     // clear offer
                }
                else {
                    if (p.bound != b) {                    // stale; reset
                        p.bound = b;
                        p.collides = 0;
                        i = (i != m || m == 0) ? m : m - 1;
                    }
                    else if ((c = p.collides) < m || m == FULL ||
                             !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
                        p.collides = c + 1;
                        i = (i == 0) ? m : i - 1;          // cyclically traverse
                    }
                    else
                        i = m + 1;                         // grow
                    p.index = i;
                }
            }
        }
    
    

    相关文章

      网友评论

        本文标题:Java并发编程之并发工具类CountDownLatch,Cyc

        本文链接:https://www.haomeiwen.com/subject/jbqspltx.html