美文网首页Java-解读
AQS之独占和共享锁

AQS之独占和共享锁

作者: 有章 | 来源:发表于2018-08-19 13:09 被阅读1237次

    AQS独占和共享锁,ReentantLock为独占锁,ReentantReadWriteLock中readLock()为共享锁,writeLock()为独占锁。
    读锁与读锁可以共享
    读锁与写锁不可以共享
    写锁与写锁不可以共享
    AQS内部维护了CLH的FIFO的队列,队列中的节点为Node类型的元素

    CLH队列
    图片转载自https://www.cnblogs.com/waterystone/p/4920797.html

    Node节点结构:

    static final class Node {
        int waitStatus;
        Node prev;
        Node next;
        Node nextWaiter;
        Thread thread;
    }
    waitStatus:等待状态,具体类型如下:
        CANCELLED:值为1 取消 
        SIGNAL:值为-1  唤醒后继节点 
        CONDITION:值为-2  表示当前节点在condition队列
        PROPAGATE:值为-3 共享模式下acquireShared需要向下传播
        0:无状态,表示当前节点在队列中等待获取锁
    prev:前向节点
    next:后继节点
    nextWaiter:condition队列中的下一个等待节点
    thread:当前线程
    

    AQS中会维护head、tail节点,head节点中实际不存储线程,只保留队列头部节点的引用
    state:
    临界资源,当未获取到锁时,state=0,获取到锁则会置为state=1
    ReentantLock为可重入锁,同一线程多次获取锁时,只会相应增加state的值,释放锁时,会对state递减,当state=0时则释放锁。
    CountDownLatch中,初始化new CountDownLatch(arg),arg则为state的值,当其他线程调用countDown()时,则减少state,state=0时,则立即去唤醒后继的节点

    ReentantLock中实现了三个内部类:
    Sync:继承AQS
    FairSync:继承Sync. 实现公平锁
    NoFairSync:继承Sync 实现非公平锁,ReentrantLock默认实现

    现在已非公平锁为例进行说明:

    lock方法:
    1.CAS将state状态改为1,成功则将exclusiveOwnerThread设置为当前线程
    2.CAS失败,执行acquire(1)
    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    
    //AQS中
    1.尝试获取锁,成功则返回,否则执行2
    2.执行addWaiter方法,创建独占节点加入队列尾
    3.然后自旋的获取锁,获取成功,则返回中断标示
    4.获取失败则找到安全点(前续节点为signal -1),则将自己park
    5.检查是否被中断,如果是则抛出中断异常
     public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    ReentrantLock中实现tryAcquire(arg)方法

     protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    
     final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    addWaiter方法

    1.将当前线程封装成独占节点
    2.尾节点不为null,则CAS方法将node节点添加到队列尾部,否则执行3
    3.enq方法,自旋的将node添加到队列尾部,并返回node节点
    在添加到对列尾部时,都会先设置node的prev节点,所以判断一个节点是否在队列中,如果node节点的prev==null,则一定不在队列中。如果prev!=null,也不能说节点一定在队列中,有可能正在自旋的设置pred的next节点为node节点
    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    acquireQueued方法
    1.获取节点的前续节点p,如果p==head,则去尝试获取锁。成功后将node设置为head节点,返回中断标示,否则执行2

    1. shouldParkAfterFailedAcquire方法为了找到当前节点的安全点,即前续节点的waitStatus=Node.SIgnal(唤醒后续节点),然后执行3。否则自旋
    2. 当前线程park,然后返回中断标示
    3. 如果线程被中断过,则将中断标示设置为true
     final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    parkAndCheckInterrupt方法判断中断标示,为什么要判断中断标示?

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    非常简单,阻塞当前线程,然后返回线程的中断状态并复位中断状态
    在博客【http://www.ideabuffer.cn/2017/03/15/深入理解AbstractQueuedSynchronizer(一)】中有很详细的讲解
    ···
    注意interrupted()方法的作用,该方法是获取线程的中断状态,并复位,也就是说,如果当前线程是中断状态,则第一次调用该方法获取的是true,第二次则是false。而isInterrupted()方法则只是返回线程的中断状态,不执行复位操作。
    ···
    如果acquireQueued执行完毕,返回中断状态,回到acquire方法中,根据返回的中断状态判断是否需要执行Thread.currentThread().interrupt()。

    为什么要多做这一步呢?先判断中断状态,然后复位,如果之前线程是中断状态,再进行中断?

    这里就要介绍一下park方法了。park方法是Unsafe类中的方法,与之对应的是unpark方法。简单来说,当前线程如果执行了park方法,也就是阻塞了当前线程,反之,unpark就是唤醒一个线程。

    具体的说明请参考http://blog.csdn.net/hengyunabc/article/details/28126139

    park与wait的作用类似,但是对中断状态的处理并不相同。如果当前线程不是中断的状态,park与wait的效果是一样的;如果一个线程是中断的状态,这时执行wait方法会报java.lang.IllegalMonitorStateException,而执行park时并不会报异常,而是直接返回。

    所以,知道了这一点,就可以知道为什么要进行中断状态的复位了:

    如果当前线程是非中断状态,则在执行park时被阻塞,这是返回中断状态是false;
    如果当前线程是中断状态,则park方法不起作用,会立即返回,然后parkAndCheckInterrupt方法会获取中断的状态,也就是true,并复位;
    再次执行循环的时候,由于在前一步已经把该线程的中断状态进行了复位,则再次调用park方法时会阻塞。
    所以,这里判断线程中断的状态实际上是为了不让循环一直执行,要让当前线程进入阻塞的状态。想象一下,如果不这样判断,前一个线程在获取锁之后执行了很耗时的操作,那么岂不是要一直执行该死循环?这样就造成了CPU使用率飙升,这是很严重的后果

    cancelAcquire方法
    在acquireQueued方法的finally语句块中,如果在循环的过程中出现了异常,则执行cancelAcquire方法,用于将该节点标记为取消状态。该方法代码如下:

    private void cancelAcquire(Node node) {
            // Ignore if node doesn't exist
            if (node == null)
                return;
    
            node.thread = null;
    
            // Skip cancelled predecessors
    //跳过前续取消节点
            Node pred = node.prev;
            while (pred.waitStatus > 0)
                node.prev = pred = pred.prev;
    
            // predNext is the apparent node to unsplice. CASes below will
            // fail if not, in which case, we lost race vs another cancel
            // or signal, so no further action is necessary.
    //前续节点的后续节点
            Node predNext = pred.next;
    
            // Can use unconditional write instead of CAS here.
            // After this atomic step, other Nodes can skip past us.
            // Before, we are free of interference from other threads.
    //将当前节点设置为取消
            node.waitStatus = Node.CANCELLED;
    
            // If we are the tail, remove ourselves.
    //如果当前节点为尾节点,则将尾节点设置为当前节点的pred,并将pred的next节点设置为null,相当于将node节点从队列中移除
            if (node == tail && compareAndSetTail(node, pred)) {
                compareAndSetNext(pred, predNext, null);
            } else {
                // If successor needs signal, try to set pred's next-link
                // so it will get one. Otherwise wake it up to propagate.
                int ws;
    //
    1.如果当前节点的前续节点不为head
    2.前续节点ws不为-1则将前续节点的ws设置为-1
    3.以上2步都为true时,校验前续节点的线程不为null
    3点均满足则pred的next设置为node的后续节点,将node移除
                if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                     (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0)
                        compareAndSetNext(pred, predNext, next);
                } else {
    //
    1.如果node的前续节点为head或者设置状态失败,则唤醒后续节点
                    unparkSuccessor(node);
                }
    
                node.next = node; // help GC
            }
        }
    

    这里直接unpark后继节点的线程,然后将next指向了自己。

    这里可能会有疑问,既然要删除节点,为什么都没有对prev进行操作,而仅仅是修改了next?【以下摘自参考博客中的内容】
    要明确的一点是,这里修改指针的操作都是CAS操作,在AQS中所有以compareAndSet开头的方法都是尝试更新,并不保证成功,图中所示的都是执行成功的情况。

    那么在执行cancelAcquire方法时,当前节点的前继节点有可能已经执行完并移除队列了(参见setHead方法),所以在这里只能用CAS来尝试更新,而就算是尝试更新,也只能更新next,不能更新prev,因为prev是不确定的,否则有可能会导致整个队列的不完整,例如把prev指向一个已经移除队列的node。

    什么时候修改prev呢?其实prev是由其他线程来修改的。回去看下shouldParkAfterFailedAcquire方法,该方法有这样一段代码:

    do {
        node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
    

    该段代码的作用就是通过prev遍历到第一个不是取消状态的node,并修改prev。
    这里为什么可以更新prev?因为shouldParkAfterFailedAcquire方法是在获取锁失败的情况下才能执行,因此进入该方法时,说明已经有线程获得锁了,并且在执行该方法时,当前节点之前的节点不会变化(因为只有当下一个节点获得锁的时候才会设置head),所以这里可以更新prev,而且不必用CAS来更新。


    unlock方法
    调用了sync的release(1)方法

    public void unlock() {
            sync.release(1);
        }
    

    AQS中的release方法
    1.尝试释放锁,释放成功则执行2
    2.如果头节点不为空且不为初始化状态,则唤醒后继节点

     public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    ReentrantLock重写了tryRelease方法
    1.校验当前线程是否为获取锁时设置的exclusiveOwnerThread
    2.如果state==0,则将exclusiveOwnerThread设置为null,返回true,释放成功,否则3
    3.否则减少重入锁的次数,返回false
     protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    unparkSuccessor方法

     private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    主要功能就是要唤醒下一个线程,这里s == null || s.waitStatus > 0判断后继节点是否为空或者是否是取消状态,然后从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点,至于为什么从尾部开始向前遍历,回想一下cancelAcquire方法的处理过程,cancelAcquire只是设置了next的变化,没有设置prev的变化,在最后有这样一行代码:node.next = node,如果这时执行了unparkSuccessor方法,并且向后遍历的话,就成了死循环了,所以这时只有prev是稳定的。

    ReentrantLock默认实现了非公平锁

    public ReentrantLock() {
            sync = new NonfairSync();
        }
    

    也可以设置为公平锁:

    ReentrantLock lock=new ReentrantLock(true);
    构造函数:
    public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    

    公平锁、非公平锁中实现的tryRelease方法一样,tryAcquire()方法则不同,非公平锁实现如下:

    protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    会在临界资源为0时判断当前是否还有前项节点存在:
    1.head!=tail
    2.h.next为空
    3.h.next !=null时s中线程不是当前线程

    public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    

    在AQS中,state可以标示锁的数量,也可以标示其他状态,state由子类负责去定义,AQS只负责维护state,通过state实现对临界资源的访问
    【参考博客】
    http://www.ideabuffer.cn/2017/03/15/深入理解AbstractQueuedSynchronizer(一)

    相关文章

      网友评论

        本文标题:AQS之独占和共享锁

        本文链接:https://www.haomeiwen.com/subject/gcjciftx.html