Semaphore信号量
最多允许N个线程同时执行,如果信号量的许可设置为1与ReentrantLock效果一致。
以下示例重点是Semaphore的基本使用,忽略CountDownLatch。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* 信号量
*
* @author Administrator
* @date 2020年10月15日
*/
public class Semaphore_test {
static int count = 0;
//并发许可为 5
static Semaphore s = new Semaphore(5); //如果许可设置为1与ReentrantLock效果一致。
static CountDownLatch c = new CountDownLatch(10); //闭锁
public static void main(String[] args) {
for(int i=0; i<10; i++) {
new Thread(()->{
try {
TimeUnit.MILLISECONDS.sleep(1); //方便模拟出count是线程不安全的状态值
} catch (InterruptedException e) {
e.printStackTrace();
}
count();
c.countDown();
}).start();
}
try {
c.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(count);
}
//count方法最多5个线程同时执行
public static void count() {
try {
s.acquire();
for(int i=0; i<1000; i++) {
count++;
}
} catch (InterruptedException e) {
}finally {
s.release();
}
}
}
信号量主要的两个方法获取许可acquire方法和释放许可release方法。还有一个有参构造方法。
public Semaphore(int permits) 构造方法,这个方法跟到最后,就是将permits许可数设置成 aqs中的state状态值。
猜想那么这个值在Semaphore中就可以理解为一个阈值。临界区执行的线程数达到阈值了就不允许线程在进入临界区执行了。
/**
* The synchronization state.
*/
private volatile int state;
acquire
acquire() 方法,一直跟到 .Semaphore .Sync #acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //获取许可
doAcquireSharedInterruptibly(arg); //将线程加入到锁池,然后阻塞线程
}
tryAcquireShared方法是aqs的模板方法,被.Semaphore .NonfairSync类重写,以下是重写方法。
此方法通过自旋+cas将state减一,然后返回剩余的许可数量。如果剩余的许可数量小于0说明没有许可了,就会调用doAcquireSharedInterruptibly方法
final int nonfairTryAcquireShared(int acquires) {
//自旋,不断的取值,不断的cas,直到将state成功减一。
for (;;) {
int available = getState(); //取出aqs的状态属性state,该值在Semaphore初始化的时候指定的阈值(许可数量)
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining)) //cas操作
return remaining;
}
}
doAcquireSharedInterruptibly方法是将线程加入到锁池,然后阻塞线程
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //将线程加入锁池
boolean failed = true;
try {
for (;;) {
//1.取出node的前置节点
final Node p = node.predecessor();
//2.然后判断这个node的前置节点是不是aqs中的头节点,如果是头节点说明马上就到当前这个node节点了,所以需要在尝试获取一下状态(state)值(许可数量),因为在这个过程中很有可能aqs那个头节点已经release了。
if (p == head) {
int r = tryAcquireShared(arg); //3.如果获取到了许可那么就不需要将线程阻塞了
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//如果不符合第一个if判断,再判断是否需要挂起等待,如果需要阻塞线程。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
release() 方法
网友评论