美文网首页
不怕难之ReentrantLock及其扩展

不怕难之ReentrantLock及其扩展

作者: 天外流星for | 来源:发表于2018-06-14 16:07 被阅读0次

    一、问题导读

    1.  ReentrantLock公平锁和非公平锁有什么区别

    2.  obj.wait() 和 condition.await() 有什么区别

    3.  Condition中同步队列和条件队列是如何交互的

    二、ReentrantLock实现原理

    ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。

    Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁),默认为NonfairSync(非公平锁)

    public ReentrantLock(boolean fair) {

        sync = fair ? new FairSync() : new NonfairSync();

    }

    线程抢锁之公平锁

    static final class FairSync extends Sync {

        final void lock() {

            acquire(1);  // 请求获取锁

        }

      // 因为有可能直接就成功了呢,也就不需要进队列排队了,

      // 对于公平锁的语义就是:本来就没人持有锁,根本没必要进队列等待

        public final void acquire(int arg) {

            if (!tryAcquire(arg) &&

                 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 入同步队列

                selfInterrupt();

        }

        protected final boolean tryAcquire(int acquires) {

            final Thread current = Thread.currentThread();

            int c = getState();

            if (c == 0) { // 没有线程占用锁

                // 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待           

            if (!hasQueuedPredecessors() &&

                    compareAndSetState(0, acquires)) {

                    setExclusiveOwnerThread(current);

                    return true;

                }

            }

            else if (current == getExclusiveOwnerThread()) { // 重入过程

                int nextc = c + acquires;

                if (nextc < 0)

                    throw new Error("Maximum lock count exceeded");

                setState(nextc);

                return true;

            }

            return false;

        }

    }

    线程抢锁之非公平锁

    static final class NonfairSync extends Sync {

        final void lock() {

            // 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了       

          if (compareAndSetState(0, 1))

                setExclusiveOwnerThread(Thread.currentThread());

            else           

          acquire(1);

        }

        // AbstractQueuedSynchronizer.acquire(int arg)   父类中方法

          public final void acquire(int arg) {

            if (!tryAcquire(arg) &&

                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

                selfInterrupt();

        }

        protected final boolean tryAcquire(int acquires) {

            return nonfairTryAcquire(acquires);

        }

    }

    boolean nonfairTryAcquire(int acquires) {

        final Thread current = Thread.currentThread();

        int c = getState();

        if (c == 0) {

            if (compareAndSetState(0, acquires)) {  // 这里直接抢锁

                setExclusiveOwnerThread(current);

                return true;

            }

        }

        else if (current == getExclusiveOwnerThread()) {

            int nextc = c + acquires;

            if (nextc < 0) // overflow           

                throw new Error("Maximum lock count exceeded");

            setState(nextc);

            return true;

        }

        return false;

    }

    总结:公平锁和非公平锁只有两处不同

    非公平锁在调用 lock 后,首先就会调用 CAS 进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。

    非公平锁在 CAS 失败后,和公平锁一样都会进入到 tryAcquire 方法,在 tryAcquire 方法中,如果发现锁这个时候被释放了(state == 0),非公平锁会直接 CAS 抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。

    公平锁和非公平锁就这两点区别,如果这两次 CAS 都不成功,那么后面非公平锁和公平锁是一样的,都要进入到阻塞队列等待唤醒。

    相对来说,非公平锁会有更好的性能,因为它的吞吐量比较大。当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列中的线程长期处于饥饿状态。


    同步队列入队流程

    线程解锁

    // 唤醒的代码还是比较简单的,跟加锁类似

    public void unlock() {

        sync.release(1);

    }

    public final boolean release(int arg) {

        // 往后看吧 

        if (tryRelease(arg)) {

            Node h = head;

            if (h != null && h.waitStatus != 0)

                unparkSuccessor(h); // 通知后置节点

            return true;

        }

        return false;

    }

    // 回到ReentrantLock看tryRelease方法

    protected final boolean tryRelease(int releases) {

        int c = getState() - releases;

        if (Thread.currentThread() != getExclusiveOwnerThread())

            throw new IllegalMonitorStateException();

        // 是否完全释放锁   

          boolean free = false;

        // 其实就是重入的问题,如果c==0,也就是说没有嵌套锁了,可以释放了,否则还不能释放掉   

    if (c == 0) {

            free = true;

            setExclusiveOwnerThread(null);

        }

        setState(c);

        return free;

    }

    // 唤醒后继节点// 从上面调用处知道,参数node是head头结点

    private void unparkSuccessor(Node node) {

       int ws = node.waitStatus;

        // 如果head节点当前waitStatus<0, 将其修改为0 

        if (ws < 0)

            compareAndSetWaitStatus(node, ws, 0);

       // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)   

        // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的   

    Node s = node.next;

        if (s == null || s.waitStatus > 0) {

            s = null;

            // 从后往前找,仔细看代码,不必担心中间有节点取消(waitStatus==1)的情况       

            for (Node t = tail; t != null && t != node; t = t.prev)

                if (t.waitStatus <= 0)

                    s = t;

        }

        if (s != null)

            // 唤醒线程       

         LockSupport.unpark(s.thread);

    }

    三、Condition实现原理

    同步队列与条件队列交互流程

    阻塞await实现原理

    // 首先,这个方法是可被中断的,这个方法会阻塞,直到调用 signal 方法(指 signal() 和 signalAll(),下同),或被中断

    public final void await() throws InterruptedException {

        if (Thread.interrupted())

            throw new InterruptedException();

        // 添加到 condition 的条件队列中   

          Node node = addConditionWaiter();

        // 释放锁,返回值是释放锁之前的 state 值   

       int savedState = fullyRelease(node);

        int interruptMode = 0;

        // 这里退出循环有两种情况,之后再仔细分析   

       // 1. isOnSyncQueue(node) 返回 true,即当前 node 已经转移到阻塞队列了   

       // 2. checkInterruptWhileWaiting(node) != 0 会到 break,然后退出循环,代表的是线程中断   

      while (!isOnSyncQueue(node)) {

            LockSupport.park(this);

            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)

                break;

        }

        // 被唤醒后,将进入阻塞队列,等待获取锁   

        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)

            interruptMode = REINTERRUPT;

        if (node.nextWaiter != null) // clean up if cancelled       

             unlinkCancelledWaiters();  // 清除队列中已经取消的节点

        if (interruptMode != 0)

            reportInterruptAfterWait(interruptMode);

    }

    整个await的过程如下:  

    1.将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。

    2.释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。

    3.自旋(while)挂起,直到被唤醒(signal把他重新放回到AQS的等待队列)或者超时或者CACELLED等。

    4.获取锁(acquireQueued)。并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。


    唤醒signal实现原理

    // 唤醒等待了最久的线程// 其实就是,将这个线程对应的 node 从条件队列转移到同步队列

       public final void signal() {

        // 调用 signal 方法的线程必须持有当前的独占锁   

        if (!isHeldExclusively())

            throw new IllegalMonitorStateException();

        Node first = firstWaiter;

        if (first != null)

            doSignal(first);  //唤醒操作

    }

    // 从条件队列队头往后遍历,找出第一个需要转移的 node// 因为前面我们说过,有些线程会取消排队,但是还在队列中

    private void doSignal(Node first) {

        do {

      if ( (firstWaiter = first.nextWaiter) == null)

                lastWaiter = null;

            // 因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉       

             first.nextWaiter = null;

        } while (!transferForSignal(first) &&

                (first = firstWaiter) != null);

    final boolean transferForSignal(Node node) {

        // CAS 如果失败,说明此 node 的 waitStatus 已不是 Node.CONDITION,说明节点已经取消,   

         // 既然已经取消,也就不需要转移了,方法返回,转移后面一个节点    // 否则,将 waitStatus 置为 0   

    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))

            return false;

        //enq(node): 自旋进入阻塞队列的队尾    // 注意,这里的返回值 p 是 node 在阻塞队列的前驱节点   

         Node p = enq(node);

        int ws = p.waitStatus;

        // ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。

        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))

            // 如果前驱节点取消或者 CAS 失败,会进到这里唤醒线程,之后的操作看下一节        LockSupport.unpark(node.thread);

        return true;

    }

    整个signal过程如下

    从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。

    对每个节点执行唤醒操作时,将节点加入同步队列,再等待同步队列被唤醒。


    案例剖析

    public class TestCondition {

    public static void main(String[] args) {

    final ReentrantLock reentrantLock =new ReentrantLock();

            final Condition condition = reentrantLock.newCondition();

            new Thread(()-> {

                   reentrantLock.lock();

                    System.out.println(Thread.currentThread().getName() +"拿到锁了");

                    System.out.println(Thread.currentThread().getName() +"等待信号");

                    try {

                      condition.await();

                    }catch (InterruptedException e) {

                     e.printStackTrace();

                    }

                    System.out.println(Thread.currentThread().getName() +"拿到信号");

                    reentrantLock.unlock();

            }, "线程1").start();

            new Thread(()-> {

                     reentrantLock.lock();

                    System.out.println(Thread.currentThread().getName() +"拿到锁了");

                    try {

                      Thread.sleep(3000);

                    }catch (InterruptedException e) {

                     e.printStackTrace();

                    }

                    System.out.println(Thread.currentThread().getName() +"发出信号");

                    condition.signal();

                    reentrantLock.unlock();

                    System.out.println(Thread.currentThread().getName() +"unlock结束");

            }, "线程2").start();

        }

    }

    同步队列和条件队列交互过程

    AQS的同步队列,Condition的条件队列开始交互:

    1. 线程1调用reentrantLock.lock时,尝试获取锁。如果成功,则返回,从AQS的队列中移除线程;否则阻塞,保持在AQS的等待队列中。

    2. 线程1调用await方法被调用时,对应操作是被加入到Condition的等待队列中,等待signal信号;同时释放锁。

    3. 锁被释放后,会唤醒AQS队列中的头结点,所以线程2会获取到锁。

    4. 线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程1 并没有被唤醒,只是被加入AQS等待队列。

    5. signal方法执行完毕,线程2调用unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,线程1被唤醒,线程1恢复执行。

    所以:

    发送signal信号只是将Condition队列中的线程加到AQS的等待队列中。只有到发送signal信号的线程调用reentrantLock.unlock()释放锁后,这些线程才会被唤醒。

    可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。

    总结:

    ConditionObject由AQS提供,它实现了类似wiat、notify和notifyAll类似的功能。Condition必须与一个独占锁绑定使用,在await或signal之前必须现持有独占锁。Condition队列是一个单向链表,他是公平的,按照先进先出的顺序从队列中被“唤醒”,所谓唤醒指的是完成Condition对象上的等待,被移到Sync锁等待队列中,有参与竞争锁的资格(Sync队列有公平&非公平两种模式,注意区别)。

    相关文章

      网友评论

          本文标题:不怕难之ReentrantLock及其扩展

          本文链接:https://www.haomeiwen.com/subject/qrhyeftx.html