一、Semaphore实现原理解析
1.1Semaphore实例
image.png
package executors;
import java.util.concurrent.Semaphore;
/**
 * Demo: five "ticket buyer" threads compete for three ticket windows that are
 * guarded by a {@link Semaphore} with three permits.
 */
public class SemphoreTest {
    /**
     * Starts five named threads; each one takes a permit, prints a start message,
     * sleeps for five seconds, prints an end message and gives the permit back.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Semaphore with 3 permits: at most three buyers hold a window at once.
        Semaphore windows = new Semaphore(3);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                // Acquire OUTSIDE the try/finally that releases: if acquire() is
                // interrupted, no permit was obtained and none must be released,
                // otherwise the semaphore would end up with more permits than it
                // was created with.
                try {
                    windows.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Preserve the interrupt status for anything running later on this thread.
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    System.out.println(Thread.currentThread().getName()+"--开始买票");
                    Thread.sleep(5000L);
                    System.out.println(Thread.currentThread().getName()+"--结束买票");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                } finally {
                    // A permit is definitely held here, so it is always returned.
                    windows.release();
                }
            }, "买票者" + i).start();
        }
    }
}
1.2Semaphore源码解析
new Semaphore(3) 创建了一个非公平同步器(NonfairSync),并把同步器维护的 state 初始化为 3。
// Excerpt of java.util.concurrent.Semaphore limited to the construction path:
// the one-argument constructor builds a NonfairSync, and the constructor chain
// stores the permit count in the synchronizer state via setState(permits).
public class Semaphore implements java.io.Serializable {
// The synchronizer (a subclass of AbstractQueuedSynchronizer) backing this semaphore.
private final Sync sync;
public Semaphore(int permits) {
// This constructor always selects the non-fair synchronizer.
sync = new NonfairSync(permits);
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// Delegates to nonfairTryAcquireShared, which is not shown in this excerpt.
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
// Stores the initial number of permits as the synchronizer state.
Sync(int permits) {
setState(permits);
}
}
}
windows.acquire()分析
因为许可数设置为 3,也就是 state=3,所以最多有三个资源可以同时被占用;前三个线程可以顺利地获取资源,
每获取一次资源,state 就减 1。
image.png
从第四个线程开始,由于 state=0,已经没有可用资源,线程会进入等待队列并被阻塞。
image.png
// Excerpt of java.util.concurrent.Semaphore limited to the acquire() path.
// The "..." line inside Sync is the article's elision marker for omitted members.
public class Semaphore implements java.io.Serializable {
// The synchronizer (a subclass of AbstractQueuedSynchronizer) backing this semaphore.
private final Sync sync;
public Semaphore(int permits) {
// This constructor always selects the non-fair synchronizer.
sync = new NonfairSync(permits);
}
// Acquires one permit; may throw InterruptedException from the delegated call.
public void acquire() throws InterruptedException {
// Step 1: delegate to AbstractQueuedSynchronizer.acquireSharedInterruptibly(arg) with arg = 1.
sync.acquireSharedInterruptibly(1);
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
// Hook invoked by the AQS acquire methods; forwards to the non-fair implementation in Sync.
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
...
// Tries to take `acquires` permits without queueing.
// Returns the number of permits that remain after the attempt; a negative
// value means there were not enough permits and the state was left unchanged.
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
// Short-circuit: when remaining < 0 the CAS is skipped and the negative
// value is returned as the failure signal. Otherwise the state is updated
// with a CAS; if the CAS loses a race the loop re-reads the state and retries.
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
// Excerpt of AbstractQueuedSynchronizer covering the blocking path of an
// interruptible shared acquire: try once, then enqueue a node, mark the
// predecessor so it will signal this node, and park the current thread.
// The "..." line is the article's elision marker for omitted members.
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
// Entry point used by Semaphore.acquire(). Throws InterruptedException if the
// thread is already interrupted, or (via doAcquireSharedInterruptibly) if it is
// interrupted while parked.
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) // Step 2: calls back into NonfairSync.tryAcquireShared, which tries to take `arg` permits (state = state - arg)
// A negative result means not enough permits were available (no permit was
// taken); the thread must now be queued and blocked.
doAcquireSharedInterruptibly(arg);
}
/*
1. Enqueue a node for the current thread (addWaiter builds the queue if needed).
2. Set the predecessor's waitStatus to Node.SIGNAL (-1 in the JDK).
3. Park (block) the current thread.
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// Enqueue in shared mode. Node.SHARED is passed as the mode marker and is
// stored in the new node's nextWaiter field (see the constructor quoted in addWaiter).
final Node node = addWaiter(Node.SHARED);
// Stays true unless the acquire succeeds; drives the cleanup in finally.
boolean failed = true;
try {
for (;;) {
// Predecessor of this node in the wait queue.
final Node p = node.predecessor();
// Only the node directly behind head retries the acquire.
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// Success: make this node the new head and propagate the release.
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&// true only once the predecessor's waitStatus is already SIGNAL
parkAndCheckInterrupt())// parks via LockSupport.park(this); returns true if interrupted
throw new InterruptedException();
}
} finally {
// Reached with failed == true when leaving by exception (e.g. the
// InterruptedException above): the node is cancelled.
if (failed)
cancelAcquire(node);
}
}
// Creates a node for the current thread in the given mode and appends it to the queue.
private Node addWaiter(Node mode) {
/*
For reference, the Node constructor used below:
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
*/
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// Fallback: tail was null (queue not built yet) or the CAS lost a race;
// enq(node) (not shown in this excerpt) performs the full enqueue.
enq(node);
return node;
}
/*
Decides whether the current thread may park after a failed acquire.
Returns true only when the predecessor's waitStatus is already Node.SIGNAL.
Otherwise it either skips predecessors whose waitStatus is > 0 (cancelled
nodes in the JDK) or tries to CAS the predecessor's waitStatus to SIGNAL,
and returns false so the caller loops and retries the acquire first.
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
// Walk backwards past every predecessor with waitStatus > 0 and relink.
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// Ask the predecessor to signal this node: set its waitStatus to SIGNAL.
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// Blocks the current thread; once it resumes, reports whether it was interrupted.
// Thread.interrupted() also clears the interrupt status.
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
}
windows.release()分析
public class Semaphore implements java.io.Serializable {
//同步器
private final Sync sync;
public Semaphore(int permits) {
//非公平同步器
sync = new NonfairSync(permits);
}
public void release() {
sync.releaseShared(1);
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
...
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
}
// Excerpt of AbstractQueuedSynchronizer covering a shared release: update the
// state, then wake the thread queued directly behind head.
// The "..." line is the article's elision marker for omitted members.
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
// Entry point used by Semaphore.release(). Returns true when tryReleaseShared succeeded.
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {// Semaphore's implementation sets state = state + arg via CAS
doReleaseShared();// signal the successor of head, if the queue is non-empty
return true;
}
return false;
}
/*
Release action for shared mode. Loops until head is unchanged across an iteration:
- if the queue has waiters (head != null and head != tail) and head's waitStatus
  is SIGNAL, reset it to 0 with a CAS and unpark head's successor;
- if head's waitStatus is 0, try to set it to PROPAGATE;
- a failed CAS restarts the loop.
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);// wake the node right behind head (the first waiter)
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
// Wakes the thread of the first eligible node after `node`.
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// A negative status is reset to 0; the CAS result is not checked.
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;// direct successor; the first waiter when node is head
// If the direct successor is missing or has waitStatus > 0, scan backwards
// from tail. The loop does not stop at the first match, so s ends up as the
// eligible node (waitStatus <= 0) closest to `node`.
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// Unpark the chosen successor's thread, if one was found.
if (s != null)
LockSupport.unpark(s.thread);
}
}
被唤醒后出队逻辑
// Excerpt of AbstractQueuedSynchronizer covering what a parked thread does
// after it is woken: retry the acquire and, on success, leave the queue by
// becoming the new head.
// The "..." line is the article's elision marker for omitted members.
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
...
/*
Dequeue logic after the thread is woken up.
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
// Stays true unless the acquire succeeds; drives the cleanup in finally.
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// Retry the acquire; r >= 0 means it succeeded.
int r = tryAcquireShared(arg);
if (r >= 0) {
// Make this node the new head (the old head is unlinked on the next line).
// setHeadAndPropagate may then call doReleaseShared(), which wakes the
// successor of the new head rather than every queued thread at once.
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())// a parked thread resumes here; if not interrupted it goes around the loop again
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// Installs `node` as head and, when the conditions below hold, continues the
// wake-up by calling doReleaseShared().
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
// Propagate when permits remain (propagate > 0), or when the old or the new
// head is null or has a negative waitStatus.
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// Only continue if the successor is unknown (null) or is a shared-mode waiter.
if (s == null || s.isShared())
doReleaseShared();
}
}
// Makes `node` the head of the queue and clears its thread and prev references.
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
}
网友评论