美文网首页闲碎
音视频开发之旅(54) - Lock、重入锁、读写锁和Condi

音视频开发之旅(54) - Lock、重入锁、读写锁和Condi

作者: yabin小站 | 来源:发表于2021-08-22 19:20 被阅读0次

    目录

    1. Lock的意义和使用
    2. 同步器AbstractQueuedSynchronizer
    3. 重入锁ReentrantLock
    4. 读写锁ReentrantReadWriteLock
    5. Condition

    一、 Lock的意义和使用

    在Java 1.5之前java程序是通过synchronized来实现锁功能的。Java1.5引入了Lock接口以及相关的实现类,提供了和synchronized类似的同步功能。相比synchronized的隐式加锁和释放。Lock需要手动操作。少了便捷性,但却可以更加灵活和高效。

    ReentrantLock底层实现依赖于特殊的CPU指令,比如发送lock指令和unlock指令,不需要用户态和内核态的切换,所以效率高(这里和volatile底层原理类似),而synchronized底层由监视器锁(monitor)是依赖于底层的操作系统的Mutex Lock需要用户态和内核态的切换,所以效率低。

    lock的使用方式和注意事项如下:

        Lock l = ...; //Lock接口的具体实现类,比如ReentrantLock
         l.lock();
         try {
           // access the resource protected by this lock
         } finally {
           l.unlock();
         }
    

    需要注意以下3点:

    1. lock和unlock必须要成对出现。否则会出现异常
    2. 在finally中释放锁,保证在获取锁之后,一定会得到释放
    3. lock不要在try中,因为如果在获取锁时发生了异常,会自动释放锁。而在finally中又会unlock一次,导致IllegalMonitorStateException

    Lock的API:

    • void Lock():获取锁,调用该方法后, 当前线程会获取锁
    • lockInterruptibly() 可中断的获取锁,在锁获取的过程中可以中断当前线程
    • tryLock(): 尝试非阻塞的获取锁,调用该方法后立即返回,true代表可以获取,否则不可以
     Lock lock = ...;
     if (lock.tryLock()) {
       try {
         // manipulate protected state
       } finally {
         lock.unlock();
       }
     } else {
       // perform alternative actions
     }
    

    Returns:
    true if the lock was acquired and false otherwise

    • tryLock(long time,TimeUnit unit) 超时获取锁,有三种可能;
      1. 当前线程在超时时间内获取锁
      2. 当前线程在超时时间内被中断
      3. 超时时间结束,返回false
    • unlock(): 释放锁
    • Condition newCondition() 获取等待通知组件,该组件与当前的锁绑定,当前线程只有获取了锁,才能调用该组件的wait(),调用后,当前线程释放锁。

    二、同步器AbstractQueuedSynchronizer

    同步器是实现同步组件的关键,锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现的细节;同步器面向的是锁的实现者,它简化了锁的实现,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层造作。锁和同步器很和的隔离了使用者和实现者所关注的领域。只有掌握了同步器的工作原理才能更深入地理解并发包中其他的并发组件

    2.1 API方法

    队列同步器AbstractQueuedSynchronizer,是用来构建锁活着其他同步组件的基础框架,它使用一个int成员变量表示同步状态,通过内置FIFO队列里完成资源获取线程的排队工作。它没有实现任何同步接口,仅仅是定义了一些同步状态的获取和释放的方法供自定义同步器组件使用

    同步器提供了如下3个状态相关的方法

    • getState(): 获取当前同步状态
    • setState(): 设置当前同步状态
    • compareAndSetState(int expect, int update):使用CAS设置状态,该方法保证状态设置的原子性。

    依次可以实现不同类型的同步组件(ReentrantLock、ReentrantWriteLock、CountDownLock等)

    自定义同步器可以重写的方法如下:

    • tryAcquire(int arg): 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后使用CAS设置同步状态
    • tryRelease(int arg): 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
    • tryAcquireShared(int arg): 共享式获取同步状态,返回大于等于0的值,表示成功
    • tryReleaseShared(int arg):共享式释放同步状态
    • isHeldExclusively():当前同步器释放在独占模式下被线程占用,一般表示释放被当前线程锁独占。

    下面我们来看个官方提供的一个一个同步器的实现

    class Mutex implements Lock, java.io.Serializable {
     
        // Our internal helper class
         private static class Sync extends AbstractQueuedSynchronizer {
           // 获取当前线程是否处于locked状态
           protected boolean isHeldExclusively() {
             return getState() == 1;
           }
    
           // 尝试请求获取,通过CAS的方式保证原子性
           public boolean tryAcquire(int acquires) {
             assert acquires == 1; // Otherwise unused
             if (compareAndSetState(0, 1)) {
               setExclusiveOwnerThread(Thread.currentThread());
               return true;
             }
             return false;
           }
    
           // 尝试释放锁,把state设置为0
           protected boolean tryRelease(int releases) {
             assert releases == 1; // Otherwise unused
             if (getState() == 0) throw new IllegalMonitorStateException();
             setExclusiveOwnerThread(null);
             setState(0);
             return true;
           }
     
           // Provides a Condition
           Condition newCondition() { return new ConditionObject(); }
     
           // Deserializes properly
           private void readObject(ObjectInputStream s)
               throws IOException, ClassNotFoundException {
             s.defaultReadObject();
             setState(0); // reset to unlocked state
           }
         }
    

    同步器内部实现的方法

    • acquire(int arg): 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则会进入同步队列等待,该方法会调用重写的tryAcquire方法
    • acquireInterruptibly(int arg): 在acquire(int arg)基础上增加了可被中断的功能
    • tryAcquireNanos(int arg, long nanos): 在acquireInterruptibly(int arg)的基础上增加了超时限制
    • acquireShared(int arg): 与独占式的获取同步状态的区别在于,同一时刻可以有多个线程获取同步状态
    • acquireSharedInterruptibly(int arg):在acquireShared(int arg)基础上增加了可被中断的功能
    • tryAcquireSharedNanos(int arg, long nanos): 在acquireSharedInterruptibly(int arg)的基础上增加了超时限制
    • release(int arg): 独占式释放同步状态,该方法会在释放同步状态后,将同步队列的第一个节点包含的线程唤醒
    • releaseShare(int arg) : 共享式释放同步状态
    • Collection<Thread> getQueuedThread(): 获取等待在同步队列的线程集合

    同步器的实现

    同步器依赖内部同步队列(一个FIFO双向队列)来完成同步状态的管理。主要包括 同步队列、独占式同步状态的获取和释放、共享式同步状态获取与释放、尝试获取同步状态等同步器的核心数据结构和模版方法
    同步队列中的节点(Node)用来保存获取同步状态失败的线程的引用、等待状态以及前驱和后继节点

    Node如下

    static final class Node {
          
            static final Node SHARED = new Node();
           
            static final Node EXCLUSIVE = null;
    
           
            static final int CANCELLED =  1;
           
            static final int SIGNAL    = -1;
          
            static final int CONDITION = -2;
           
            static final int PROPAGATE = -3;
    
            /**
             * Status field, taking on only the values:
             *   SIGNAL:     后继节点处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,是后继节点运行
             *
             *   CANCELLED:  在同步队列中等待的线程由于超时或者被中断,需要从同步队列中取消等待
             *
             *   CONDITION:  节点在等待队列中,节点线程等待在Condition上,当其他线程对Dondition调用了signal()方法后,该节点将会从等待队列中移到同步队列中,就到对同步状态的获取。
             *
             *   PROPAGATE:  下一次共享式同步状态获取将会无条件传播下去 ??这个没理解
             */
            volatile int waitStatus;
    
    
            volatile Node prev;
    
    
            volatile Node next;
    
    
            volatile Thread thread;
    
    
            Node nextWaiter;
    
     
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
    
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    

    下面通过《Java并发编程的艺术》中几张图来说明加入节点和获取节点的方式


    三、重入锁 ReentrantLock

    重入锁ReentrantLock支持一个线程对资源的重复枷锁,该线程自己不会阻塞自己。
    常用的使用方式如下

    class X {
       private final ReentrantLock lock = new ReentrantLock();
    
       public void m() {
         lock.lock();  // block until condition holds
         try {
           // ... method body
         } finally {
           lock.unlock()
         }
       }
     }
    

    synchronized也是支持重入的。它是通过MarkWord来等进行判断。而ReentrantLock又是如何实现的呐?要实现该特性,需要解决两个问题

    1. 线程再次获得锁:锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,再次成功获取。
    2. 锁的最终释放:线程重复n次获取了锁,随后在第n次释放了该锁后,其他线程能够获取该锁,这就需要一个计算器。
      ReentrantLock还支持设置公平锁和非公平锁(默认 非公平锁,效率会更高)
       //默认是构造 非公平锁  
       public ReentrantLock() {
            sync = new NonfairSync();
        }
        //可以通过fair来设定创建 公平/非公平锁
        public ReentrantLock(boolean fair) {
            sync = fair ? new FairSync() : new NonfairSync();
        }
    

    三、读写锁 ReentrantReadWriteLock

    ReentrantLock是排他锁,即同一时刻只允许一个线程进行访问,而ReentrantReadWriteLock读写锁可以允许同一时刻多个读线程访问,但在写线程访问时,所有的多线程和其他的写线程都会被阻塞;如果存在读锁,正在持有中,则写锁不能被获取,要等待当前激活的读锁释放。通过维护一个读锁和一个写锁,分离读和写,使得并发性有很大的提升。

    下面我们看下官方展示的使用方式

     class RWDictionary {
       private final Map<String, Data> m = new TreeMap<String, Data>();
       private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
       private final Lock r = rwl.readLock();
       private final Lock w = rwl.writeLock();
    
       public Data get(String key) {
         r.lock();
         try { return m.get(key); }
         finally { r.unlock(); }
       }
       public String[] allKeys() {
         r.lock();
         try { return m.keySet().toArray(); }
         finally { r.unlock(); }
       }
       public Data put(String key, Data value) {
         w.lock();
         try { return m.put(key, value); }
         finally { w.unlock(); }
       }
       public void clear() {
         w.lock();
         try { m.clear(); }
         finally { w.unlock(); }
       }
     }
    

    读写锁的实现

    读写锁依赖同步器来实现,读写状态就是其同步器的同步状态,ReentrantLock中同步状态表示锁被一个线程重复获取获取的次数。ReentrantReadWriteLock需要在同步状态上添加多个读线程和一个写线程的状态。
    如果在一个整型变量上维护多种状态,一般使用“按位切割”方式使用这个变量。多谢锁将这个变量分成两部分,高16为表示读,低16位表示写

    如果存在读锁,或者存在写锁并且该写锁不是当前线程,返回fail,当前线程进入等着状态,下面我们看下tryAcquire的实现。

    protected final boolean tryAcquire(int acquires) {
       
                Thread current = Thread.currentThread();
                int c = getState();
                int w = exclusiveCount(c);
                if (c != 0) {
                    // 存在读锁 或者 当前获取线程不是已经获取写锁的线程
                    if (w == 0 || current != getExclusiveOwnerThread())
                        return false;
                    if (w + exclusiveCount(acquires) > MAX_COUNT)
                        throw new Error("Maximum lock count exceeded");
                    // Reentrant acquire
                    setState(c + acquires);
                    return true;
                }
                if (writerShouldBlock() ||
                    !compareAndSetState(c, c + acquires))
                    return false;
                setExclusiveOwnerThread(current);
                return true;
            }
    

    四、Condition

    获取一个Condition必须通过Lock的newCondition()方法。重用的方法如下

    • await() throw InterruptedException: 当前线程进入等待状态知道被通知或者中断,当前线程进入运行状态且从await()方法返回
    • awaitUninterruptibly(): 对中断不敏感的await
    • awaitNanos(long nanosTimeout) throw InterruptedException:在await的基础上增加了超时被中断的功能
    • signal() 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获取与Condition相关联的锁
    • signalAll().唤醒锁欧在等代Condition的线程
      下面我们看下官方提供的demo
       class BoundedBuffer {
         final Lock lock = new ReentrantLock();
         final Condition notFull  = lock.newCondition(); 
         final Condition notEmpty = lock.newCondition(); 
      
         final Object[] items = new Object[100];
         int putptr, takeptr, count;
      
         public void put(Object x) throws InterruptedException {
           lock.lock();
           try {
             while (count == items.length)
               notFull.await();
             items[putptr] = x;
             if (++putptr == items.length) putptr = 0;
             ++count;
             notEmpty.signal();
           } finally {
             lock.unlock();
           }
         }
      
         public Object take() throws InterruptedException {
           lock.lock();
           try {
             while (count == 0)
               notEmpty.await();
             Object x = items[takeptr];
             if (++takeptr == items.length) takeptr = 0;
             --count;
             notFull.signal();
             return x;
           } finally {
             lock.unlock();
           }
         }
       }
    

    资料

    图书《Java并发编程的艺术》
    Java并发编程-无锁CAS与Unsafe类及其并发包Atomic

    收获

    1. 了解同步器及其在Lock锁中的作用
    2. 学习重入锁、读写锁的使用和实现
    3. 学习Condition的使用

    感谢你的阅读
    下一篇我们学习Java并发容器,欢迎关注公众号“音视频开发之旅”,一起学习成长。
    欢迎交流

    相关文章

      网友评论

        本文标题:音视频开发之旅(54) - Lock、重入锁、读写锁和Condi

        本文链接:https://www.haomeiwen.com/subject/dpdhiltx.html