美文网首页程序员
Java - ReentrantLock实现细节

Java - ReentrantLock实现细节

作者: 夹胡碰 | 来源:发表于2020-12-23 00:27 被阅读0次

    话不多说,下面通过流程图及源码介绍ReentrantLock的实现细节。
    先看下逻辑流程图,总体统揽:


    1. ReentrantLock的实现依赖于AQS-AbstractQueuedSynchronizer

    ReentrantLock中有两种锁实现,分别是公平锁fairSync和非公平锁NonfairSync,他们都继承Sync,而 Sync又继承自AbstractQueuedSynchronizer 。
    重点关注重写的tryAcquiretryRelease方法,他俩分别是加锁释放锁的重要逻辑。

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;
    
        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();
    
        /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    
        protected final boolean isHeldExclusively() {
            // While we must in general read state before owner,
            // we don't need to do so to check if current thread is owner
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    
        ...
    }
    
    2. AQS的模板模式

    acquire方法作为模板方法,包含了未实现方法tryAcquire,相当于抽象类。(这里没有实现成抽象类的原因是AQS还可以提供共享锁的实现框架,而tryAcquire属于独占锁依赖的方法,实现共享锁没必要实现独占锁的抽象方法)

    public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 尝试获取锁
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取失败则进入等待队列
            selfInterrupt();
    }
    
    protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    
    3. 以公平锁来看获取锁逻辑

    主要有三个关注点:

    • lock(加锁)调用模板方法acquire
    • tryAcquire(尝试获取锁 成功-true 失败-false)如果当前没有线程持有锁(有可能等待队列中有线程等待,但是锁刚释放,队首线程还未争抢)则CAS争抢锁,争抢成功设置当前线程为锁持有线程。
    • tryAcquire如果有线程持有锁,判断是否是当前线程持有的,如果是则变成重入锁state+1,否则返回false获取失败。
    static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;
    
        final void lock() {
            acquire(1);// 加锁,直接调用AQS的模板方法acquire
        }
    
        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) { // 没有线程持有锁
                if (!hasQueuedPredecessors() && // 等待队列中没有等待线程
                    compareAndSetState(0, acquires)) { // CAS争抢锁
                    setExclusiveOwnerThread(current); // 争抢成功之后将锁持有线程变成当前持有线程
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) { // 重入锁的关键,判断持有锁的线程是否是当前线程
                int nextc = c + acquires;
                if (nextc < 0) // 溢出
                    throw new Error("Maximum lock count exceeded");
                setState(nextc); // 重入锁+1
                return true;
            }
            return false;
        }
    }
    
    4. 尝试获取锁失败进入等待队列
    • 将新进线程节点放置队尾
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node); // 如果等待队列为空,初始化等待队列并将新进线程节点放置队尾
        return node;
    }
    
    • 如果等待队列为空,初始化等待队列并将新进线程节点放置队尾
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) { // 将新进线程节点放置队尾
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    • 将排队线程挂起,形成阻塞态,释放cpu,等待唤醒
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 对当前线程进行LockSupport.park(this)操作挂起线程,形成阻塞态,释放cpu
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    • 对当前线程进行LockSupport.park(this)操作挂起线程,形成阻塞态,释放cpu,排队等待获取锁,获取锁时通过LockSupport.unpark唤醒线程。
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    
    5. unlock释放锁
    • 调用AQS的release方法
    public void unlock() {
        sync.release(1); // AQS的release模板方法
    }
    
    • release也是一个模板方法,最后调用unparkSuccessor唤醒线程
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // 唤醒线程
            return true;
        }
        return false;
    }
    
    • 调用LockSupport.unpark唤醒等待线程
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread); // 唤醒等待线程
    }
    

    相关文章

      网友评论

        本文标题:Java - ReentrantLock实现细节

        本文链接:https://www.haomeiwen.com/subject/efwenktx.html