1 前言
在JDK的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和 Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数 据的一种手段。本文会对这些并发工具类进行介绍。
2 等待多线程完成的CountDownLatch
2.1 CountDownLatch案例演示
countdownlatch 是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完毕再执行。从命名可以解读到 countdown 是倒数的意思,类似于我们倒计时的概念。
countdownlatch 提供了两个方法,一个是 countDown,一个是 await,countdownlatch 初始化的时候需要传入一个整数,在这个整数倒数到 0 之前,调用了 await 方法的程序都必须要等待,然后通过 countDown 来倒数。
现在有一个需求就是:学校放学了,等学生们全部走完之后再关闭教室的门,如果不用CountDownLatch的话代码如下:
public class WithoutCountDownLatchDemo {
public static void main(String[] args) {
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "同学离开了");
}).start();
}
System.out.println(Thread.currentThread().getName() + "要关门了,此时教室已经没有人了~");
}
}
输出结果如下:
Thread-0同学离开了
main要关门了,此时教室已经没有人了~
Thread-1同学离开了
Thread-2同学离开了
Thread-3同学离开了
Thread-4同学离开了
Thread-5同学离开了
同学还没有走光,但是门却先关了,这样显然是不对的,那么使用了CountDownLatch的话代码是怎样的呢?
public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"同学离开了");
countDownLatch.countDown();
}).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"要关门了,此时教室已经没人了~");
}
}
运行结果如下:
Thread-0同学离开了
Thread-2同学离开了
Thread-1同学离开了
Thread-3同学离开了
Thread-5同学离开了
Thread-4同学离开了
main要关门了,此时教室已经没人了~
等到同学们全部走完之后,才开始关门,这样才是正确的!
从如下代码中:
public class CountDownLatchDemo2 {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch=new CountDownLatch(3);
new Thread(() -> {
System.out.println(""+Thread.currentThread().getName()+"-执行中");
countDownLatch.countDown();
System.out.println(""+Thread.currentThread().getName()+"-执行完毕");
},"t1").start();
new Thread(()->{
System.out.println(""+Thread.currentThread().getName()+"-执行中");
countDownLatch.countDown();
System.out.println(""+Thread.currentThread().getName()+"-执行完毕");
},"t2").start();
new Thread(()->{
System.out.println(""+Thread.currentThread().getName()+"-执行中");
countDownLatch.countDown();
System.out.println(""+Thread.currentThread().getName()+"-执行完毕");
},"t3").start();
countDownLatch.await();
System.out.println("所有线程执行完毕");
}
}
可以看出有点类似 join 的功能,但是比 join 更加灵活。CountDownLatch 构造函数会接收一个 int 类型的参数作为计数器的初始值,当调用 CountDownLatch 的countDown 方法时,这个计数器就会减一。通过 await 方法去阻塞去阻塞主流程。
流程图
2.2 CountDownLatch源码分析
2.2.1 CountDownLatch类图
CountDownLatch继承图对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。countDown() 方法每次调用都会将 state 减 1,直到state 的值为 0;而 await 是一个阻塞方法,当 state 减 为 0 的时候,await 方法才会返回。await 可以被多个线程调用,大家在这个时候脑子里要有个图:所有调用了await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满(state == 0),将线程从队列中一个个唤醒过来。
2.2.2 acquireSharedInterruptibly
countdownlatch 也用到了 AQS,在 CountDownLatch 内部写了一个 Sync 并且继承了 AQS 这个抽象类重写了 AQS中的共享锁方法。首先看到下面这个代码,这块代码主要是判断当前线程是否获取到了共享锁 ; ( 在CountDownLatch 中 , 使 用 的 是 共 享 锁 机 制 ,因为CountDownLatch 并不需要实现互斥的特性)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//state 如果不等于 0,说明当前线程需要加入到共享锁队列中
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
2.2.3 doAcquireSharedInterruptibly
- addWaiter 设置为 shared 模式
- tryAcquire 和 tryAcquireShared 的返回值不同,因此会多出一个判断过程
- 在 判 断 前 驱 节 点 是 头 节 点 后 , 调 用 了setHeadAndPropagate 方法,而不是简单的更新一下头节点。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
//创建一个共享模式的节点添加到队列中
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 就判断尝试获取锁
int r = tryAcquireShared(arg);
//r>=0 表示获取到了执行权限,这个时候因为 state!=0,所以不会执行这段代码
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//阻塞线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
2.2.4 图解分析
加入这个时候有 3 个线程调用了 await 方法,由于这个时候 state 的值还不为 0,所以这三个线程都会加入到 AQS队列中。并且三个线程都处于阻塞状态。
图解分析
2.2.5 CountDownLatch.countDown
由于线程被 await 方法阻塞了,所以只有等到countdown 方法使得 state=0 的时候才会被唤醒,我们来看看 countdown 做了什么
- 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true, 否则只是简单的 state = state - 1
- 如果 state=0, 则调用 doReleaseShared 唤醒处于 await 状态下的线程
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
CountDownLatch中的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
AQS.doReleaseShared
共享锁的释放和独占锁的释放有一定的差别,前面唤醒锁的逻辑和独占锁是一样,先判断头结点是不是SIGNAL 状态,如果是,则修改为 0,并且唤醒头结点的下一个节点。
PROPAGATE: 标识为 PROPAGATE 状态的节点,是共享锁模式下的节点状态,处于这个状态下的节点,会对线程的唤醒进行传播。
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环
// 通过检查头节点是否改变了,如果改变了就继续循环
if (h == head) // loop if head changed
break;
}
}
h == head:说明头节点还没有被刚刚用unparkSuccessor 唤醒的线程(这里可以理解为ThreadB)占有,此时 break 退出循环。
h != head:头节点被刚刚唤醒的线程(这里可以理解为ThreadB)占有,那么这里重新进入下一轮循环,唤醒下一个节点(这里是 ThreadB )。我们知道,等到ThreadB 被唤醒后,其实是会主动唤醒 ThreadC...
doAcquireSharedInterruptibly
一旦 ThreadA 被唤醒,代码又会继续回到doAcquireSharedInterruptibly 中来执行。如果当前 state满足=0 的条件,则会执行 setHeadAndPropagate 方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
//创建一个共享模式的节点添加到队列中
boolean failed = true;
try {
for (;;) {//被唤醒的线程进入下一次循环继续判断
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
setHeadAndPropagate
这个方法的主要作用是把被唤醒的节点,设置成 head 节 点。 然后继续唤醒队列中的其他线程。由于现在队列中有 3 个线程处于阻塞状态,一旦 ThreadA被唤醒,并且设置为 head 之后,会继续唤醒后续的ThreadB
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
图解分析
3 同步屏障CyclicBarrier
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一 组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会 开门,所有被屏障拦截的线程才会继续运行。
3.1 CyclicBarrier使用场景
使用场景:5个工程师一起来公司应聘,招聘方式分为笔试和面试。首先,要等人到齐后,开始笔试;笔试结束之后,再一起参加面试。把5个人看作5个线程,代码如下:
Main类:
public class Main {
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(5);
for (int i = 0; i < 5; i++) {
new MyThread("线程-" + (i + 1), barrier).start();
}
}
}
MyThread类:
public class MyThread extends Thread{
private final CyclicBarrier barrier;
private final Random random = new Random();
public MyThread(String name, CyclicBarrier barrier) {
super(name);
this.barrier = barrier;
}
@Override public void run() {
try {
Thread.sleep(random.nextInt(2000));
System.out.println(Thread.currentThread().getName() + " - 已经到达公司");
barrier.await();
Thread.sleep(random.nextInt(2000));
System.out.println(Thread.currentThread().getName() + " - 已经笔试结束");
barrier.await();
Thread.sleep(random.nextInt(2000));
System.out.println(Thread.currentThread().getName() + " - 已经面试结束");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
super.run();
}
}
在整个过程中,有2个同步点:第1个同步点,要等所有应聘者都到达公司,再一起开始笔试;第2个同步点,要等所有应聘者都结束笔试,之后一起进入面试环节。
3.2 CyclicBarrier实现原理
CyclicBarrier基于ReentrantLock+Condition实现。
public class CyclicBarrier {
private final ReentrantLock lock = new ReentrantLock();
// 用于线程之间相互唤醒
private final Condition trip = lock.newCondition();
// 线程总数
private final int parties;
private int count;
private Generation generation = new Generation();
// ...
}
下面详细介绍 CyclicBarrier 的实现原理。先看构造方法:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// 参与方数量
this.parties = parties;
this.count = parties;
// 当所有线程被唤醒时,执行barrierCommand表示的Runnable。
this.barrierCommand = barrierAction;
}
接下来看一下await()方法的实现过程。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
// 响应中断
if (Thread.interrupted()) {
// 唤醒所有阻塞的线程
breakBarrier();
throw new InterruptedException();
}
// 每个线程调用一次await(),count都要减1
int index = --count;
// 当count减到0的时候,此线程唤醒其他所有线程
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
以上几点的说明:
- CyclicBarrier是可以被重用的。以上一节的应聘场景为例,来了5个线程,这5个线程互相等待,到齐后一起被唤醒,各自执行接下来的逻辑;然后,这5个线程继续互相等待,到齐后再一起被唤醒。每一轮被称为一个Generation,就是一次同步点。
- CyclicBarrier 会响应中断。5 个线程没有到齐,如果有线程收到了中断信号,所有阻塞的线程也会被唤醒,就是上面的breakBarrier()方法。然后count被重置为初始值(parties),重新开始。
- 上面的回调方法,barrierAction只会被第5个线程执行1次(在唤醒其他4个线程之前),而不是5个线程每个都执行1次。
3.3 CyclicBarrier与CountDownLatch 区别
CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;
4 控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以 保证合理的使用公共资源
4.1 Semaphore的使用场景
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore=new Semaphore(3);//此时海底捞有3个空桌
for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("第"+Thread.currentThread().getName()+"等待者抢到座位。");
//假设每桌客人吃饭时间为3S
TimeUnit.SECONDS.sleep(3);
System.out.println("第"+Thread.currentThread().getName()+"客人吃完饭离开。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
运行结果如下:
第0等待者抢到座位。
第1等待者抢到座位。
第2等待者抢到座位。
第1客人吃完饭离开。
第0客人吃完饭离开。
第2客人吃完饭离开。
第4等待者抢到座位。
第5等待者抢到座位。
第3等待者抢到座位。
第5客人吃完饭离开。
第3客人吃完饭离开。
第4客人吃完饭离开。
4.2 Semaphore源码分析
假设有n个线程来获取Semaphore里面的10份资源(n > 10),n个线程中只有10个线程能获取到,其他线程都会阻塞。直到有线程释放了资源,其他线程才能获取到。
示例
当初始的资源个数为1的时候,Semaphore退化为排他锁。正因为如此,Semaphone的实现原理和锁十分类似,是基于AQS,有公平和非公平之分。Semaphore相关类的继承体系如下图所示:
继承体系
创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。
Semaphore 分公平策略和非公平策略
4.2.1 FairSync
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
// 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
4.2.2 NofairSync
通过对比发现公平和非公平的区别就在于是否多了一个hasQueuedPredecessors 的判断
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
后面的代码和 CountDownLatch 的是完全一样,都是基于共享锁的实现。
5 线程间交换数据的Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交 换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过 exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也 执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产 出来的数据传递给对方。
5.1 使用场景
Exchanger用于线程之间交换数据,代码如下:
public class ExchangerDemo {
private static final Random random = new Random();
public static void main(String[] args) {
// 建一个多线程共用的exchange对象
// 把exchange对象传给3个线程对象。每个线程在自己的run方法中调用exchange,把自 己的数据作为参数
// 传递进去,返回值是另外一个线程调用exchange传进去的参数
Exchanger<String> exchanger = new Exchanger<>();
new Thread("线程1") {
@Override
public void run() {
while (true) {
try {
// 如果没有其他线程调用exchange,线程阻塞,直到有其他线程调 用exchange为止。
String otherData = exchanger.exchange("交换数据1");
System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
new Thread("线程2") {
@Override
public void run() {
while (true) {
try {
String otherData = exchanger.exchange("交换数据2");
System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
new Thread("线程3") {
@Override
public void run() {
while (true) {
try {
String otherData = exchanger.exchange("交换数据3");
System.out.println(Thread.currentThread().getName() + "得到<==" + otherData);
Thread.sleep(random.nextInt(2000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
}
在上面的例子中,3个线程并发地调用exchange(...),会两两交互数据,如1/2、1/3和2/3。
5.2 Exchanger实现原理
Exchanger的核心机制和Lock一样,也是CAS+park/unpark。
5.2.1 Exchanger内部代码
首先,在Exchanger内部,有两个内部类:Participant和Node,代码如下:
public class Exchanger<V> {
//...
// 添加了Contended注解,表示伪共享与缓存行填充
@sun.misc.Contended static final class Node {
int index; // Arena index
int bound; // Last recorded value of Exchanger.bound
int collides; // 本次绑定中,CAS操作失败次数
int hash; // 自旋伪随机
Object item; // 本线程要交换的数据
volatile Object match; // 对方线程交换来的数据
// 当前线程
volatile Thread parked; // 当前线程阻塞的时候设置该属性,不阻塞为null。
}
/** The corresponding thread local class */
static final class Participant extends ThreadLocal<java.util.concurrent.Exchanger.Node> {
public java.util.concurrent.Exchanger.Node initialValue() { return new java.util.concurrent.Exchanger.Node(); }
}
//...
}
每个线程在调用exchange(...)方法交换数据的时候,会先创建一个Node对象。
这个Node对象就是对该线程的包装,里面包含了3个重要字段:第一个是该线程要交互的数据,第二个是对方线程交换来的数据,最后一个是该线程自身。
一个Node只能支持2个线程之间交换数据,要实现多个线程并行地交换数据,需要多个Node,因此在Exchanger里面定义了Node数组:
/**
* Elimination array; null until enabled (within slotExchange).
* Element accesses use emulation of volatile gets and CAS.
*/
private volatile Node[] arena;
5.2.2 exchange(V x)实现分析
明白了大致思路,下面来看exchange(V x)方法的详细实现:
@SuppressWarnings("unchecked")
public V exchange(V x) throws InterruptedException {
Object v;
Object item = (x == null) ? NULL_ITEM : x; // translate null args
if ((arena != null ||
(v = slotExchange(item, false, 0L)) == null) &&
((Thread.interrupted() || // disambiguates null return
(v = arenaExchange(item, false, 0L)) == null)))
throw new InterruptedException();
return (v == NULL_ITEM) ? null : (V)v;
}
上面方法中,如果arena不是null,表示启用了arena方式交换数据。如果arena不是null,并且线程被中断,则抛异常
如果arena不是null,并且arenaExchange的返回值为null,则抛异常。对方线程交换来的null值是封装为NULL_ITEM对象的,而不是null。
如果slotExchange的返回值是null,并且线程被中断,则抛异常。
如果slotExchange的返回值是null,并且areaExchange的返回值是null,则抛异常。
slotExchange的实现:
/**
* Exchange function used until arenas enabled. See above for explanation.
* 如果不启用arenas,则使用该方法进行线程间数据交换。
* @param item 需要交换的数据
* @param timed 是否是计时等待,true表示是计时等待
* @param ns 如果是计时等待,该值表示最大等待的时长。
* @return 对方线程交换来的数据;如果等待超时或线程中断,或者启用了arena,则返回 null。
*/
private final Object slotExchange(Object item, boolean timed, long ns) {
// participant在初始化的时候设置初始值为new Node()
// 获取本线程要交换的数据节点
Node p = participant.get();
// 获取当前线程
Thread t = Thread.currentThread();
// 如果线程被中断,则返回null。
if (t.isInterrupted()) // preserve interrupt status so caller can recheck
return null;
for (Node q;;) {
// 如果slot非空,表明有其他线程在等待该线程交换数据
if ((q = slot) != null) {
// CAS操作,将当前线程的slot由slot设置为null
// 如果操作成功,则执行if中的语句
if (U.compareAndSwapObject(this, SLOT, q, null)) {
// 获取对方线程交换来的数据
Object v = q.item;
// 设置要交换的数据
q.match = item;
// 获取q中阻塞的线程对象
Thread w = q.parked;
if (w != null)
// 如果对方阻塞的线程非空,则唤醒阻塞的线程
U.unpark(w);
return v;
}
// create arena on contention, but continue until slot null
// 创建arena用于处理多个线程需要交换数据的场合,防止slot冲突
if (NCPU > 1 && bound == 0 &&
U.compareAndSwapInt(this, BOUND, 0, SEQ))
arena = new Node[(FULL + 2) << ASHIFT];
}
// 如果arena不是null,需要调用者调用arenaExchange方法接着获取对方线程交 换来的数据
else if (arena != null)
return null; // caller must reroute to arenaExchange
else {
// 如果slot为null,表示对方没有线程等待该线程交换数据
// 设置要交换的本方数据
p.item = item;
// 设置当前线程要交换的数据到slot
// CAS操作,如果设置失败,则进入下一轮for循环
if (U.compareAndSwapObject(this, SLOT, null, p))
break;
p.item = null;
}
}
// await release
// 没有对方线程等待交换数据,将当前线程要交换的数据放到slot中,是一个Node对象
// 然后阻塞,等待唤醒
int h = p.hash;
// 如果是计时等待交换,则计算超时时间;否则设置为0。
long end = timed ? System.nanoTime() + ns : 0L;
// 如果CPU核心数大于1,则使用SPINS数,自旋;否则为1,没必要自旋。
int spins = (NCPU > 1) ? SPINS : 1;
// 记录对方线程交换来的数据
Object v;
// 如果p.match==null,表示还没有线程交换来数据
while ((v = p.match) == null) {
// 如果自旋次数大于0,计算hash随机数
if (spins > 0) {
// 生成随机数,用于自旋次数控制
h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
if (h == 0)
h = SPINS | (int)t.getId();
else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield();
}
// p是ThreadLocal记录的当前线程的Node。
// 如果slot不是p表示slot是别的线程放进去的
else if (slot != p)
spins = SPINS;
else if (!t.isInterrupted() && arena == null &&
(!timed || (ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this);
p.parked = t;
if (slot == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
// 没有被中断但是超时了,返回TIMED_OUT,否则返回null
v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
break;
}
}
// match设置为null值 CAS
U.putOrderedObject(p, MATCH, null);
p.item = null;
p.hash = h;
// 返回获取的对方线程交换来的数据
return v;
}
arenaExchange的实现:
/**
* Exchange function when arenas enabled. See above for explanation.
* 当启用arenas的时候,使用该方法进行线程间的数据交换。
* @param item 本线程要交换的非null数据。
* @param timed 如果需要计时等待,则设置为true。
* @param ns 表示计时等待的最大时长。
* @return 对方线程交换来的数据。如果线程被中断,或者等待超时,则返回null。
*/
private final Object arenaExchange(Object item, boolean timed, long ns) {
Node[] a = arena;
Node p = participant.get();
// 访问下标为i处的slot数据
for (int i = p.index;;) { // access slot at i
int b, m, c; long j; // j is raw array offset
Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);
// 如果q不是null,则将数组的第j个元素由q设置为null
if (q != null && U.compareAndSwapObject(a, j, q, null)) {
// 获取对方线程交换来的数据
Object v = q.item; // release
// 设置本方线程交换的数据
q.match = item;
Thread w = q.parked;
if (w != null)
// 如果对方线程非空,则唤醒对方线程
U.unpark(w);
return v;
}
// 如果自旋次数没达到边界,且q为null
else if (i <= (m = (b = bound) & MMASK) && q == null) {
// 提供本方数据
p.item = item; // offer
// 将arena的第j个元素由null设置为p
if (U.compareAndSwapObject(a, j, null, p)) {
long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;
Thread t = Thread.currentThread(); // wait
// 自旋等待
for (int h = p.hash, spins = SPINS;;) {
// 获取对方交换来的数据
Object v = p.match;
// 如果对方交换来的数据非空
if (v != null) {
// 将p设置为null,CAS操作
U.putOrderedObject(p, MATCH, null);
// 清空
p.item = null; // clear for next use
p.hash = h;
// 返回交换来的数据
return v;
}
// 产生随机数,用于限制自旋次数
else if (spins > 0) {
h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift
if (h == 0) // initialize hash
h = SPINS | (int)t.getId();
else if (h < 0 && // approx 50% true
(--spins & ((SPINS >>> 1) - 1)) == 0)
Thread.yield(); // two yields per wait
}
// 如果arena的第j个元素不是p
else if (U.getObjectVolatile(a, j) != p)
spins = SPINS; // releaser hasn't set match yet
else if (!t.isInterrupted() && m == 0 &&
(!timed ||
(ns = end - System.nanoTime()) > 0L)) {
U.putObject(t, BLOCKER, this); // emulate LockSupport
p.parked = t; // minimize window
if (U.getObjectVolatile(a, j) == p)
U.park(false, ns);
p.parked = null;
U.putObject(t, BLOCKER, null);
}
// arena的第j个元素是p并且CAS设置arena的第j个元素由p设置 为null成功
else if (U.getObjectVolatile(a, j) == p &&
U.compareAndSwapObject(a, j, p, null)) {
if (m != 0) // try to shrink
U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);
p.item = null;
p.hash = h;
i = p.index >>>= 1; // descend
// 如果线程被中断,则返回null值
if (Thread.interrupted())
return null;
if (timed && m == 0 && ns <= 0L)
// 如果超时,返回TIMED_OUT。
return TIMED_OUT;
break; // expired; restart
}
}
}
else
p.item = null; // clear offer
}
else {
if (p.bound != b) { // stale; reset
p.bound = b;
p.collides = 0;
i = (i != m || m == 0) ? m : m - 1;
}
else if ((c = p.collides) < m || m == FULL ||
!U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {
p.collides = c + 1;
i = (i == 0) ? m : i - 1; // cyclically traverse
}
else
i = m + 1; // grow
p.index = i;
}
}
}
网友评论