JUC--AQS

作者: 噜啦噜啦嘞_d31b | 来源:发表于2021-06-16 13:54 被阅读0次
图片.png

1. AbstractQueuedSynchronizer

抽象队列同步器,是实现同步器的基本组件。当我们需要实现一个锁的时候只需继承改抽象类,然后实现对应的方法逻辑。如果需要需要实现锁的话就重写下面方法在这里定义自己的获取锁的逻辑。

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
在这里实现自己获取共享锁的逻辑
    }

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
在这里实现释放共享锁的逻辑
    }

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
在这里实现获取独占锁的逻辑
    }

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
在这里实现释放独占锁的逻辑
    }

因为AQS有一个模板方法,这里是获取锁的逻辑与释放锁的模板方法,只需要定义我们自己的抢占与获取逻辑的,AQS会自动帮我们实现其他的逻辑。

获取独占锁,实现tryAcquire我们自己的抢占要求之后秩序在外面调用acquire方法就可以
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
释放独占锁,实现release我们自己的抢占要求之后秩序在外面调用acquire方法就可以
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
获取共享锁,实现tryAcquireShared我们自己的抢占要求之后秩序在外面调用acquireShared方法就可以
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
获取释放锁,实现tryReleaseShared我们自己的抢占要求之后秩序在外面调用releaseShared方法就可以
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

1.2 成员变量的解读
    private transient volatile Node head;
    private transient volatile Node tail;
    private volatile int ; 
static final class Node {
      //表示被放入队列的线程是因为抢占共享锁被放入
        static final Node SHARED = new Node();
      //表示被放入队列的线程是因为抢占独占锁被放入
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;

//值可赋为上面四种(1,-1,-2,-3)
 /*
分别表示
    1:当前线程被取消
  -1:当前线程需要被唤醒
  -2:线程在条件队列里面等待
  -3:释放贡献资源后需要通知其他节点
*/
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
//被阻塞线程
        volatile Thread thread;
//当前节点是因为独占还是共享
        Node nextWaiter;
}

state :表示当前运行的状态,可以在这里定义自己的逻辑,例如ReenTranLock表示当state为0代表没有被锁定,可以获得锁,当大于0表示重入的次数,读写锁用高十六位和第十六位分别表示获取读锁的线程个数,和写锁的重入次数
head与tail:在AQS内部有双向链表组成的队列存放的是Node节点,Node节点包装是没有获取到锁的线程。head和tail分别是头尾指针。

2 ReenTranLock解读

图片.png

