美文网首页
23.Lock与Condition——互斥锁、公平锁、非公平锁

23.Lock与Condition——互斥锁、公平锁、非公平锁

作者: 段段小胖砸 | 来源:发表于2021-10-25 16:00 被阅读0次

    1.锁的可重入性

    “可重入锁”是指当一个线程调用 object.lock()获取到锁,进入临界区后,再次调用object.lock(),仍然可以获取到该锁。显然,通常的锁都要设计成可重入的,否则就会发生死锁。

    在一个synchronized方法method1()里面调用另外一个synchronized方法method2()。如果synchronized关键字不可重入,那么在method2()处就会发生阻塞,这显然不可行。

    public void synchronized method1() { 
        // ... method2(); 
        // ...
    }
    public void synchronized method2() { 
        // ... 
    }
    

    Lock接口:

        void lock();//获取锁
        void lockInterruptibly() throws InterruptedException; //获取锁,可以被其他线程中断
        boolean tryLock();//获取锁,非阻塞,基于非公平锁tryAcquire,操作不成功直接返回false
        boolean tryLock(long var1, TimeUnit var3) throws InterruptedException; //获取锁。非阻塞,有时间限制
        void unlock();  //释放锁
        Condition newCondition();  
    

    ReentrantLock本身没有代码逻辑,实现都在其内部类Sync中:

    public class ReentrantLock implements Lock, Serializable {
        private final ReentrantLock.Sync sync;
       public void lock() {
            this.sync.acquire(1);
        }
       public void unlock() {
            this.sync.release(1);
        }
    }
    

    可重入锁:synchronized、ReentrantLock

    2.公平锁vs非公平锁

    Sync是一个抽象类,它有两个子类FairSync与NonfairSync,分别对应公平锁和非公平锁。
    ReentrantLock构造方法可以看出,会传入一个布尔类型的变量fair指定锁是公平的还是非公平的,默认为非公平的。其实是为了提高效率,减少线程切换(重入锁再次获取的时候非公平锁可以减少线切换)。

    public ReentrantLock() {
            this.sync = new ReentrantLock.NonfairSync();
        }
    
        public ReentrantLock(boolean fair) {
            this.sync = (ReentrantLock.Sync)(fair ? new ReentrantLock.FairSync() : new ReentrantLock.NonfairSync());
        }
    

    ReentrantLock的类结构*


    其中:
    ReentrantLock implements Lock, Serializable继承并实现了Lock接口的lock、tryLock、unlock、newCondition方法,但是方法的具体实现是在其静态内部类:ReentrantLock.Sync,abstract static class Sync extends AbstractQueuedSynchronizer和Sync的两个子类NonfairSync static final class NonfairSync extends ReentrantLock.Syncstatic final class FairSync extends ReentrantLock.Sync实现

    3.锁实现的基本原理

    Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器(AQS),这个类非常重要,该类的父类是AbstractOwnableSynchronizer。
    此处的锁具备synchronized功能,即可以阻塞一个线程。为了实现一把具有阻塞或唤醒功能的锁,需要几个核心要素:

      1. 需要一个state变量,标记该锁状态,0表示当前对象没有被线程独占。state变量至少有两个值:0、1。对state变量的操作,用CAS保证线程安全。
      1. 需要记录当前是哪个线程持有锁。
      1. 需要底层支持对一个线程进行阻塞或唤醒操作。
      1. 需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要使用CAS

    具体的实现以ReentrantLock为例。

    要素1和2,在上面两个类中有对应的体现

    public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { 
        // ... 
        private transient Thread exclusiveOwnerThread; // 记录持有锁的线程 
    }
    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { 
        private volatile int state; // 记录锁的状态,通过CAS修改state的值。 
        // ... 
    }
    

    state取值不仅可以是0、1,还可以大于1,就是为了支持锁的可重入性。

    要素3:Unsafe类提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark。Synchronizer的唤醒是唤醒所有线程。

    public native void unpark(Object thread); 
    public native void park(boolean isAbsolute, long time);
    

    有一个LockSupport的工具类,对这一对原语做了简单封装:

    public class LockSupport { 
        // ...
        private static final Unsafe U = Unsafe.getUnsafe(); 
        public static void park() { 
            U.park(false, 0L); 
        }
        public static void unpark(Thread thread) { 
            if (thread != null) 
                U.unpark(thread); 
    } }
    

    在当前线程中调用park(),该线程就会被阻塞;在另外一个线程中,调用unpark(Threadthread),传入一个被阻塞的线程,就可以唤醒阻塞在park()地方的线程。LockSupport.park()/unpark()。这对函数非常关键,Concurrent包中Lock的实现即依赖这一对操作原语。
    Unsafe类提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark。个LockSupport的工具类,对这一对原语做了简单封装

    要素4:在AQS中利用双向链表和CAS实现了一个阻塞队列

    public abstract class AbstractQueuedSynchronizer {
         // ... 
        static final class Node { 
            volatile Thread thread; // 每个Node对应一个被阻塞的线程 
            volatile Node prev; 
            volatile Node next; 
            // ... 
        }
        private transient volatile Node head; 
        private transient volatile Node tail; 
        // ... 
    }
    

    阻塞队列是整个AQS核心中的核心。
    初始的时候,head=tail=NULL;然后,在往队列中加入阻塞的线程时,会新建一个空的Node,让head和tail都指向这个空Node;之后,再有线程就在node后面加入被阻塞的线程对象。所以,当head=tail的时候,说明队列为空。

    4.公平与非公平的lock()实现差异

    ReentrantLock的lock方法:

    public void lock() {
            this.sync.acquire(1);
        }
      public final void acquire(int arg) {
            if (!this.tryAcquire(arg) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) {
                selfInterrupt();
            }
        }
    

    NonfairSync:未判断当前线程位置直接去获取锁,没排队是非公平的

    image.png

    FairSync: hasQueuedPredecessors()判断位置是否在队列头部,是排队的是公平的

    image.png

    5.阻塞队列与唤醒机制

    下面进入锁的最为关键的部分,即acquireQueued(...)
    在AbstractQueuedSynchronizer类中

    public final void acquire(int arg) {
            if (!this.tryAcquire(arg) && this.acquireQueued(this.addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg)) {
                selfInterrupt();
            }
        }
    

    tryAcquire先尝试获取一下,如果获取不不到然后继续往下进行。
    先说addWaiter(...)方法,就是为当前线程生成一个Node,然后把Node放入双向链表的尾部。要注意的是,这只是把Thread对象放入了一个队列中而已,线程本身并未阻塞。

    private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) {
            AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(mode);
            AbstractQueuedSynchronizer.Node oldTail;
            do {
                while(true) {
                    oldTail = this.tail;
                    if (oldTail != null) {
                        node.setPrevRelaxed(oldTail);
                        break;
                    }
                    this.initializeSyncQueue();
                }
            } while(!this.compareAndSetTail(oldTail, node));
            oldTail.next = node;
            return node;
        }
    

    创建节点,尝试将节点追加到队列尾部。获取tail节点,将tail节点的next设置为当前节点。

    如果tail不存在,就初始化队列。

    在addWaiter(...)方法把Thread对象加入阻塞队列之后的工作就要靠acquireQueued(...)方法完成。线程一旦进入acquireQueued(...)就会被无限期阻塞,即使有其他线程调用interrupt()方法也不能将其唤醒,除非有其他线程释放了锁,并且该线程拿到了锁,才会从accquireQueued(...)返回。

    进入acquireQueued(...),该线程被阻塞。在该方法返回的一刻,就是拿到锁的那一刻,也就是被唤醒的那一刻,此时会删除队列的第一个元素(head指针前移1个节点)。

    final boolean acquireQueued(AbstractQueuedSynchronizer.Node node, int arg) {
            boolean interrupted = false;
            try {
                while(true) {
                    AbstractQueuedSynchronizer.Node p = node.predecessor();
                    if (p == this.head && this.tryAcquire(arg)) {
                        this.setHead(node);
                        p.next = null;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node)) {
                        //获取到锁之后执行中断响应操作
                        interrupted |= this.parkAndCheckInterrupt();
                    }
                }
            } catch (Throwable var5) {
                this.cancelAcquire(node);
                if (interrupted) {
                    selfInterrupt();
                }
                throw var5;
    } }
    

    首先,acquireQueued(...)方法有一个返回值,表示什么意思呢?虽然该方法不会中断响应(也就是,在队列中阻塞的时候,有别的线程唤醒此线程,去中断它,并不会得到响应),但它会记录被阻塞期间有没有其他线程向它发送过中断信号。如果有,则该方法会返回true,在parkAndCheckInterrupt方法中进行响应;否则,返回false。

    基于这个返回值,才有了下面的代码:

    static void selfInterrupt() {
            Thread.currentThread().interrupt();
      }
    

    当 acquireQueued(...)返回 true 时,会调用 selfInterrupt(),自己给自己发送中断信号,也就是自己把自己的中断标志位设为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中断信号没有及时响应,现在要进行补偿。这样一来,如果该线程在lock代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响应该中断信号。

    阻塞就发生在下面这个方法中:

    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    线程调用 park()方法,自己把自己阻塞起来,直到被其他线程唤醒,该方法返回。

    park()方法返回有两种情况。

    1. 其他线程调用了unpark(Thread t)。
    2. 其他线程调用了t.interrupt()。这里要注意的是,lock()不能响应中断,但LockSupport.park()会响应中断。
    • 也正因为LockSupport.park()可能被中断唤醒,acquireQueued(...)方法才写了一个for死循环。唤醒之后,如果发现自己排在队列头部,就去拿锁;如果拿不到锁,则再次自己阻塞自己。不断重复此过程,直到拿到锁。
    • 被唤醒之后,通过Thread.interrupted()来判断是否被中断唤醒。如果是情况1,会返回false;如果是情况2,则返回true

    6.unlock()实现分析

    unlock不区分公平还是非公平。

    public void unlock() {
        this.sync.release(1);
    }
    

    当前线程要释放锁,先调用tryRelease(arg)方法,如果返回true,则取出head,让head获取锁。

    public final boolean release(int arg) {
        if (this.tryRelease(arg)) {
            AbstractQueuedSynchronizer.Node h = this.head;
            if (h != null && h.waitStatus != 0) {
                this.unparkSuccessor(h);
            }
            return true;
        } else {
            return false;
       }}
    

    对于tryRelease方法:
    首先计算当前线程释放锁后的state值。如果不为0返回空,如果等于零设置排他线程为null,返回true
    如果当前线程不是排他线程,则抛异常,因为只有获取锁的线程才可以进行释放锁的操作。
    此时设置state,没有使用CAS,因为是单线程操作。

    @ReservedStackAccess
    protected final boolean tryRelease(int releases) {
        int c = this.getState() - releases;
        if (Thread.currentThread() != this.getExclusiveOwnerThread()) {
            throw new IllegalMonitorStateException();
        } else {
            boolean free = false;
            if (c == 0) {
                free = true;
                this.setExclusiveOwnerThread((Thread)null);
            }
            this.setState(c);
            return free;
        }
    }
    

    release()里面做了两件事:tryRelease(...)方法释放锁;unparkSuccessor(...)方法唤醒队列中的后继者。

    相关文章

      网友评论

          本文标题:23.Lock与Condition——互斥锁、公平锁、非公平锁

          本文链接:https://www.haomeiwen.com/subject/cypjaltx.html