美文网首页
线程学习->01AQS锁

线程学习->01AQS锁

作者: 冉桓彬 | 来源:发表于2018-02-04 23:38 被阅读0次
    AQS锁的代表是AbstractQueuedSynchronizer, 为什么要创建一种与内置锁如此相似的新加锁机制呢? 在大多数情况下, 内置锁都能很好的工作, 但在功能上存在一些局限性, 例如, 无法中断一个正在等待获取锁的线程, 或者无法在请求获取一个锁时无限等待下去,

    关于AQS锁, 打算从ArrayBlockingQueue的源码进行入手分析

    // 内部持有的ReentrantLock同时又持有公平锁与非公平锁
    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
         /** Main lock guarding all access */
        final ReentrantLock lock;
        /** Condition for waiting takes */
        private final Condition notEmpty;
        /** Condition for waiting puts */
        private final Condition notFull;
    }
    

    一、ArrayBlockingQueue为入口

    1.1 ArrayBlockingQueue.put生产者
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }
    
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        // 1.当前线程通过这里先尝试获取锁;
        lock.lockInterruptibly();
        try {
            // 2.获取锁成功, 判断队列元素是否已满, 如果满了;
            while (count == items.length)
                // 3.通过Condition.await挂起当前线程, 并释放锁;
                notFull.await();
            // 4.将元素入队;
            enqueue(e);
        } finally {
            // 5.释放锁;
            lock.unlock();
        }
    }
    
    1.2 ArrayBlockingQueue.take消费者
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 1.获取锁;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                // 2.队列为空, 获取锁的线程释放锁;
                notEmpty.await();
            // 3.元素出队操作;
            return dequeue();
        } finally {
            // 4.操作完成后线程释放锁;
            lock.unlock();
        }
    }
    

    针对put和take操作中每次线程的挂起与唤醒, AQS是如何管理这些线程的?

    1. put涉及到的方法:
       (1) ReentrantLock. lockInterruptibly();     模块<二>
       (2) Condition.await();                      模块<三>
       (3) Condition.signal();                     模块<三>
       (4) ReentrantLock.unlock();                 模块<二>
    2. take涉及到的方法:
       (1) ReentrantLock. lockInterruptibly();  
       (2) Condition.await();   
       (3) Condition.signal();
       (4) ReentrantLock.unlock(); 
    

    二、ArrayBlockingQueue.put(ReentrantLock.lock与unlock)

    1. put涉及到以下几个方法:
       (1) ReentrantLock.lockInterruptibly();
       (2) Condition.await();
       (3) Condition.signal();
       (4) ReentrantLock.unlock();
    
    2.1 ReentrantLock.lockInterruptibly获取锁
    // NonfairSync: 非公平锁;
    // FairSync: 公平锁;
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    
    public void lockInterruptibly() {
        // 获取锁, 所以其实ReentrantLock锁的获取与释放操作其实是交给了Sync
        sync.acquireInterruptibly(1);
    }
    
    2.1.1 NonfairSync.acquireInterruptibly
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        // 如果当前线程已经被中断, 抛出异常;
        if (Thread.interrupted())
            throw new InterruptedException();
        // 如果线程处于正常运行状态, 此时首先是尝试获取锁, 与公平锁的区别就在这里;
        if (!tryAcquire(arg))
            // 如果当前线程获取锁失败, 则会进入到这里被挂起, 直到其他线程显示唤醒;
            doAcquireInterruptibly(arg);
    }
    

    公平锁与非公共平锁的区别就在这里了, 非公平锁是首先通过tryAcquire尝试获取锁.

    2.1.2 NonfairSync.tryAcquire尝试获取锁
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // c代表的是锁被同一个线程获取的次数, 可以看成是锁重入的问题;
        int c = getState();
        // 如果当前锁还没有被线程所持有;
        if (c == 0) {
            // 到这里其实线程并没有持有锁, 所以使用cas, 防止在这个过程中其他线程获取了锁;
            if (compareAndSetState(0, acquires)) {
                // 将current也就是当前线程赋值给exclusiveOwnerThread, 如果成功, 可以认为
                // exclusiveOwnerThread就是锁的持有线程;
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 执行到这里说明情况c != 0, c != 0其实是有两种情况:
        // 1. 锁被其他线程持有;
        // 2. 锁已经被当前线程持有;
        // 所以如果锁是被当前线程持有, 根据锁重入的特点, 直接让c++即可;
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // 执行到这里说明当前锁被其他线程持有;
        return false;
    }
    
    2.1.3 NonfairSync.doAcquireInterruptibly获取锁失败的线程将被挂起
    // 这里有几个关键点需要注意一下:
    // 1. 线程入队时与Node绑定, 被挂起的线程对应的Node的状态值;
    // 2. Head节点的作用;
    
    // 这段代码稍微复杂一些, AQS的一个核心就是在这里, 对于获取锁失败的线程的处理, 所以打算分
    // 三段进行分析: addWaiter、addWaiter~ shouldParkAfterFailedAcquire、
    // shouldParkAfterFailedAcquire;
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        // 1. 这里先将获取锁失败的线程入队;
        // 2. 返回的这个node其实就是传进来的node;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                // 获取当前线程对应的node(为了简化, 称为node(thread)) 的前置节点, ;
                final Node p = node.predecessor();
                // 通过对addWaiter的分析可知, node(thread)会被插入到当前Node队列的尾端, 而此时
                // 如果node(thread).pre = node(head), 也就是说当前Node队列中只有node(thread)
                // 这一个有效节点, 所以再次考虑获取锁, 如果获取锁成功;
                if (p == head && tryAcquire(arg)) {
                    // 如果获取锁成功, 则将node(thread)置为node(head), 其实到这里可以明白了
                    // node(head)其实只是占位的作用;
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                // 1.如果p != head: 当前队列中有多个等待线程, 那么node(thread)进行挂起, 这个也是
                //   AQS锁的一个特点;
                // 2.如果p == head 且 tryAcquire = false也就是说node(head)再次获取锁失败;
                // 3. parkAndCheckInterrupt线程被挂起;
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    这段代码稍微复杂一些, AQS的一个核心就是在这里, 对于获取锁失败的线程的处理, 所以打算分三段进行分析:

    • 1、addWaiter: 线程入队操作.
    • 2、addWaiter~ shouldParkAfterFailedAcquire: 线程出队操作.
    • 3、shouldParkAfterFailedAcquire: 入队线程被挂起之前的准备工作.
    2.1.3.1 NonfairSync.addWaiter获取锁失败的线程进行入队操作
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        // pred != null表示当前Node队列已经有值了, 此时要做的就是将当前线程对应的Node插入
        // 到当前Node队列的尾端即可;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }  
        // 如果当前队列没有值, 此时一定要注意创建队列时head与tail节点;
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        // 注意下面创建Node队列的过程, 会先创建一个Node(head)节点, 当前线程对应的Node
        // 并没有插入到Node(head)的位置, 而只是插入到Node(head).next位置, 这里先不考虑原因,
        // 只记住这个过程就好;
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    线程进入Node队列的结构如下所示:
    2.1.3.2 Node出队操作
    for (;;) {
        // 获取当前节点的前置节点;
        final Node p = node.predecessor();
        // 1.这里需要考虑的一个问题是为何当前节点的前置节点是Node(Head)时才会再次尝试获取锁?
        // 2.其实结合上面的图大致可以看出Node(Head仅仅是一个占位节点), 如果当前节点的前置节点
        //   是Node(Head)也是可以看出当前节点前面已经没有等待线程了, 所以当前线程再次尝试获取锁,
        //   这个算是线程被挂起之前的最后一次挣扎了.
        if (p == head && tryAcquire(arg)) {
            // 如果当前线程成功的获取了锁, 此时将当前线程至于Node队列的Head位置, 注意此时成功获取
            // 锁的线程被放置在了Head位置
            setHead(node);
            p.next = null; // help GC
            failed = false;
            return;
        }
    }
    

    稍微总结一下, 获取锁失败的线程首先会进入到Node队列, 被添加在Node队列的tail位置, 然后在被挂起之前, 如果当前线程对应的Node位于Head.next位置, 则会再次尝试获取一次锁, 如果仍然失败, 那么此时就会尝试被挂起.
    到这里, 其实发现是有自旋锁的影子的, 获取锁失败的线程再次尝试获取锁, 失败之后才会被挂起

    2.1.3.3 NonfairSync.shouldParkAfterFailedAcquire线程挂起前的操作
    // 1. 需要注意的是每个Node的waitStatus默认值都是0;
    // 2.每一个node.waitStatus被设置为Node.SIGNAL其实是根据他的后置节点来触发的.
    // 3.一定要注意该方法是在for(;;)中被调用, 所以其实最终会触发return true的执行.
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // ws默认值为0;
        int ws = pred.waitStatus;
        // 1.首次进入这里直接跳过该if语句向下执行.
        // 2.到目前为止还是没有发现哪里有对Node node.waitStatus进行赋值的地方.
        if (ws == Node.SIGNAL)
            return true;
        // 在什么情况下ws才会>0????
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
                pred.next = node;
        } else {
            // 这里是将Node pred.waitStatus设置为Node.SIGNAL.
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    对这个方法总结一下:

    • 1、获取锁失败的线程在进入Node队列时, waitStatus = 0.
    • 2、进入队列之后, 前置节点为Head的节点再次尝试获取锁, 如果此时获取锁失败, 则将其前置节点的waitStatus置为Node.SIGNAL.
    • 3、设置完成以后, 才会进入真正的挂起状态.
    2.1.4 NonfairSync.parkAndCheckInterrupt线程被挂起
    private final boolean parkAndCheckInterrupt() {
        // 线程被挂起;
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    2.2 ReentrantLock.unlock唤醒ReentrantLock.lock失败被挂起的线程

    ReentrantLock.lock()被挂起的线程进入沉睡状态以后, 需要ReentrantLock.unlock()进行唤醒.

    public void unlock() {
        // 释放锁;
        sync.release(1);
    }
    
    2.2.1 NonFairSync.release
    public final boolean release(int arg) {
        // 这里需要考虑锁重入的问题, 锁可以被同一个线程持有多次, 所以tryRelease会触发
        // c--操作, 如果c == 0, 表示锁完成被释放, 此时才会考虑唤醒Node队列中等待线程;
        if (tryRelease(arg)) {
            Node h = head;
            // 在前面的分析中也已经知道, 线程被挂起之前waitStatus会被设置为Node.SIGNAL = -1;
            if (h != null && h.waitStatus != 0)
                // 唤醒等待的线程, 这里传入的是head节点;
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    2.2.2 NonFairSync.unparkSuccessor
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        //1.唤醒线程时, 其实并没有改变head节点的waitStatus值.
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //unlock唤醒的其实是Head.next节点, 所以也对应前面lock中所说的, Head节点其实是一个占位节点;
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 仅仅只是唤醒线程, 并没有改变线程对应节点的waitStatus值.
            LockSupport.unpark(s.thread);
    }
    

    总结:

    • 1、unlock唤醒线程是从Head节点的后置节点开始进行唤醒, 因此head节点仅仅是占位的作用.
    • 2、被唤醒的线程的前置节点如果刚好是head节点, 则会替换掉head节点.
    • 3、被唤醒的线程的waitStatus值此时仍然为Node.SIGNAL, 这个值直到该节点的后置节点被唤醒时才会被修改为0.

    三、ArrayBlockingQueue.put(Condition.await与signal)

    分析之前需要考虑以下几个问题:

    • 1、await之后线程被挂起, 如何唤醒lock时被挂起的其他线程?
    • 2、await被挂起的线程是如何被管理的?

    先列出总结:

    • 1、线程触发Condition.await时, 首先会进入到Node(firstWaiter)队列中.
    • 2、然后当前线程释放锁, 如果能够完全释放锁, 则唤醒ReentrantLock.lock时获取锁失败的线程.
    • 3、唤醒Node(Head)队列中的线程之后, 当前线程进入挂起状态.
    • 4、获取ReentrantLock.lock成功的线程通过Condition.signal又会唤醒Node(firstWaiter)挂起的线程, 唤醒的方式是修改Node.waitStatus = 0, 同时将该Node从Node(firstWaiter)中移除, 然后添加到Node(Head)的尾端.
    3.1 Condition.await线程被挂起
    public final void await() throws InterruptedException {
        // 线程在被挂起时, 也会进入到一个Node队列中, 这个队列与通过lock方式的队列是两个队列.
        Node node = addConditionWaiter();
        // 执行锁的释放操作, 同时唤醒lock失败被挂起的线程.
        long savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            // 线程被挂起时waitStatus为Node.CONDITION.
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 线程被唤醒之后, 执行到这里尝试获取锁.
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    
    3.1.1 Condition.addConditionWaiter线程进入Node(firstWaiter)队列
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        Node node = new Node(Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    
    这段代码执行完成以后, Node队列如下:
    3.1.2 Condition.fullyRelease
    final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }
    
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    释放锁, 如果锁被完全释放, 唤醒Node(Head)队列.

    3.1.3 NonfairSync.isOnSyncQueue
    final boolean isOnSyncQueue(Node node) {
        // 默认情况下node.waitStatus = Node.CONDITION, 然后进入if内部被挂起.
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }
    
    3.2 Condition.signal唤醒Condition.await挂起的线程
    public final void signal() {
        Node first = firstWaiter;
        if (first != null)
            // 与unlock有点儿类似, 拿到firstWaiter节点进行唤醒.
            doSignal(first);
    }
    
    3.2.1 Condition.doSignal
    private void doSignal(Node first) {
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
          //在执行while中的逻辑之前会先将Node从Node(firstWaiter)队列中移除.
        } while (!transferForSignal(first) && (first = firstWaiter) != null);
    }
    
    3.2.2 NonFairSync.transferForSignal
    final boolean transferForSignal(Node node) {
        // 首先将waitStatus更改为Node.CONDITION.
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;
        // 将该Node添加到Node(Head)尾端, 而且此时Node.waitStatus被修改为了0.
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    

    相关文章

      网友评论

          本文标题:线程学习->01AQS锁

          本文链接:https://www.haomeiwen.com/subject/enomzxtx.html