背景
与ReentrantLock
的独占(Exclusive)不同的是 , Semaphore
是共享类型的(Share). 也就是当资源充足的时候 , 允许多个线程获取相应资源同时执行. 可以理解用来解决哲学家用餐问题.
其原理也就是通过初始化state
变量来控制资源数量 , 如果当前state<0
则代表资源用尽 , 需要阻塞等待 , 直到被其他线程释放资源后唤醒.
构造函数
与ReentrantLock相同 , 同样可以有公平锁(FairSync)与非公平锁(NonfairSync) , 同时 , permits
代表当前拥有的资源总数 , Sync会将permits
赋值给state
变量.
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
acquire申请资源
- 调用
acquire
申请Shared
类型的资源
public void acquire(int permits) throws InterruptedException {
// 如果permits小于0的话 , 则认为参数不合法
if (permits < 0) throw new IllegalArgumentException();
// 开始申请资源
sync.acquireSharedInterruptibly(permits);
}
- 尝试调用
tryAcquireShared
来申请资源 , 如果小于0 , 则调用``
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 尝试获取Shared资源
if (tryAcquireShared(arg) < 0)
// 如果当前剩余的可用资源小于0的话 , 则开始等待
doAcquireSharedInterruptibly(arg);
}
- 尝试获取共享数据 , 即获得当前剩余资源总数
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 获取当前可用资源数量
int available = getState();
// 得到剩余资源总数
int remaining = available - acquires;
// 如果剩余资源小于0 , 或者CAS设置失败的话 , 则返回剩余资源数量
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
- 如果剩余资源大于等于0的话 , 则认为申请成功 , 可以继续执行
- 如果剩余资源小于0的话, 则会调用
doAcquireSharedInterruptibly
插入队列中
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 添加Shared类型的节点到队列中
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
// 从队列尾部往前头部遍历
// 获取当前节点的前驱节点
final Node p = node.predecessor();
// 如果遍历到了头节点
if (p == head) {
// 当前为头节点, 尝试获取相关资源
int r = tryAcquireShared(arg);
if (r >= 0) {
// 如果获取大于0的话 , 则将头节点设置成Propagate状态
setHeadAndPropagate(node, r);
// 设置null
p.next = null; // help GC
return;
}
}
// shouldParkAfterFailedAcquire , 找到前驱节点为SINGAL的节点
// 如果找到的话 , 则park当前线程 , 并且抛出中断异常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
// 捕捉到异常后 , 则将当前Node设置成Cancel状态
cancelAcquire(node);
throw t;
}
}
- 处理前驱节点的信号
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前驱节点为SIGNAL的话 , 允许执行 , 返回true
return true;
if (ws > 0) {
// 如果waitStatus>0 , 代表为Cancel节点 ,则直接删除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果是waitStatus是Propagate , 则设置成SIGNAL
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
// 返回false , 不允许park
return false;
}
- 在本函数中 , 主要处理头节点的唤醒
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 将Node设置成Head节点
setHead(node);
// 如果当前剩余资源>0 , 或者head的waitStatus<0 , 或者Node的waitStatus<0
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 则得到Node.next
Node s = node.next;
if (s == null || s.isShared())
// 如果Node.next为空 , 或者为Share的话 , 则开始选择线程唤醒
doReleaseShared();
}
}
- 在
doReleaseShared
函数中 , 会unpark
头节点的线程 , 让其唤醒开始执行
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
// 自旋设置头节点Node的waitStatus
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
// 如果waitStatus为0 , 则设置成为PROPAGATE
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
release过程
- 调用release方法释放资源
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
- 释放资源成功返回true , 否则返回false
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
- 尝试释放资源 , 如果释放成功 , 则会开始
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取当前资源
int current = getState();
// 如果release是负数的话 , 则返回错误
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// 通过CAS操作设置state变量
if (compareAndSetState(current, next))
return true;
}
}
- 尝试寻找waitStatus为Singal的前驱节点唤醒
private void doReleaseShared() {
for (;;) {
Node h = head;
// 如果头节点不为空 , 并且头节点不是尾节点 , 即队列中有节点
if (h != null && h != tail) {
// 判断当前waitStatus
int ws = h.waitStatus;
// 如果当前节点为SIGNAL代表可以执行
if (ws == Node.SIGNAL) {
// 将h节点的waitStatus设置成0失败 , 则自旋设置直到成功
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
// 开始unpark前驱节点
unparkSuccessor(h);
}
// 如果waitStatus为0的话 , 则设置PROPAGATE
else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue;
}
// 如果头节点相同则break , 否则一直循环
if (h == head)
break;
}
}
网友评论