信号量 Semaphore 是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么 acquire 方法将一直阻塞,直到其它线程释放许可。
Semaphore 通常用于线程对统一资源的访问控制。需要注意的是它没有实现 Lock 接口,也不支持 Condition。官方示例:
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
在获取项目之前,每个线程必须从信号灯获取许可,以确保可以使用该项目。线程完成该项目后,将其返回到池中,并向信号量返回一个许可,从而允许另一个线程获取该项目。
Semaphore 内部使用了共享锁,没有独占锁和 ReentrantLock 正好相反,还有一个 ReadLock in ReentrantReadWriteLock 的区别是 Semaphore 可以在构造方法中指定共享锁的数量,ReadLock 只能是默认的 65535,而且大于 65535 还会抛出异常。Semaphore 在大于指定的共享锁数量时会阻塞。
同样 Semaphore 有三个静态内部类 Sync,NonFairSync,FairSync。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
//调用过程:
//Semaphore#acquire-> NonFairSync#tryAcquireShared -> 本方法
final int nonfairTryAcquireShared(int acquires) {
for (;;) {//采用自旋直到 没有空闲的共享锁或 CAS 修改 state 的值成功
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//调用过程:
//Semaphore#release -> AQS#releaseShared -> 本方法
protected final boolean tryReleaseShared(int releases) {
for (;;) {//自旋直到 CAS 成功
int current = getState();
int next = current + releases;
if (next < current) // overflow,表示 releases 不能是一个负值
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
//减少许可证的数量
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
//把当前的可用的许可证清空
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
/**
* 非公平锁实现
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/**
* 公平锁实现
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
网友评论