控制并发线程数Semaphore
生活中,我们过桥,如果桥就能过3个人,那么一次就只能走三个人,如果多了,那么就会有人掉河里了。这就是Semaphore控制人数过桥。(而此处就是Semaphore控制只能有特定数量的线程访问指定资源)。
继承与实现关系
public class Semaphore implements java.io.Serializable
Semaphore中的自定义的同步器
/**
*
* 自定义同步器继承AQS,使用AQS的状态state来控制同时访问的线程数(流量)
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
//创建指定线程数访问的同步器构造器
Sync(int permits) {
setState(permits);
}
//获取允许线程同时访问的数量
final int getPermits() {
return getState();
}
//采用非公平的方式尝试获取共享状态下的同步状态值
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取当前同步状态值
int available = getState();
//计算剩余的同步状态值
int remaining = available - acquires;
/**
* 如果剩余的同步状态值小于0或者当前的同步状态值为available
* 将当前同步状态值更新为remaining
*/
if (remaining < 0 ||
compareAndSetState(available, remaining))
//返回当前最新的同步状态值
return remaining;
}
}
//采用共享方式释放同步状态值
protected final boolean tryReleaseShared(int releases) {
//死循环
for (;;) {
//获取当前的同步状态值
int current = getState();
//计算如果释放同步状态值之后,得到的结果next
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//如果当前同步状态值为current,那么更新当前的同步状态值为next
if (compareAndSetState(current, next))
//返回true
return true;
}
}
//减少并发线程的数量
final void reducePermits(int reductions) {
//死循环
for (;;) {
//获取当前的同步状态值
int current = getState();
//计算出剩下的并发线程数
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
//如果当前同步状态值为current,那么更新当前的同步状态值为next
if (compareAndSetState(current, next))
return;
}
}
//将并发的线程数量调整为0
final int drainPermits() {
//死循环
for (;;) {
//获取当前的同步状态值
int current = getState();
/**
* 如果当前的同步状态值为0或者当前同步状态值等于current
* 那么将当前的同步状态值current更新为0
*/
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
非公平同步器
/**
* 非公平同步器
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
//非公平同步器构造器
NonfairSync(int permits) {
super(permits);
}
//在共享模式下尝试获取同步状态值
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
公平同步器
/**
* 公平同步器
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
//公平同步器构造器
FairSync(int permits) {
super(permits);
}
//在共享模式下尝试获取同步状态值
protected int tryAcquireShared(int acquires) {
//死循环
for (;;) {
//当前队列是否有前驱
if (hasQueuedPredecessors())
return -1;
//获取当前的同步状态值
int available = getState();
//计算出剩下的并发线程数
int remaining = available - acquires;
/**
* 如果剩下的并发线程数小于0或者当前同步状态值等于available
* 那么将当前的同步状态值更新为remaining
*/
if (remaining < 0 ||
compareAndSetState(available, remaining))
//返回计算出的最新同步状态值
return remaining;
}
}
}
构造器
/**
* 创建一个指定并发线程数的非公平同步器构造器
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* 创建一个指定并发线程数、是否公平的同步器构造器
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
方法
/**
* 创建一个指定并发线程数的非公平同步器构造器
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
/**
* 创建一个指定并发线程数、是否公平的同步器构造器
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/**
* 在信号量Semaphore中获取一个许可
* 在获取一个许可前线程将会阻塞,否则线程被中断
* 整体许可数将减少1
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
*
* 在信号量Semaphore中获取一个许可
* 在获取一个许可前线程将会阻塞,虽然等待时被中断,但是仍然将继续等待
* 整体许可数将减少1
*/
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
/**
* 仅在调用时此信号量存在一个可用许可,才从信号量获取许可。
*/
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
/**
* 如果在给定的等待时间内,此信号量有可用的许可并且当前线程未被中断,则从此信号量获取一个许可
*/
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* 释放一个许可,将其返回给信号量
*/
public void release() {
sync.releaseShared(1);
}
/**
* 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
/**
* 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
/**
* 仅在调用时此信号量中有给定数目的许可时,才从此信号量中获取这些许可
*/
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
/**
* 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
*/
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
/**
* 释放给定数目的许可,将其返回到信号量
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
/**
* 返回此信号量中当前可用的许可数
*/
public int availablePermits() {
return sync.getPermits();
}
/**
* 获取并返回立即可用的所有许可
*/
public int drainPermits() {
return sync.drainPermits();
}
/**
* 根据指定的缩减量减小可用许可的数目
*/
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
/**
* 如果此信号量的公平设置为 true,则返回 true
*/
public boolean isFair() {
return sync instanceof FairSync;
}
/**
* 查询是否有线程正在等待获取
*/
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
* 返回正在等待获取的线程的估计数目
*/
public final int getQueueLength() {
return sync.getQueueLength();
}
/**
* 返回一个 collection,包含可能等待获取的线程
*/
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
应用例子
public class SemaphoreTest {
private static final int PERSON_NUM=4;
private static ExecutorService es=Executors.newFixedThreadPool(PERSON_NUM);
private static Semaphore s=new Semaphore(3,true);
public static void release(Semaphore s,String name){
s.release();
System.out.println(name+"已经离开桥了!");
}
public static void main(String[] args) {
es.execute(new Runnable(){
@Override
public void run() {
try {
s.acquire();
System.out.println("甲上桥了!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
es.execute(new Runnable(){
@Override
public void run() {
try {
s.acquire();
System.out.println("乙上桥了!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
es.execute(new Runnable(){
@Override
public void run() {
try {
s.acquire();
System.out.println("丙上桥了!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
es.execute(new Runnable(){
@Override
public void run() {
try {
release(s,"甲");
s.acquire();
System.out.println("丁上桥了!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
结果:
甲上桥了!
乙上桥了!
丙上桥了!
甲已经离开桥了!
丁上桥了!
解释:虽然线程池里面有甲乙丙丁四个线程准备过桥Semaphore,但是Semaphore只能让三个人过桥,所以,甲没离开桥上时,丁是无法上桥的,所以甲离开之后,丁就可以上桥了。
---------------------------该源码为jdk1.7版本的
网友评论