一、前言
问题引入
@Controller
public class TradeController {
@Autowired
private TradeService tradeService;
@RequestMapping("/order")
public String order() {
tradeService.decStock();
return "success";
}
}
@Service
public class TradeService {
Logger logger = Logger.getLogger(TradeService.class);
@Autowired
JdbcTemplate jdbcTemplate;
/**
* 扣减库存
* @return
*/
public String decStock() {
Integer stock = jdbcTemplate.queryForObject("select stock from goods_stock where id = 1", Integer.class);
if (stock <= 0) {
logger.info("库存不足,下单失败!");
return "库存不足,下单失败!";
}
stock--;
jdbcTemplate.update("update goods_stock set stock = ? where id = 1", stock);
logger.info("下单成功,当前剩余库存:" + stock);
return "下单成功,当前剩余库存:" + stock;
}
}
如上代码,用Jmeter模拟30个请求同时下单,结果30个请求都下单成功,产生了超卖问题。
自定义AQS解决
下面实现自定义一个同步器来实现自定义锁
/**
* 2021/7/1
* 自定义AQS实现
*/
public class MyLock {
private volatile int state = 0;
private Thread lockHolder;
// 要用线程安全的队列作为等待队列,基于CAS实现
// linkedBlockedQueue基于AQS实现,不能用
private ConcurrentLinkedDeque<Thread> waiters = new ConcurrentLinkedDeque<>();
public int getState() {
return state;
}
public void setState(int state) {
this.state = state;
}
public Thread getLockHolder() {
return lockHolder;
}
public void setLockHolder(Thread lockHolder) {
this.lockHolder = lockHolder;
}
public void lock() {
Thread currentThread = Thread.currentThread();
if (acquire()) {
return;
}
waiters.add(currentThread);
// 自旋
for (; ; ) {
// 队列里第一个线程才能抢锁
if (currentThread == waiters.peek() && acquire()) {
// 队列头线程拿到锁 踢出等待队列
waiters.poll();
return;
}
// 阻塞当前线程 放弃CPU使用权
LockSupport.park();
}
}
public void unLock() {
if (Thread.currentThread() != lockHolder) {
throw new RuntimeException("lockHolder is not current thread");
}
if (compareAndSwapState(getState(), 0)) {
setLockHolder(null);
// 唤醒队列里第一个线程
Thread first = waiters.peek();
if (first != null) {
LockSupport.unpark(first);
}
}
}
// 是否能加锁成功
private boolean acquire() {
Thread currentThread = Thread.currentThread();
if (getState() == 0) { // 同步器尚未被持有
// 没人排队/自己是队列头,才能去尝试原子操作改变state
if ((waiters.size() == 0 || currentThread == waiters.peek()) && compareAndSwapState(0, 1)) {
setLockHolder(currentThread);
return true;
}
}
return false;
}
// 利用Unsafe类实现原子操作改变值
public final boolean compareAndSwapState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
private static final Unsafe unsafe = reflectGetUnsafe();
// 偏移量
private static final long stateOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("state"));
} catch (Exception e) {
throw new Error();
}
}
// 反射获取Unsafe类
private static Unsafe reflectGetUnsafe() {
try {
Field field = Unsafe.class.getDeclaredField("theUnsafe");
field.setAccessible(true);
return (Unsafe) field.get(null);
} catch (Exception e) {
return null;
}
}
}
public class TradeService {
Logger logger = Logger.getLogger(TradeService.class);
@Autowired
JdbcTemplate jdbcTemplate;
// 单例 创建锁对象
MyLock myLock = new MyLock();
public String decStock() {
myLock.lock(); // 加锁
Integer stock = jdbcTemplate.queryForObject("select stock from goods_stock where id = 1", Integer.class);
if (stock <= 0) {
logger.info("库存不足,下单失败!");
myLock.unLock(); // 业务失败 释放锁
return "库存不足,下单失败!";
}
stock--;
jdbcTemplate.update("update goods_stock set stock = ? where id = 1", stock);
logger.info("下单成功,当前剩余库存:" + stock);
myLock.unLock(); // 业务成功 释放锁
return "下单成功,当前剩余库存:" + stock;
}
}
二、概述
抽象同步框架,可以用来实现一个依赖状态的同步器
1.可以实现独占与共享两种模式:
-
独占:互斥,资源只能同时被一个线程占有,如ReentrantLock、Mutex
-
共享:资源不互斥,可以被多个线程占有,如Semaphre、CountDownLatch
一般来说同时只会实现一种模式,但也有ReentrantReadWriteLock同时实现独占和共享两种方式
2.核心属性:
-
state:同步器的状态,即共享资源。访问方式为:getState()、setState()、compareAndSetState()
-
exclusiveOwnerThread:当前持有共享资源的线程
-
head:同步队列头
-
tail:同步队列尾
3.特性:
-
可中断
-
可重入
4.自定义AQS
一般自定义同步器的时候,只需要自定义共享资源的获取和释放方式。至于等待队列的维护,AQS已经定义好了,不需要重写。所以主要重写以下几种方法:
-
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
-
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
-
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
-
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
-
tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
[图片上传失败...(image-ed6b0e-1649076723271)]
三、源码
waitStatus表示Node节点的等待状态
waitStatus | 判断结果 | 说明 |
---|---|---|
0 | 初始化状态 | 该节点尚未被初始化完成 |
1 | 取消状态(CANCELLED) | 说明该线程中断或者等待超时,需要移除该线程,进入该状态后节点不会变化 |
-1 | 有效状态(SIGNAL) | 下一个节点等着自己唤醒。节点入队会把前继节点状态更新为SIGNAL |
-2 | 有效状态(CONDITION) | 结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。 |
-3 | 有效状态(PROPAGATE) | 共享模式下,自己不仅会唤醒后继节点,同时也可能唤醒后继节点的后继节点 |
AQS中阻塞队列采用双向链表结构,使用prev、next连接;而条件队列采用单向链表,采用nextWaiter连接
nextWaiter状态标志 | 说明 |
---|---|
SHARED(共享模式) = new Node() | 立即唤醒下一个节点 |
EXCLUSIVE(独占模式) = null | 等待当前线程执行完再唤醒 |
其他非空值 | 根据条件决定怎么唤醒下一个节点 |
1.acquire()
acquire()是获取共享资源的顶级入口,获取到资源则直接返回,否则入等待队列,直到获取到共享资源,这个过程忽略中断的影响(如果需要可中断,可以调用acquireInterruptibly())。获取到资源之后就可以执行临界区的代码了
public final void acquire(int arg) {
// 尝试获取共享资源
if (!tryAcquire(arg) &&
//
acquireQueued(
// 加入等待队列尾部
addWaiter(Node.EXCLUSIVE), arg)
)
// 如果在acquireQueued()被中断过 这里自己补一个中断
selfInterrupt();
}
1.1tryAcquire()
AQS中,这个方法不能被直接调用,需要子类重写。这里不定义为抽象方法的好处在于,独占模式下只需重写tryAcquire(),共享模式写重写tryAcquireShared(),如果定义为抽象方法则都需要实现。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
以ReentrantLock为例,非公平锁中中,会直接CAS尝试获取共享资源,公平锁中,会先检查队列中有没有正在等待的线程,才去获取共享资源
1.2addWaiter()
如果tryAcquire()加锁失败,会addWaiter(Node.EXCLUSIVE), arg),创建一个独占模式的节点加入到队列尾部
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试快速入队
// 这里直接进行了一次CAS加到尾部的尝试,失败才去自旋 为什么要这样呢?直接调用enq(node)效果似乎也一样?
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 自旋 空队列则CAS地初始化队列,队列有节点就把节点挂到队列尾部
enq(node);
return node;
}
1.3acquireQueued()
把当前节点加入队列尾部之后,acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取自己的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头结点(正持有资源的节点) 自己是第二个节点,那么可能有机会获得资源
// 尝试获取资源(非公平锁直接获取,公平锁会先检查队列中有没有线程在等待再获取)
// 第一次进来就直接看能不能获取资源 也可能是等待后,被唤醒(前驱节点释放资源/中断)循环走到这里
if (p == head && tryAcquire(arg)) {
// 获取到资源 把当前节点设为头结点,前驱节点置空,保存的线程置空(头结点没必要保存线程了)
setHead(node);
// 去掉之前的头节点
p.next = null; // help GC
failed = false;
// 返回等待过程中有没有被中断过
return interrupted;
}
// 如果自己不是第二个节点 或者自己虽然是第二个节点,但是由于非公平锁,新来的线程可以不入队列,直接获取锁,所以这里可能被其他线程抢先了
// 检查是否可以休息,找到可以安心休息的地方。将前驱节点waitStatus改为-1,即后续来唤醒自己
if (shouldParkAfterFailedAcquire(p, node) &&
// park 阻塞等待
// 检查中断标志 如果是被中断了而不是被unPark()
// 后面tryAcquire方法()获取到资源之后会返回中断标志,acquire()里自行产生一个中断标志
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
尝试获取资源失败后,检查自己是否可以安心休息,如果不能就找到一个可以安心休息的地方
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 前驱节点的状态已经是SIGNAL,到时候会来唤醒自己,所以可以安心休息
if (ws == Node.SIGNAL)
return true;
// 前驱节点状态是被取消
if (ws > 0) {
do {
// 一直往前找没有取消的节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 把自己挂到它后面
pred.next = node;
} else {
// 走到这里意味着状态是0/PROPAGATE 则把前驱节点状态置为SIGNAL
// 这里就算成功了,也不会直接返回true开始休息,而是再一轮尝试获取资源,获取不到再park
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
// 等待
LockSupport.park(this);
// 被唤醒 返回自己是不是因为中断被唤醒 中断信号能打断一个等待中的线程,终止等待
return Thread.interrupted();
}
总结:如果当前是第二个节点,就去尝试获取资源,如果成功了直接返回。否则的话,寻找一个合适的休息点,开始等待,直到被前驱节点unPark()或被中断,才继续判断自己是否是第二个节点,去尝试获取资源。这个方法会返回等待过程中是否被中断过,后续用来自行产生一个中断。这也是不响应中断的核心,等待过程不可中断,只有获取到资源之后,才可以被中断
1.4selfInterrupt()
自行产生一个中断
1.5总结
1.先尝试获取资源
2.获取不到则把当前线程加入队列尾,标记独占模式
3.检查自己是否为第二个节点,是则尝试获取锁,不是则进入等待,等轮到自己了,前驱节点会unPark自己,自己再去尝试获取资源。如果被中断过,则返回出去
4.获取资源成功,如果被中断过,则产生一个中断
2.release()
释放资源的顶级入口,释放完资源,会唤醒队列中后一个正在等待的线程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
// 判断头节点不为空,状态不为0
if (h != null && h.waitStatus != 0)
// 唤醒后继节点 找到队列里离head最近的一个没取消的node,unpark恢复其运行
unparkSuccessor(h);
return true;
}
return false;
}
2.1tryRelease()
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
2.2unparkSuccessor()
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 当前结点状态为负 则置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 一般要唤醒的就是后一个节点,但是可能该节点已取消 所以要从尾结点一直往前找,找到真正有效的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
疑问:这里为什么要从后往前找?
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 要挂上去的节点 prev是一定能成功指向之前的尾结点的
node.prev = pred;
// CAS把尾结点设置为当前节点
if (compareAndSetTail(pred, node)) {
// 但这里可能还没有执行到 此时之前的尾结点指向null
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
3.acquireShared()
共享模式下,尝试获取指定量的资源,获取失败则进入等待队列,直到获取到资源。该过程忽略中断
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
3.1tryAcquireShared()
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
负值:失败,需要执行doAcquireShared()进入队列
0:成功,但没有剩余资源
正值:成功,还有剩余资源,其他线程还可以获取
3.2doAcquireShared()
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
// 自己成为head
setHead(node);
// 如果还有剩余量 继续唤醒下一个线程
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
3.3总结
先尝试获取资源,没有获取到则进入队列等待,直到获取到资源。与独占模式相比,自己拿到资源之后,还会继续唤醒下一个线程
4.releaseShared()
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//尝试释放资源
doReleaseShared();//唤醒后继结点
return true;
}
return false;
}
4.1tryReleaseShared()
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
4.2doReleaseShared()
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);//唤醒后继
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)// head发生变化
break;
}
}
网友评论