一、什么是Semaphore?
计数信号量
二、Semaphore有什么用途?
像是一个装着资源的容器
三、Semaphore应用场景
学生饭堂打饭
四、代码测试
以下测试的是Semaphore的API
Semaphore semaphore_nofair = new Semaphore(5);//默认是非公平锁
Semaphore semaphore_fair = new Semaphore(5, true);
//isFair()
System.out.println("是否公平FIFO:" + semaphore_fair.isFair());
System.out.println("是否公平FIFO:" + semaphore_nofair.isFair());
//avaliablePermits():获取当前可用的许可证数量
try {
semaphore_fair.acquire();
semaphore_fair.acquire(3);//acquire是消耗资源
semaphore_fair.release(10);//release是创建资源
semaphore_fair.acquire(6);
semaphore_nofair.acquire(3);
semaphore_nofair.release(10);
semaphore_nofair.acquire(6);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("获取当前可用的许可证数量:开始---" + semaphore_fair.availablePermits());//5
System.out.println("获取当前可用的许可证数量:开始---" + semaphore_nofair.availablePermits());//6
五、源码解析
有三个内部类,一个是普通的Sync类(父类),一个公平的FairSync(子类),一个是非公平的NonFairSync(子类)
1.Sync类
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
//获取有多少许可证
final int getPermits() {
return getState();
}
//非公平锁的获取
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
//尝试释放共享锁,释放成功就调用CAS来重置许可证的数量,然后返回true
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
//减少许可证的数量,也是调用CAS
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
//获取剩余的许可证,然后调用CAS
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
2.NonFairSync和FairSync
/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);//调用父类Sync的方法
}
}
/**
* Fair version
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
二者区别:FairSync与NonFairSync的区别就在于会首先判断当前队列中有没有线程在等待,如果有,就老老实实进入到等待队列;而不像NonfairSync一样首先试一把,说不定就恰好获得了一个许可,这样就可以插队了。
3.构造方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);//默认是非公平
}
/**
* Creates a {@code Semaphore} with the given number of
* permits and the given fairness setting.
*
* @param permits the initial number of permits available.
* This value may be negative, in which case releases
* must occur before any acquires will be granted.
* @param fair {@code true} if this semaphore will guarantee
* first-in first-out granting of permits under contention,
* else {@code false}
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
4.普通方法
//获取或消耗一个资源(可中断)
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//获取或消耗n个资源(可中断)
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
//获取或消耗一个不可中断资源
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
//获取或消耗n个不可中断资源
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
//创建或释放一个资源
public void release() {
sync.releaseShared(1);
}
//创建或释放n个资源
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
//获取当前有多少个资源
public int availablePermits() {
return sync.getPermits();
}
/**
* Acquires and returns all permits that are immediately available.
*
* @return the number of permits acquired
*/
//获取并消耗当前有多少个资源,调用此方法后,资源数量变为0
public int drainPermits() {
return sync.drainPermits();
}
//减少n个资源
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
//当前的Semaphore是否是公平的
public boolean isFair() {
return sync instanceof FairSync;
}
/**
* Queries whether any threads are waiting to acquire. Note that
* because cancellations may occur at any time, a {@code true}
* return does not guarantee that any other thread will ever
* acquire. This method is designed primarily for use in
* monitoring of the system state.
*
* @return {@code true} if there may be other threads waiting to
* acquire the lock
*/
//当前是否有等待的资源的线程
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* Returns an estimate of the number of threads waiting to acquire.
* The value is only an estimate because the number of threads may
* change dynamically while this method traverses internal data
* structures. This method is designed for use in monitoring of the
* system state, not for synchronization control.
*
* @return the estimated number of threads waiting for this lock
*/
//当前等待资源的线程有多少个
public final int getQueueLength() {
return sync.getQueueLength();
}
六、总结:
Semaphore是信号量,用于管理一组资源。其内部是基于AQS的共享模式,AQS的状态表示许可证的数量,在许可证数量不够时,线程将会被挂起;而一旦有一个线程释放一个资源,那么就有可能重新唤醒等待队列中的线程继续执行。
网友评论