Java并发工具
并发工具实现一般都将实现了AQS的自定义同步器sync定义为内部类,而同步类实现某个接口对外服务,sync只需实现state的 获取-释放方式tryAcquire-tryRelease,线程排队、等待、唤醒都由上层AQS实现
闭锁CountDownLatch
用以协调多个线程之间的同步和通信
说明 | 函数 |
---|---|
等待计数器归零(可中断) | await() |
限时等待计数器归零 | await(long time,TimeUnit unit) |
计数器-- | countDown() |
获取当前计数器 | getCount() |
使用共享式AQS,await对应acquireSharedInterruptibly(1),countDown对应releaseShared(1)
//尝试获取:state归零方可成功获取
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//尝试释放:当前state--
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
计数信号量 Semephore
允许n个任务同时访问某个资源,可将信号量看作向外分发资源的许可证
作用 | API |
---|---|
构造一个信号量 | Semaphore(int permits, boolean fair) |
获取一个【指定个】资源,可中断 | acquire(【int permits】) |
释放一个【指定个】资源 | release(【int permits】) |
可用资源数 | availablePermits |
减少一个资源 | drainPermits |
减少制定个资源 | reducePermits(int reduction) |
获取入队获取资源的Sync队列长度 | getQueueLength |
获取入队获取资源的线程集合 | getQueuedThreads |
获取一个【指定个】资源,不可中断 | acquireUninterruptibly(【int permits】) |
尝试获取一个【指定个】资源<br />返回是否成功<br />非公平型 | tryAcquire(【int permits】) |
尝试在指定时限内获取一个【指定个】资源<br />返回是否成功 | tryAcquire(【int permits】,long timeout, TimeUnit unit) |
公平型——tryAcquireShared要判断是否有其他节点排在此节点前,无方可进行获取
非公平型——tryAcquireShared直接获取资源
//非公平尝试获取:state-获取数 >= 0方可获取,state-获取数
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//尝试释放:state+释放数
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
可重入锁 ReentrantLock
递归无阻塞的同步机制
作用 | API |
---|---|
锁 | lock |
解锁 | unLock |
可中断锁 | lockInterruptibly |
限时等待锁<br />配合Condition的方法await(),signal(),signalAll()使用(必须在获得锁后) | tryLock |
//非公平尝试获取:无线程占有锁——占有锁并设置资源为获取数,当前线程占有锁——state+获取数
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//尝试释放:当前线程拥有锁时方可释放,state-释放数,若state归0则释放锁
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
可重入读写锁 ReentrantReadWriteLock
ReentrantReadWriteLock 读写锁,适用于读多写少的并发情况
有写锁,除写锁持有线程外无法获取读锁和写锁
作用 | API |
---|---|
构造一个可重入读写锁 | ReentrantReadWriteLock(boolean fair) |
获取读锁ReentrantReadWriteLock.ReadLock | readLock |
获取写锁ReentrantReadWriteLock.WriteLock | writeLock |
state 高16位读状态,低16位写状态
读锁使用共享模式,写锁使用独占模式
写锁
public void lock() {
sync.acquire(1);
}
//独占式尝试获取,条件无读锁且无其他线程写锁,state+获取数
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
//有读线程或者有其他写线程——失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//当前线程有其他写操作——state++
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
//尝试state+获取资源数,设置当前线程独占锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
public void unlock() {
sync.release(1);
}
//独占式尝试释放:state-释放数,state归零则释放锁
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
读锁
public void lock() {
sync.acquireShared(1);
}
//共享式尝试获取:有其他写线程-失败->当前读线程不应阻塞&读锁未达上限->state+1个读单元
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//有其他写线程且非当前线程——失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//读锁不应阻塞且读锁未达到上限(65535),CAS设置state+1个读单元(65536)
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//读锁应该阻塞或达到上限
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
//有写锁且非当前线程——失败
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
//读锁应该阻塞
} else if (readerShouldBlock()) {
//第一个获取读锁的线程是当前线程
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
//共享式尝试释放
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
循环栅栏 CyclicBarrier
允许一组线程互相等待直到全部到达栅栏位置
作用 | API |
---|---|
构造一个循环栅栏 | CyclicBarrier(int 互相等待数量,[Runnabl全部抵达栅栏时的回调方法]) |
等待直到所有线程都调用此方法<br />返回此现场到达栅栏的序号 | await |
重置栅栏 | reset |
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//可重入锁
lock.lock();
try {
final Generation g = generation;
//栅栏已被破坏——破栅栏异常
if (g.broken)
throw new BrokenBarrierException();
//线程被中断——破环栅栏,中断异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//序号
int index = --count;
//最终到达栅栏者——回调事件并唤醒所有之前抵达的线程
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 等待其他线程抵达
for (;;) {
try {
if (!timed)
//无时限等待(开锁)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
交换数据栅栏 Exchanger
允许两个任务交换对象
方法
V exchange(V data) 调用线程陷入阻塞状态直到另一线程调用此方法,然后以线程安全的方式交换数据,释放线程
网友评论