Semaphore是一个计数信号量,可以维护当前访问自身的线程个数,并提供了同步机制。常用于限制可以访问某些资源的线程数量,使用场景:接口限流
如下示例代码,设置初始值为5,表示同时只能最多5个线程访问,我们在下边开启了8个线程,通过打印结果可以看出,启动之后立即有5个线程开始执行,4s之后,这5个线程release掉,其他三个线程才开始执行,可以表明Semaphore可以控制线程访问数量的结论
public class TestSemaphore {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(5);
for (int i = 0; i < 8; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"开始执行"+" time="+System.currentTimeMillis()/1000);
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
}).start();
}
}
}
打印结果:
Thread-0开始执行 time=1586484962
Thread-1开始执行 time=1586484962
Thread-2开始执行 time=1586484962
Thread-3开始执行 time=1586484962
Thread-4开始执行 time=1586484962
Thread-5开始执行 time=1586484966
Thread-7开始执行 time=1586484966
Thread-6开始执行 time=1586484966
源码
Semaphore(5)
从构造方法中可以看出,Semaphore有两种实现,公平和非公平,都是通过继承Sync实现,而Sync又是继承了AbstractQueuedSynchronizer,可以发现目前遇到这几个并发工具类都和AbstractQueuedSynchronizer有联系
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
//同样是维护了一个state值,作为计数使用
setState(permits);
}
}
protected final void setState(int newState) {
state = newState;
}
acquire()
public void acquire() throws InterruptedException {
//这个实现和CountdownLatch是一样的
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果中断抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//尝试获取,这个实现不同于CountdownLatch,而是在Sync中有自己的实现
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared的默认非公平实现就是nonfairTryAcquireShared方法,可以看到又是一个for无限循环,而退出循环的条件是我们需要关注的if (remaining < 0 ||compareAndSetState(available, remaining)),由于我们上边示例中设置的state=5,所以可以看到这里的操作就是拿到这个state,然后将它减1,判断是不是小于0(小于0表示满足已经有state个线程调用了acquire方法的条件),假设第一次进来,那么-1后remaining=4,判断<0不满足,然后通过CAS的方式将state值更新为remaining,然后返回remaining=4,跳出循环,如果没有更新成功就进入下一次循环。
这里有点容易混淆,当第一个线程进来的时候,返回的remaining=4,第二个线程进来,返回的remaining=3,也就是线程和remaining的对应关系为
线程 == remaining
1 ====== 4
2 ====== 3
3 ====== 2
4 ====== 1
5 ====== 0
要理解,第state个线程来的时候,返回的remaining正好是0,state+1的时候,remaining才是-1
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
我们再回到上边的方法acquireSharedInterruptibly中,由于if (tryAcquireShared(arg) < 0)不满足,所以方法直接返回了,doAcquireSharedInterruptibly没有被执行。知道第6个线程调用到acquire方法的时候,此时满足remaining=-1 <0 的条件,开始执行doAcquireSharedInterruptibly方法,其实也就是前5个线程都被放行了,到了第6个,需要进入等待状态
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//见过很多次了,ReentrantLock中一样,将线程节点加入链表并返回节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//拿到这个节点的前节点
final Node p = node.predecessor();
//如果前节点就是head,说明这个节点就是等待队列中的唯一一个
//我们假设p == head 成立
if (p == head) {
//判断state值,如果r >= 0说明能容纳的线程还没有满,如果r=0,
//那么正好说明当前线程是第state个线程,则不用休眠
int r = tryAcquireShared(arg);
if (r >= 0) {
//移除这个线程节点,并唤醒某沉睡的线程node
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//走到这里说明r >= 0不满足,说明已经有state个线程在执行中,当前
//线程是第state+1个,所以将这个线程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
总结一下acquire方法,它的作用就是让前state个线程通过,超过state的线程休眠等待
release()
public void release() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared将state值不断的加1,我们假设某一线程在调用release方法的时候,5个线程都已经acquire过了,也就是说,此时的state=0(当第6个线程调用acquire方法的时候,由于remaining < 0的条件满足,所以不会再给state赋值,而是直接返回了remaining,也就是说state值最小为0)。那么+1之后就是1,也就是说,next可以看作记录的是有多少个线程释放了,每次release之后都会返回true,然后执行后边的doReleaseShared
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
//这里什么情况下+上一个数反而比它本身小?只有一种情况
//state值等于int型最大值,再加1就越界,返回-1,-1<current
//这是一个边界检查的处理
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
doReleaseShared的作用就是释放一个唤醒一个,这样就可以保证,在线程充足的情况下,一直有5个线程在运行
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
网友评论