线程间的基本协作方式请参考 多线程协作方式。
在此基础上,结合源码梳理一下每种方式的实现原理。
Synchronized
·synchronized·是Java原生的互斥同步锁,使用方便,对于·synchronized·修饰的方法或同步块,无需再显式释放锁。·synchronized·底层是通过·monitorenter·和·monitorexit·两个字节码指令来实现加锁解锁操作的。
除Synchronized之外的线程同步工具
除了Synchronized,其余的线程协作方式都是在Java API层面的。这些工具基本都依赖于AbstractQueuedSynchronizer
。AbstractQueuedSynchronizer
是Java并发很强大的同步器,它内部维护了一个等待锁的双向链表。链表节点是自定义的Node(定义参考以下源码),每个Node包含了一个线程对象,它的前后指针,以及节点的状态。对于线程的阻塞和唤醒又是基于LockSupport
提供的park, unpark
实现。
LockSupport
,是线程阻塞的原语。详细请参考 JAVA并发梳理(一)LockSupport。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AbstractQueuedSynchronizer
还提供了几个钩子方法供其子类覆写来提供各自的实现方式,比如:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
ReentrantLock, Semaphore, CountDownLatch等内部都是通过继承AbstractQueuedSynchronizer
来实现的。
关于更多关于AbstractQueuedSynchronizer
的实现,请参考 Java AQS AbstractQueuedSynchronizer。
在了解AbstractQueuedSynchronizer
的基础上,我们来看看这些高级的线程协作是怎么做的。
1. ReentrantLock
ReentrantLock
做为API层面的互斥锁,需要显式地去加锁解锁。底层是基于AbstractQueuedSynchronizer
实现的。
ReentrantLock 是之前的一篇总结。下面再做点补充:
关于与ReentrantLock
配合使用的Condition
。其实是一个ConditionObject
对象。类ConditionObject
也是定义在AbstractQueuedSynchronizer
中的。每个ConditionObject
都自己维护一个Node链表,表示等待在这个条件上的线程队列。值得注意的是,当调用signal()/signalAll()之后,这些被通知的线程并没有立马等到锁,而是会被加入到该ConditionObject
对象所关联的ReentrantLock
的等待队列中。
结合线程从运行到等待到阻塞的状态变化过程,其实跟上面的实现是呼应的。
final ConditionObject newCondition() {
return new ConditionObject();
}
理解并仔细体会一下ReentrantLock
和各个ConditionObject
对象背后的队列。
2. Semaphore
如前面所说,首先Semaphore
内部也是基于AbstractQueuedSynchronizer
。它也提供了公平和非公平的实现。跟ReentrantLock
的区别在于,它含有多个许可。
一个简单的测试类:
public class SemaphoreTest {
private static Semaphore semaphore = new Semaphore(5);
public static void main(String[] args) {
for(int i = 0; i < 10; i++) {
new Thread(new Driver(i)).start();
}
}
static class Driver implements Runnable{
int i;
Driver(int i) {
this.i = i;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Driver " + i + " is driving ... ");
Thread.sleep(1000);
} catch (InterruptedException e) {
} finally {
semaphore.release();
System.out.println("Driver " + i + " stops ... ");
}
}
}
}
3. CountDownLatch
类似计数器的功能,比如某个线程任务,需要等待其他 N 个线程任务执行完才能执行。内部实现基于AbstractQueuedSynchronizer
。
public class CDLTest {
private static CountDownLatch cdl = new CountDownLatch(10);
private static ThreadLocal<Random> r = new ThreadLocal<Random>() {
@Override
protected Random initialValue() {
return new Random(1000);
}
};
public static void main(String[] args) throws InterruptedException {
for(int i = 0; i < 10; i++) {
new Thread(new Phase(i)).start();
}
cdl.await();
System.out.println("Main continues .... ");
}
static class Phase implements Runnable {
int i;
Phase(int i) {
this.i = i;
}
@Override
public void run() {
try {
Thread.sleep(r.get().nextInt(1000));
System.out.println("Phase " + i + " is ready. ");
cdl.countDown();
} catch (InterruptedException e) {
}
}
实际上,这几种基于AbstractQueuedSynchronizer
的工具,基本原理都是等待队列,然后各自实现不同的方式来acquire
和release
,里面有个int State
。
4. ReentrantReadWriteLock
基于AbstractQueuedSynchronizer
实现。每个ReentrantReadWriteLock
分别包含一个WriteLock
对象和一个ReadLock
。也分别有公平锁和非公平锁的实现。
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
内部还是维护一个等待队列(链表)。根据节点状态判断它是在请求读还是写。
final boolean isShared() {
return nextWaiter == SHARED;
}
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
5. CyclicBarrier
内部是基于ReentrantLock
和Condition
来实现的。N 个线程互相等待,任何一个线程达到某个状态之前,所有的线程都必须等待。必须等到所有线程都达到某个状态,这 N 个线程才能继续执行。应用场景如,有100个人要去旅游,每到达10个人就开一个团出发。
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
CyclicBarrier
还可以接收一个参数作为barrierCommand
,指定当计数器一次计数完成之后,系统会执行的动作。
CyclicBarrier.await()
方法可能会抛出两个异常。一个是InterruptedException
,也就是在等待过程中,线程被中断,这使得线程在等待是依然可以响应外部紧急事件;另一个异常是CyclicBarrier
特有的BrokenBarrierException
。 一旦遇到这个一次杨,则表示当前的栅栏已经破损了,可能系统已经没有办法等待所有的线程到齐了。例如,第五个线程被中断,它会得到一个InterruptedException
, 而其他9个线程会分别得到BrokenBarrierException
。这个异常可以避免其他9个线程进行永久的、无谓的等待。
public class CBTest {
private static CyclicBarrier cb = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("5 people is enough. Setting out .... ");
}
});
private static ThreadLocal<Random> r = new ThreadLocal<Random>() {
@Override
protected Random initialValue() {
return new Random(2000);
}
};
public static void main(String[] args) {
for(int i = 0; i < 100; i ++) {
new Thread(new Visitor(i)).start();
}
}
static class Visitor implements Runnable{
int i;
public Visitor(int i) {
this.i = i;
}
@Override
public void run() {
try {
Thread.sleep(r.get().nextInt(2000));
System.out.println(i + " is waiting .... ");
cb.await();
System.out.println(i + " sets out .... ");
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
}
}
了解一下多线程各种协作方式的实现,可以发现其实每个工具之所以发挥相应的作用都是有其原理的,不是什么神秘的东西。每次看一遍都能有些不一样的收获。下一步希望能把Node中定义的那几种State
在acquire, release
中发挥的作用搞搞清楚。
网友评论