美文网首页君临天下-Java
ReentrantLock源码解析

ReentrantLock源码解析

作者: 老荀 | 来源:发表于2020-04-04 17:04 被阅读0次

    主要成员

    public class ReentrantLock implements Lock, java.io.Serializable {
        private final Sync sync;
        abstract static class Sync extends AbstractQueuedSynchronizer {
            ...
        }
    }
    

    两种实现

    static final class NonfairSync extends Sync {
        ...
    }
    static final class FairSync extends Sync {
        ...
    }
    

    构造器

    // 默认是非公平的实现
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    主要方法

    // 都是直接调用成员sync去实现的
    public void lock() {
        sync.lock();
    }
    
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    
    public void unlock() {
        sync.release(1);
    }
    

    AbstractQueuedSynchronizer

    在聊Sync之前,必须得聊下AQS,几乎是所有java并发工具的基石了

    // AQS的父类,持有一个线程的成员
    public abstract class AbstractOwnableSynchronizer
        implements java.io.Serializable {
        
        protected AbstractOwnableSynchronizer() { }
        // 这个成员指向持有当前AQS的线程
        private transient Thread exclusiveOwnerThread;
    
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }
    
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
    
        protected AbstractQueuedSynchronizer() { }
        ...
    }
    

    等待实现的方法

    AQS是个抽象类,但是已经实现了绝大部分的功能,只有部分方法必须由子类来实现

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
    

    重要成员

    AQS内部实现了一个双向链表结构

    private transient volatile Node head;
    private transient volatile Node tail;
    // 这个字段没有在构造器里初始化所以初始化值就是0
    private volatile int state;
    // 这个node会持有一个线程对象
    static final class Node {
            ...
        volatile int waitStatus;
           
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
        ...
        Node() {    // Used to establish initial head or SHARED marker
        }
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }
    

    lock

    非公平锁

    final void lock() {
        // 第一次插队的机会,无视state直接更新
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    

    会调用到AQS的方法

    public final void acquire(int arg) {
        // tryAcquire 是由子类实现的
        if (!tryAcquire(arg) &&
            // && 之后的方法不区分公平和非公平,ReentrantLock都是排他锁 所以是Node.EXCLUSIVE
            // addWaiter就是把自己添加到AQS的链表中
            // acquireQueued可以理解为开始排队
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 第二次插队的机会,直接使用CAS尝试去更新state,从0更新到1
            if (compareAndSetState(0, acquires)) {
                // 成功了就是获得锁
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 提前剧透 下面的else if和公平锁没有区别
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
               throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    如果插队失败,当前线程就会成为遵纪守法好公民,尝试进入队列并开始排队

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 最最一开始tail没有初始化 这里是null
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // 尝试排在队列末尾,也是用CAS的方式,是原子操作,线程安全的
            if (compareAndSetTail(pred, node)) {
                // 入队成功,返回
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        // 用死循环保证入队成功
        for (;;) {
            Node t = tail;
            if (t == null) {
                if (compareAndSetHead(new Node()))
                   // 初始化,可以看到tail和head指向是一致的
                   tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 前一个节点是head,自己是队首,才能尝试去获取锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果不是就会根据前一个节点的状态,来决定是否等待
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 等待
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
            }
        }
    }
    

    公平锁

    final void lock() {
        acquire(1);
    }
    

    直接看公平锁的tryAcquire方法

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            // 这个hasQueuedPredecessors就是公平和非公平最大的区别了
            // 返回false才有资格去获取锁
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
             setState(nextc);
             return true;
         }
         return false;
     }
    
    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    关于Node的状态

    Node定义了5种状态
    如果在代码里看到waitStatus>0的条件,就是判断当前Node是不是取消了

    static final class Node {
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        // 初始化是0
        volatile int waitStatus;
    }
    

    再回头看刚刚的acquireQueued方法

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                ...
            }
        } finally {
            if (failed)
                // 只要在try代码块中有抛出异常,就会进入cancel的流程
                cancelAcquire(node);
            }
        }
    }
    
    private void cancelAcquire(Node node) {
        if (node == null)
            return;
        // 和自己的线程解约
        node.thread = null;
        Node pred = node.prev;
        // 一直循环找到状态不是cancel的节点
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
    
        Node predNext = pred.next;
    
        // 将当前节点置为cancel
        node.waitStatus = Node.CANCELLED;
    
        // 如果自己就是队尾节点,把上一个节点设置成tail
        if (node == tail && compareAndSetTail(node, pred)) {
            // 设置成功后把上一个节点的next设置为空
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                    (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                // 介绍自己的下家给上家认识,自己深藏功与名
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                // 自己是队头就走unlock的流程
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }
    

    unlock

    解锁操作是不区分公平和非公平的

    public void unlock() {
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            // 只有c等于0了,才是释放锁
            // 这里也牵涉到ReentrantLock是可重入锁,是可以反复上锁的
            // 所以对应的就要反复解锁,state这个int字段,就可以理解为上锁的次数
            free = true;
            // 把AQS中的独占线程清空
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
    

    tryLock

    上面看懂后,没有参数的tryLock就特别简单,因为就是非公平锁的实现,并且如果没有获取成功,也不会入队

    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    

    带参数的tryLock

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
    
    private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            // 死循环直到获取锁或者超时
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    // 超时
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    // 用这个LockSupport等待指定的时间
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                // 因为failed默认是true,所以不管是超时还是抛异常,最终都会走到cancel流程
                cancelAcquire(node);
        }
    }
    

    高级篇

    相关文章

      网友评论

        本文标题:ReentrantLock源码解析

        本文链接:https://www.haomeiwen.com/subject/cdpkphtx.html