美文网首页
Lock接口

Lock接口

作者: 闲来也无事 | 来源:发表于2020-11-15 21:48 被阅读0次

    Java并发编程简析

    @[toc]
    并发编程在Java实际开发中,占有举足轻重的地位,在接下来的篇幅中,以java.util.concurrent包下重要的、常用的接口、实现类为切入点,逐步分析并发编程。

    下文的一些插图借鉴了《Java并发编程的艺术》!当然笔者水平有限,还请指正错误!

    1.1、Lock接口简介

    自JDK1.5开始,Java提供了Lock接口,实现锁的机制,相对于Synchronize,Lock更为轻量级、同时增加了锁的可操作性。

    public interface Lock {
        void lock();
        void lockInterruptibly() throws InterruptedException;
        boolean tryLock();
        boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
        void unlock();
        Condition newCondition();
    }
    

    Lock接口一共提供了五个方法。

    1. lock 加锁,如果当前线程无法获取到锁,则阻塞线程直至获取到锁。
    2. tryLock 尝试获取锁,若加锁成功,返回true;若加锁失败,当前线程不会阻塞,而是返回false。
    3. tryLock(long time, TimeUnit unit) 尝试获取锁,并指定超时时间。若当前锁处于空闲状态,并且当前线程未被中断,则可成功获取锁。若锁处于非空闲状态,则阻塞当前线程直至发生以下三个条件之一:
      1. 当前线程加锁成功
      2. 当前线程被中断
      3. 超时时间到
    4. unlock 解锁。

    Lock接口具有多种实现类、并且该接口作为JDK提供的标准接口,也广泛应用于一些开源框架中,掌握Lock接口及其常见实现类,对实际项目开发有重要意义。下面通过介绍其部分实现类,来解开Lock接口的神秘面纱,毕竟多线程相关的东西,总给人一种比较难的感觉。

    1.2、AbstractQueuedSynchronizer队列同步器简介

    以ReentrantLock类切入源码:

    // 部分源码 。。。
    
    // 同步器
    abstract static class Sync extends AbstractQueuedSynchronizer{
        
    }
    // 非公平锁同步器
    static final class NonfairSync extends Sync{
         
    }
    // 公平锁同步器
    static final class FairSync extends Sync{
         
    }
    

    ReentrantLock提供了NonfairSync和FairSync两个静态内部类,以实现非公平锁和公平锁,这两个类的父类继承了SyncAbstractQueuedSynchronizer类,而AbstractQueuedSynchronizer是整个Lock接口实现类的基石,即队列同步器,也就是平时所说的AQS

    AQS使用了模板方法模式,使用者需要继承并重写其中的部分方法:

    // 维护state
    protected final int getState() {
        return state;
    }
    protected final void setState(int newState) {
        state = newState;
    }
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    
    // 独占式获取、释放锁
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    
    // 共享式获取、释放锁
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    

    以上就是自己实现一个同步器可能需要重写的方法,分为三个部分:

    1. state状态维护。
    2. 独占式获取、释放锁
    3. 共享式获取、释放锁。

    state代表同步状态(The synchronization state),也可以理解为锁。获取锁:0代表初始状态,1代表获取到锁的状态。如果锁可重入,相同线程再次获取则state为2;释放锁:将state值减1,直至为0,代表线程释放了锁。当然state只是一个变量,你可以自定义其值以代表锁不同的状态。

    除了上述的方法之外,AQS还提供了一个静态内部类Node:

    static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        // 节点状态
        volatile int waitStatus;
        // 前驱节点
        volatile Node prev;
        // 后继节点
        volatile Node next;
        // 线程
        volatile Thread thread;
        // condition的下一个节点
        Node nextWaiter;
    }
    

    Node节点可以组成一个双端队列,prev记录前驱节点,next记录后继节点,而waitStatus记录了节点入队后的状态:

    名称 说明
    CANCELLED 1 表示由于超时或者中断,当前节点被取消。被取消后,当前节点的状态将不再发生任何变化。
    SIGNAL -1 表示后继节点等待获取同步状态,当前节点释放同步状态、中断、取消则唤醒其后继节点,以继续获取同步状态。
    CONDITION -2 表示当前节点在等待condition
    PROPAGATE -3 表示下一次获取共享同步状态将被无条件传递下去
    0 初始状态

    双端队列示意图:


    双端队列

    AQS持有head和tail节点,分别指向队首和队尾,队列中的各个节点分别包含了对上一个、下一个节点的引用。如:对于互斥锁,当多个线程同时获取一把锁时,只能有一个线程可以获取到该锁,其他的线程被构造成Node节点,入队并阻塞。当持有锁的线程释放锁之后,唤醒下一个节点,下一个节点可以继续获取锁。这是AQS的基本原理,跟生活中排队的场景十分相似。

    1.3、ReentrantLock的非公平锁模式

    Lock接口的一个典型实现即ReentrantLock。ReentrantLock顾名思义即可重入锁,可重入指已经获取锁的当前线程可再次获取锁,而不必等待当前锁的释放。

    ReentrantLock中的Sync抽象类继承了AbstractQueuedSynchronizer类,并提供了两个实现类FairSync、NonfairSync以实现公平锁和非公平锁,并提供了两个构造函数,默认为非公平锁,或通过指定构造函数的参数实现公平锁。

    加锁、释放锁案例:

    @Test
    public void testReentrantLock() {
        Lock lock = new ReentrantLock();
        // 注意:加锁不能写在try代码块,如果try代码块加锁未成功,则finally代码块释放锁会出现异常。
        lock.lock();
        try {
            System.out.println("加锁");
        } finally {
            lock.unlock();
            System.out.println("解锁");
        }
    }
    

    下面就以ReentrantLock类为例,分析加锁、解锁的过程。为了让文章目录更为清晰,以下分为非公平锁、公平锁两部分分析。

    1.3.1、lock()

    final void lock() {
        // 无线程持有锁
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        // 已有线程持有锁
        else
            acquire(1);
    }
    

    通过lock接口获取锁,大体上可以分为以下三种情况:

    1. 锁没有被其它线程持有,则获取锁并立即返回,将持有锁的计数设置为一。
    2. 当前线程已经持有锁,则将持有锁的计数加一。
    3. 锁被其他线程持有,则阻塞当前线程直至获取到锁,然后将持有锁的计数设置为一。

    第一步很好理解,直接通过compareAndSetState(0, 1)设置同步状态即可;若设置同步状态失败,则说明已有线程持有锁(可能是当前线程重入、或其它线程),则通过acquire(1)方法以阻塞的形式获取锁。

    1.3.2、acquire()

    public final void acquire(int arg) {
        // 获取同步状态
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 若线程在获取同步状态的过程中曾被中断,则再次中断该线程
            selfInterrupt();
    }
    
    1. 以独占线程的形式获取锁
    2. 忽略中断
      1. 假如T1进入lock()方法后即被中断,lock()方法不会抛出InterruptedException异常
      2. lock()在方法执行过程中会清除、并记录线程是否被中断
        1. 否,不做任何操作
        2. 是,调用selfInterrupt()中断线程,此时已经获取到同步状态,线程得以继续被执行,线程调用者可以自行决定是否中断继续中断该线程。
    3. 调用tryAcquire方法尝试获取同步状态
      1. 成功,返回
      2. 失败
        1. 调用addWaiter()方法将线程加入同步队列
        2. 阻塞线程直至获取到同步状态

    中断可以简单理解为线程的一个标识属性,表示其是否被其他线程进行了中断操作。线程被中断不代表该线程的操作被终止,而是要线程自己去判断是否被中断过,并做出后续的处理。

    整体逻辑:


    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zYjudMqN-1605661085814)(media/15953305070670/15959337500275.jpg)]
    1.3.2.1、tryAcquire()
    // 非公平锁加锁
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // 尝试以独占的形式获取锁
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 判断是否重入
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
    1. 以独占的形式获取锁
      1. 成功,返回true
      2. 失败,再判断持有锁的线程是否为当前线程
        1. 是,将同步状态加一并更新,返回true
        2. 否,则说明有其他线程持有锁,当前线程需等待其他线程释放锁。返回false
    1.3.2.2、addWaiter()

    代码运行至此,则说明有其他线程持有锁,那么当前获取锁的线程将被阻塞直至获取到锁。这里就用到了前文提到的双端队列,addWaiter()方法将当前获取锁的线程够造为Node节点,并加入队列。

    private Node addWaiter(Node mode) {
        // 将当前线程构造为Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 尝试快速在队列尾部添加
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 快速添加失败,调用全量方法将节点加入队列
        enq(node);
        return node;
    }
    
    // 节点入队全量方法
    private Node enq(final Node node) {
        // 自旋
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    1. 若队列为空,初始化队列,并设置头结点、尾节点
    2. 若队列不为空,则将节点加至队列尾部
    1.3.2.3、acquireQueued()

    节点入队后,随即调用acquireQueued()方法以阻塞的形式获取锁。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自旋
            for (;;) {
                // 获取前驱节点
                final Node p = node.predecessor();
                // 为保证队列“先进先出”的特性。当前驱节点为头节点时,可尝试获取同步状态,其它节点依然阻塞
                // 注意:若某个节点的前驱节点为头节点,则说明它是队列中的第一个节点,符合“先进先出”原则
                if (p == head && tryAcquire(arg)) {
                    // 获取同步状态成功,更新头结点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 若前驱节点非头结点、或者获取同步状态失败,则继续阻塞当前节点
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    1. 第一个if,判断前驱节点是否为head节点
      1. 是,尝试获取同步状态
        1. 获取成功,返回true
        2. 获取失败,进行下一个if判断
      2. 否,进行下一个if判断
    2. 第二个if,执行到此,要么前驱节点非头节点,不符合出队原则;要么前驱节点是头结点,但获取同步状态失败(持有锁的线程依然未释放锁)
      1. 判断当前节点尝试获取同步状态失败后是否应当阻塞、并更新节点状态
        1. 是,阻塞当前节点,清除、并记录当前线程中断标识
        2. 否,自旋,进行下一轮判断

    第一个if判断非常简单,不多介绍,下面看第二个if语句中的量个判断条件:

    // 检测并更新获取同步状态失败的节点。若返回true,则当前节点应阻塞
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前驱节点的状态
        int ws = pred.waitStatus;
        // 前驱节点的状态为SIGNAL,则后继节点应被阻塞,返回true
        if (ws == Node.SIGNAL)
            return true;
        // 前驱节点状态为CANCELLED,则清除队列中已经被取消的节点
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        }
        // 运行至此,前驱节点的状态要么为初始状态(0)、要么为PROPAGATE。
        // 此时,将前驱节点的状态改为 SIGNAL ,以便在下一轮自旋中阻塞当前节点
        else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    // 阻塞当前节点,清除并返回当前线程中断标识
    private final boolean parkAndCheckInterrupt() {
        // 阻塞当前线程
        LockSupport.park(this);
        // 清除并返回线程中断标识
        return Thread.interrupted();
    }
    
    // 这里已经到了获取同步状态最底层了
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }
    

    通过LockSupport类的park()方法阻塞线程,后续通过unpark()方法或者interrupt()方法可以唤醒该线程。这也是要清除当前线程中断标识的原因。

    到这里,基于ReentrantLock非公平锁模式下通过lock()方法加锁的过程就分析完毕了。

    1.3.3、unlock()

    前文已经介绍了通过lock()方法加锁的过程,接下来分析一下解锁过程。调用unlock()方法可以实现解锁,其整体流程如下图:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lh8MCJWg-1605661085816)(media/15953305070670/15959338321882.jpg)]
    // 解锁
    public void unlock() {
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        // 释放同步状态
        if (tryRelease(arg)) {
            // 唤醒后继节点
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    lock()方法流程可以分为两步,释放同步状态、唤醒后继节点。

    // 释放同步状态
    // 注意:如果锁已被重入,需逐步递减同步状态,当同步状态值为0时,表示完全释放了同步状态。
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        // 释放锁的线程必须与AQS中持有同步状态的线程相同
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 完全释放同步状态,清空独占线程
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    
    // 唤醒后继节点   
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        
        Node s = node.next;
        // 回溯,保证后继节点及其状态的合法性
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }    
    

    前文提到,获取同步线程失败的线程,被构造成Node节点,加入到队列并阻塞,这里调用LockSupport.unpark(s.thread)方法,将其唤醒以继续获取同步状态。LockSupport工具类底层使用了unsafe的本地方法,涉及到JVM源码这里不做深入的分析,因为我也不会。

    为了能让整个流程串起来,再继续分析一步。当后继节点被唤醒后。将继续在acquireQueued()方法中获取同步状态,并重新设置头节点、回收已经出队的节点。

    // acquireQueued方法摘抄
    if (p == head && tryAcquire(arg)) {
        setHead(node);
        p.next = null; // help GC
        failed = false;
        return interrupted;
    }
    

    至此,基于ReentrantLock非公平锁模式下通过lock()方法加锁、unlock()方法解锁,以及队列同步器的使用就分析完毕了。

    1.3.4、lockInterruptibly()

    相较于lock()接口lockInterruptibly()在获取锁的过程中,会响应中断。因为前文已经对lock()接口做了较为详细的分析,所以这里我们只简单的分析一下lockInterruptibly()是如何响应中断的。

    // 以响应中断的模式获取同步状态
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        // ①
        // 快速检查线程是否被中断
        if (Thread.interrupted())
            throw new InterruptedException();
        // 调用tryAcquire()方法获取同步状态。注意:该方法不响应中断
        if (!tryAcquire(arg))
            // 以响应中断的模式获取同步状态
            doAcquireInterruptibly(arg);
    }
    
    private void doAcquireInterruptibly(int arg) throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // ② 响应中断
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    1. lockInterruptibly()对中断的响应分别标记在代码的①、②处:
      1. 快速检查线程是否被中断
        1. 是,抛出InterruptedException异常
        2. 否,继续获取同步同步状态
      2. 判断线程在获取锁的过程中是否被中断
        1. 否,返回
        2. 是,抛出InterruptedException异常
    2. lock()方法和lockInterruptibly()对线程中断的处理方式区别:
      1. lock方法不响应中断,但是会记录中断状态,开发者需要自己去判断、并响应中断
      2. lockInterruptibly()方法响应中断,若线程被中断,抛出InterruptedException异常

    1.3.6、tryLock()

    tryLock()方法以非阻塞的形式获取锁。若获取到锁,返回true;否则,返回false,而不会阻塞获取锁的线程。tryLock()方法的具体代码,前文都有介绍,不多赘述。

    1.3.7、tryLock(long time, TimeUnit unit)

    前文已经介绍过了lock()、lockInterruptibly()、tryLock()三种获取同步状态的方式。这三种方式个有优缺点。

    1. lock() 以阻塞的形式获取锁,不响应中断
    2. lockInterruptibly() 以阻塞的形式获取锁,响应中断
    3. tryLock() 以非阻塞的形式获取锁,不响应中断

    综合以上各个方法的特性,lock()、lockInterruptibly()虽然能获取到锁,但是调用者不知道会阻塞多久;tryLock()方法虽然能快速返回是否获取到锁,但是又不会阻塞。tryLock(long time, TimeUnit unit)方法正好综合了以上特点。以带超时阻塞的形式获取锁。

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    
    public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        // 响应中断
        if (Thread.interrupted())
            throw new InterruptedException();
        // 调用tryAcquire()方法尝试快速获取同步状态
        // 若tryAcquire()方法未能获取到同步状态,则调用doAcquireNanos以带超时阻塞的形式再次获取同步状态
        return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
    }
    

    tryLock(long timeout, TimeUnit unit)方法中的大部分代码均已分析过,不在赘述,这里只分析doAcquireNanos()方法。

    // 以超时模式获取同步状态
    private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
        // 超时
        if (nanosTimeout <= 0L)
            return false;
        // 计算超时到期时间
        final long deadline = System.nanoTime() + nanosTimeout;
        // 线程构造为AQS节点、入队
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                // 符合出队条件,再次获取同步状态
                if (p == head && tryAcquire(arg)) {
                    // 获取同步状态成功,重新设置头节点、清空释放同步状态的节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                // 计算需要阻塞的时长
                nanosTimeout = deadline - System.nanoTime();
                // 超时
                if (nanosTimeout <= 0L)
                    return false;
                // spinForTimeoutThreshold:自旋超时阈值,1000纳秒(1秒=1000毫秒;1毫秒=1000微秒;1微秒=1000纳秒)
                // 如果nanosTimeout大于spinForTimeoutThreshold,则将线程阻塞nanosTimeout纳秒
                if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
                    // 阻塞nanosTimeout纳秒,到期唤醒后,再次以自旋的形式获取锁
                    LockSupport.parkNanos(this, nanosTimeout);
                // 响应中断
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    到这里,基于ReentrantLock可重入锁的非公平模式下的lock()、lockInterruptibly()、tryLock()、tryLock(long time, TimeUnit unit)、以及unlock()方法都以分析完毕。这一部分内容相对来讲较难,需要多多分析、多调试才能有更为深刻的了解。笔者水平有限,还望多多指正。

    从后面的小节开始,分析公平锁模式


    1.4、ReentrantLock的公平锁模式

    公平锁模式保证先获取锁的线程一定能够先获得锁,简单理解就是“先到先得”。前文已经分析过非公平锁模式,下文的分析我们着重分析两者之间的区别,而不再逐个分析每个方法。

    1.4.1、lock()

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // hasQueuedPredecessors() 查询是否有线程等待获取的时间长于当前线程。
            if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    针对于lock()方法,非公平锁和公平锁的区别就在于在tryAcquire()方法中多了一个hasQueuedPredecessors()判断。即判断是否有线程等待获取的时间长于当前线程。

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    1.4.2、tryLock()

    tryLock()方法会破坏锁的公平性。若能立即获取锁,返回true;否则返回false。而不会考虑是否有其他线程先于此线程获取锁。

    1.4.3、tryLock(long time, TimeUnit unit)

    tryLock(long time, TimeUnit unit)保持锁的公平性与lock()方法一致,不多赘述。

    相关文章

      网友评论

          本文标题:Lock接口

          本文链接:https://www.haomeiwen.com/subject/sleobktx.html