在ReenTranLock内这个内部类继续AQS,实现类对应的方法以及抢占锁的逻辑,并且提供了抽象方法lock为后续具体实现提供接口。

 abstract static class Sync extends AbstractQueuedSynchronizer {
        abstract void lock();
//实现类非公平锁的抢占逻辑
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
//c为AQS的state为0表示没有线程抢占到
            if (c == 0) {
//通过CAS将状态值设置为抢占
                if (compareAndSetState(0, acquires)) {
//将持有锁的线程设置为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
//如果state不是0,则代表锁已经被线程锁抢占,判断持有锁的线程是不是当前线程这也是可重入的体现。
            else if (current == getExclusiveOwnerThread()) {
                //如果是则将当前state的值加一放入,代表重入的次数
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

    //释放锁的逻辑
        protected final boolean tryRelease(int releases) {
          //将当前state减1,代表当前线程执行完毕
            int c = getState() - releases;
          //如果当前线程不是持有锁的线程则直接抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
          //free代表是否释放成功
            boolean free = false;
          //如果减1后为0则释放成功,因为还有可重入状态所以只有为0才是真正的释放锁
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
              //将更新的值赋值回去
            setState(c);
            return free;
        }

非公平锁的具体实现

加锁逻辑

   static final class NonfairSync extends Sync {
      
      //调用Lock方法进行抢锁
        final void lock() {
            //直接CAS尝试将state设置为1,设置成功则代表抢占到锁
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //否则进入模板方法AQS的逻辑。
                acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }


 /*调用acquire之后进入这里然后首先调用tryAcquire进行抢占锁,
这个方法被非公平锁实现使用走的是非
公平锁的逻辑最后调用的是Sync的这个方法nonfairTryAcquire,如果没有抢占到则进入后面逻辑*/
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

addWaiter方法
首先将当前线程包装一个Node节点的对象,然后判断pred是否为空,其实这是判断AQS的双向队列是否被初始化,如果没有的话进行enq()方法进行初始化,如果初始化过了则将node节点加入队列尾部。加入成功之后返回true执行方法acquireQueued,如果acquire抢占成功则回退进入模板方法返回当前线程是否被中断过。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

enq方法 传入需要加入队列的节点,在方法内部进行循环,再次判断尾结点是否为空,如果没有则使用CAS将头结点设置为空节点(哨兵节点)将tail执向头,然后再第二次循环的时候将node节点加入,这样是防止多线程并发的时候出现问题。

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued方法
这里也是一个循环CAS,首先调用predecessor获取当前线程node节点的pre节点,如果当前节点的头节点是head节点,则进行tryAcquire抢占锁,如果抢占成功则将头结点设置为当前节点,将当前节点前驱节点的next置空帮助垃圾回收,然后返回是否被中断过,如果没有抢占成功则进入下面的If逻辑执行方法shouldParkAfterFailedAcquire

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                  //如果线程被中断则将标志位设置为true
                    interrupted = true;
            }
        } finally {
            if (failed)
              //抢占失败则执行此方法取消节点在队列中的排队
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire方法
先获取当前节点前驱节点的状态如果处于等待被唤醒状态则返回true执行后续逻辑,否则如果当前节点状态大于0则代表当前节点已经被取消所有,需要从队列之中移除,否则将当前节点的状状态置位等待被唤醒的状态,退出继续执行上面的逻辑。即就是判断当前节点的前驱节点是不是头结点,如果是则抢占锁,否为仅需进入此逻辑,这样就要被暂停了

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt方法:将当前线程使用locksupport暂停返回中断当前线程的中断标志。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

解锁逻辑

直接调用模板方法

    public void unlock() {
        sync.release(1);
    }

release方法
tryRelease方法的逻辑上面已经说过不在赘述,如数返回true则进入if的逻辑主要看一下unparkSuccessor方法

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

unparkSuccessor方法
该方法的参数node节点,即为当前持有锁的线程的节点,获取当前节点的状态如果为小于0则使用CAS将其设置为0,获取当前节点的下一个node节点如果node为null或者waitStatus的值大于0(被取消)则从队列尾部开始遍历,遍历到最后一个非空节点,即离头节点最近的节点,唤醒后继的线程node

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

公平锁

与非公平锁的不同之处就是在hasQueuedPredecessors在c==0表示可以抢占锁的时候去先检查队列里面有没有等待的node如果有则不进行抢占,加入队列,如果没有则抢占

 protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

AQS条件队列 ConditionObject

内部这两变量表示条件队列

        private transient Node firstWaiter;
        private transient Node lastWaiter;

await()方法
如果当前线程被中断过则直接抛出异常,否则调用addConditionWaiter将线程包装加入队列,
fullyRelease调用改方法释放当前线程持有的所有锁。然后判断当前节点有没有在AQS的等待队列之中,如果不在则阻塞当前线程

 public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

获取到尾指针,如果当前节点不等于Null而且当前节点的状态值不等于CONDITION则将当前最后一个节点从队列中移除。创建一个新的节点。将节点加入尾部。

 private Node addConditionWaiter() {
            Node t = lastWaiter;
            if (t != null && t.waitStatus != Node.CONDITION) {    
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

signal()唤醒方法

当前线程持有锁的情况下才可以从条件队列之中唤醒线程,
  public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

doSignal方法
执行该方法相当于CAS操作使用transferForSignal方法,将节点的状态值进行改变,然后将节点加入阻塞队列中,如果当前线程的状态大于0则是被取消则唤醒更改线程

  private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

相关文章

  • JUC--AQS

    1. AbstractQueuedSynchronizer 抽象队列同步器,是实现同步器的基本组件。当我们需要实现...

  • JUC--AQS简介

    2018-10-01推荐原文 死磕Java并发 AQS,AbstractQueuedSynchronizer,即队...

网友评论

    本文标题:JUC--AQS

    本文链接:https://www.haomeiwen.com/subject/ruxtyltx.html