Semaphore也叫信号量,在JDK1.5被引入,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。
Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。
Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。
访问特定资源前,必须使用acquire方法获得许可,如果许可数量为负数,该线程则一直阻塞,直到有可用许可。访问资源后,使用release释放许可。
下面看一下Semaphore的主要结构
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
/** All mechanics via AbstractQueuedSynchronizer subclass */
private final Sync sync;
/**
* Synchronization implementation for semaphore. Uses AQS state
* to represent permits. Subclassed into fair and nonfair
* versions.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
}
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
}
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
}
}
上面是Semaphore代码的精简版,只保留了主要结构。Semaphore的所有操作都是通过Sync完成的,Sync又是一个静态内部抽象类,继承的AQS。Semaphore的公平/非公平是NonfairSync和FairSync实现的。
构造函数
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
所有的构造函数最后都会执行到Sync的构造函数,执行setState(permits);
获取许可
acquire 从Semaphore中获取许可,直到有一个许可可用,或者线程被中断。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
看过AQS的都知道,acquireSharedInterruptibly会调用tryAcquireShared方法。
NonfairSync的tryAcquireShared
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 当前许可数
int available = getState();
// 还剩余的许可数
int remaining = available - acquires;
// 没有剩余许可数了或者CAS更新available成功则返回remaining
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
FairSync的tryAcquireShared
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果有等待时间更长的线程存在那么返回-1
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
acquireUninterruptibly,和acquire一样,只不过是忽略中断
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquire,许可获取成功返回true,没有可用的许可返回false
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
释放许可
public void release() {
sync.releaseShared(1);
}
// AQS的方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 重写AQS的tryReleaseShared方法
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 许可加1
int current = getState();
int next = current + releases;
// 整型溢出了
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 把stateCAS更新成新的许可数
if (compareAndSetState(current, next))
return true;
}
}
网友评论