[toc]
1.类结构及注释
1.1 类结构
Semaphore是基于AQS实现的信号量,这个类主要用于控制线程的访问数,或者对并发的数量进行控制。以将资源的被获取方的速度限制在特定的值内。
其类结构如下:
image.png
其内部有持有基于AQS的Sync类,Sync类有FairSyn和NonfairSync两个类来实现公平和非公平锁。
1.2 注释部分
Semaphore是一个用于计数的信号量,从概念上讲,信号量实际上是一组许可证,每个acquire都需要获得许可证才能运行。而每个release方法则会将之前获取的许可证释放。但是,再Semaphore中,并没有实际的许可证对象存在,Semaphore只是保留了一个用于计数的数量并进行相应的加减操作。
信号量通常用于限制线程数量,而不能访问某些资源。例如,如下是一个使用信号量控制项目pool访问的类。
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}}
在获取之前,每个线程必须从信号量获取许可,以确保每个获取项是可以使用的。当线程完成该获取项之后,它将返回到pool中,并向信号量返回一个许可。从而允许另一个线程获取该项。需要注意的是,在调用acquire时,不会保持任何锁同步,因为这将阻止某个项返回到pool中,信号量封装了限制访问池所需的同步,与维护池本身一致性所需的任何同步分开。
初始化一个信号量可以用作互斥锁,这样的话该信号量最多只有一个许可。这通常被称为二进制信号量,因为它只有两种状态,一个许可可用,或者没有许可可用。当以这种方式使用时,二进制信号量具有锁定属性,这与许多Lock的实现不同。这个可以由所有者之外的线程释放,因为信号量本身没有所有权概念。这在某些特殊情况下(如死锁恢复)会很有用。
此类的构造函数可以选择公平参数fairness。设置为false时,此类不保证线程获得许可的顺序。特别是允许插队,也就是说,可以在正在等待的线程之前为调用acquire的线程分配一个许可-从逻辑上讲,新线程将自己置于该线程的头部等待线程的队列。当公平性设置为true时,信号量可确保选择调用任何acquire()方法的线程以处理它们调用这些方法的顺序获得许可(先进先出; FIFO)。请注意,FIFO排序必须适用于这些方法中的特定内部执行点。因此,一个线程有可能在另一个线程之前调acquisition,但在另一个线程之后到达排序点,并且类似地从该方法返回时也是如此。另请注意,未定时的tryAcquire方法不遵循公平性设置,但会采用任何可用的许可。
通常,用于控制资源访问的信号量应初始化为公平,以确保没有线程因访问资源而挨饿。当使用信号量进行其他类型的同步控制时,非公平排序的吞吐量优势通常会超过公平考虑。
此类还提供了方便的方法来同时acquire(int)获取和release(int)释放多个许可。当在没有设置公平的情况下使用这些方法时,请注意无限期推迟的风险增加。
内存一致性分析:调用release()之类的“发布”方法之前,线程中的操作发生在成功的“获取”方法(如acquire()之前)在另一个线程中。
2.内部类Sync
与众多Lock的实现一样,Semaphore也是基于AQS实现的。在其内部有Sync类。Sync继承了AbstractQueueSynhronizer。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
}
其实现的方法如下:
2.1 nonfairTryAcquireShared
非公平锁的模式下尝试获取共享锁。
final int nonfairTryAcquireShared(int acquires) {
//死循环
for (;;) {
//获得state状态
int available = getState();
//用当前的可用state的值减去acquires
int remaining = available - acquires;
//如果前面减去的结果小于0,或者cas的方式设置state成功,则退出
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
2.2 tryReleaseShared
释放共享锁
protected final boolean tryReleaseShared(int releases) {
//死循环
for (;;) {
//获得当前的状态
int current = getState();
//next为当前state+release的值
int next = current + releases;
//如果next比current小,那么只有一种可能就是越界,导致这个结果为负数
if (next < current) // overflow
//抛出异常
throw new Error("Maximum permit count exceeded");
如果没有异常,则cas设置next 成功则为true
if (compareAndSetState(current, next))
return true;
}
}
2.3 reducePermits
减少许可证
final void reducePermits(int reductions) {
//死循环
for (;;) {
//获得当前的state
int current = getState();
//next为当前的state减去应该减少的reductions
int next = current - reductions;
//如果next任然大于current则说明没有这么多许可证,此时抛出异常
if (next > current) // underflow
throw new Error("Permit count underflow");
//采用cas修改
if (compareAndSetState(current, next))
return;
}
}
2.4 drainPermits
将许可证清空
final int drainPermits() {
//死循环
for (;;) {
//current为当前的state
int current = getState();
//如果current为0或者cas将current设置为0
if (current == 0 || compareAndSetState(current, 0))
//返回
return current;
}
}
3.NonfairSync与FairSync
3.1 NonfairSync
NonfairSync是Sync的非公平锁的实现,其源码如下:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
实际上只需要实现一个关键的方法,tryAcquireShared。这个方法调用了前面在sync中定义的nonfairTryAcquireShared方法。
3.2 FairSync
FairSync是Sync的公平锁的实现,其源码如下:
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
//死循环
for (;;) {
//判断队列中是否存在前任节点,如果存在,返回-1,将当前线程阻塞
if (hasQueuedPredecessors())
return -1;
//available为当前state
int available = getState();
//remaing为available减去acquires
int remaining = available - acquires;
//如果remaining小于0 或者cas设置为0 则返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
4.构造方法
4.1 Semaphore(int permits)
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
默认情况下创建了一个非公平锁。
4.2 Semaphore(int permits, boolean fair)
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
创建的时候可以指定公平性,所谓非公平锁,在Semaphore中,就是当线程获得锁的时候,可以尝试进行一次插队,如果不成功再进行排队。那么公平锁的话,就是一上来就排队。
5.其他方法
5.1 acquire
从Semaphore中获得许可,阻塞,直到获取到可用的许可证为止。或者线程被中断。
如果能及时获取一个许可,那么减少这个许可的数量。
如果没有可用的许可,则出于线程调度目的,当前线程将被禁用,并处于休眠状态,直到发生以下两种情况之一:
其他一些线程为此信号量调用{release方法,接下来将为当前线程分配许可;或某些其他线程interrupt当前线程。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
5.2 acquireUninterruptibly
获取一个许可,阻塞,直到许可可用。不支持中断。
如果能及时获取一个许可,那么减少这个许可的数量。
如果没有可用的许可,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到某个其他线程为此信号量调用release方法,然后将为当前线程分配许可。
如果当前线程在等待许可时被interrupt中断,则它将继续等待,但是与没有许可的情况相比,为该线程分配许可的时间可能会有所变化。发生中断。当线程确实从该方法返回时,将设置其中断状态。
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
5.3 tryAcquire
仅在调用时可用时,才从此信号量获取许可。如果许可证可用,则获取许可证并立即返回,其值为true,从而将可用许可证数量减少一个。
如果没有许可,则此方法将立即返回值false。
即使已将此信号量设置为使用公平的排序策略,无论是否有其他线程正在等待,对tryAcquire的调用都可能会立即获得许可。这种插入是指,即使再某些情况下,这种行为也会破坏公平性。如果要遵守公平性设置,请使用等效的tryAcquire(long,TimeUnit)和tryAcquire(0,TimeUnit.SECONDS),这些方法还会检测中断。
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
5.4 tryAcquire(long timeout, TimeUnit unit)
如果在给定的等待时间内可用,并且当前线程尚未被中断interrupt ,则从此信号量获取许可。
如果许可证可用,则获取许可证并立即返回,其值为true,从而将可用许可证数量减少一个。
如果没有可用的许可,则出于线程调度的目的,当前线程将被禁用,并处于休眠状态,直到发生以下三种情况之一:
- 其他一些线程为此信号量调用release方法,接下来将为当前线程分配许可;
- 或其他某个线程interrupt当前线程;
- 或经过了指定的等待时间。
如果经过了许可,则返回true,如果当前线程,在进入此方法时已设置其中断状态;或在等待获得许可interrupt被中断,
将引发InterruptedException并清除当前线程的中断状态。
如果经过了指定的等待时间,则返回值false。如果时间小于或等于零,则该方法将根本不等待。
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
5.5 release
释放一个许可证,将其返回给信号量。
发放许可证,将可用许可证数量增加一个。如果有任何线程试图获取许可,则选择一个线程并授予刚刚释放的许可。出于线程调度目的而启用(重新)该线程。
不要求释放许可证的线程必须通过调用acquire获得该许可证。通过在应用程序中编程约定,可以正确使用信号量。
public void release() {
sync.releaseShared(1);
}
5.6 acquire(int permits)
从此信号量获取给定数量的许可,阻塞直到所有许可都可用,或者线程interrupt。
这个方法可以一次性获得多个许可证,如果这些许可是可用的,则及时返回。减少许可的可用总数。
如果没有足够的许可,则出于线程调度的目的,当前线程将被禁用,并处于休眠状态,直到发生以下两种情况之一:
- 其他一些线程为此信号量调用release方法之一,接下来将为当前线程分配许可,并且可用许可的数量可以满足此请求;- 某些其他线程 interrupt 当前线程。
如果当前线程,在进入此方法时已设置其中断状态;或在等待许可时被中断,将抛出InterruptedException,并清除中断状态。
相反,将要分配给该线程的所有许可都分配给其他尝试获取许可的线程,就像通过调用release使许可可用一样。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
5.7 acquireUninterruptibly(int permits)
从该信号量获取给定数量的许可,直到所有条件都可用为止都将被阻止。
获取给定数量的许可(如果有),然后立即返回,将可用许可的数量减少给定数量。如果没有足够的许可,则当前线程出于线程调度目的而被禁用,并处于休眠状态,直到其他线程调用此信号量的release方法之一,当前线程将被分配许可,并且可用许可的数量可以满足该请求。
如果当前线程在等待许可时interrupt被中断,则它将继续等待,并且其在队列中的位置不受影响。当线程确实从该方法返回时,将设置其中断状态。
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
5.8 tryAcquire(int permits)
仅在调用时所有可用的情况下,从此信号量获取给定的许可数。如果有许可,则获取给定的许可数,并立即返回,值为{@code true},从而减少了给定数量的可用许可证。
如果没有足够的许可证,则此方法将立即返回false,并且可用许可证的数量不变。
即使已将此信号量设置为使用公平的排序策略,无论是否有其他线程正在等待,对tryAcquire的调用都会立即获得许可。即使破坏公平性,这种插入行为在某些情况下也可能有用。如果要遵守公平性设置,请使用几乎等效的tryAcquire(int,long,TimeUnit)tryAcquire(permits,0,TimeUnit.SECONDS)它还会检测到中断。
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
5.9 tryAcquire(int permits, long timeout, TimeUnit unit)
如果所有信号都在给定的等待时间内可用,并且当前线程尚未interrupt中断,则从此信号量获取给定数量的许可。
如果有可用许可证,则获取给定数量的许可证,并立即返回,其值为true,从而将可用许可证的数量减少给定数量。
如果没有足够的许可,则出于线程调度的目的,当前线程将被禁用,并且将处于休眠状态,直到发生以下三种情况之一:
- 其他一些线程为此信号量调用release方法之一,接下来将为当前线程分配许可,并且可用许可的数量可以满足此请求;- 要么其他一些线程interrupt interrupts当前线程;
- 要么经过指定的等待时间。
如果获得许可,则返回值 true。
如果当前线程:
在进入此方法时已设置其中断状态;或在等待获取许可时interrupt被中断,
然后将引发 InterruptedException并清除当前线程的中断状态。相反,将要分配给该线程的所有许可,都分配给其他尝试获取许可的线程,就好像通过调用release()使许可可用一样。如果经过了指定的等待时间,则返回值false。如果时间小于或等于零,则该方法根本不会等待。将分配给此线程的任何许可证,而是分配给其他尝试获取的线程许可,就像通过调用release()来获得许可一样。
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
5.10 release(int permits)
释放给定数量的许可证,将其返回到信号。
释放给定数量的许可证,将可用许可证的数量增加该数量。如果有任何线程试图获取许可,则选择一个线程并给出刚刚释放的许可。如果可用许可的数量满足该线程的请求,则出于线程调度的目的,(重新)启用该线程。如果满足该线程的请求后仍然有可用的许可,则将这些许可依次分配给其他尝试获取许可的线程。
无需要求释放许可的线程必须通过调用acquire获得许可。通过在应用程序中编程约定,可以正确使用信号量。
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
6.总结
本文对Semaphore的源码进行了分析,Semphore类似于交通控制的信号灯,通过许可证的方式,对竞争的资源的并发程度进行了控制。
网友评论