美文网首页JUC并发相关
14. 并发终结之ReentrantLock

14. 并发终结之ReentrantLock

作者: 涣涣虚心0215 | 来源:发表于2020-09-25 00:04 被阅读0次

显式锁Lock是JDK1.5开始引入的排他锁,作用跟内部锁一致,只是提供了一些内部锁所不具备的特性。
ReentrantLock是Lock的默认实现类。
在此之前我们过了一遍ConditionObject的源码,里面提到condition queue和sync queue,其中sync queue就是加锁失败之后线程进入的queue,这里我们接着分析ReentrantLock的lock和unlock过程。

ReentrantLock源码

public class ReentrantLock implements Lock, java.io.Serializable {
    //定义一个final的Sync成员变量,就是AQS的实现类
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer {
        abstract void lock();
        //非公平申请锁方法,提供给NonFair实现以及tryLock()方法实现
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //这里跟Fair的区别,不看queue里是不是有前继节点在等待,而直接尝试CAS更新state
                if (compareAndSetState(0, acquires)) {
                    //尝试更新state成功即获得锁,设置当前线程未独占锁Owner
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }  
            //如果c!=0表示已经有锁了,就看持有锁的线程是不是当前线程
            else if (current == getExclusiveOwnerThread()) {
                //如果是当前线程持有锁,就再更新state,进行一此重入行为
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            //如果不是当前线程持有锁,则表示加锁失败
            return false;
        }
        //尝试释放独占锁
        protected final boolean tryRelease(int releases) {
            //拿到state减去需要释放锁的数量,得到还持有多少锁
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                //如果release所有持有的锁,那么c==0
                free = true;
                setExclusiveOwnerThread(null);
            }
            //更新释放后的state
            setState(c);
            return free;
        }
        //是不是当前线程持有独占锁
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
        //返回一个新的condition队列
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
        //返回独占锁的持有线程
        final Thread getOwner() {
            return getState() == 0 ? null : getExclusiveOwnerThread();
        }
        //返回当前线程持有的可重入锁的数量state
        final int getHoldCount() {
            return isHeldExclusively() ? getState() : 0;
        }
        //返回是不是被锁住的
        final boolean isLocked() {
            return getState() != 0;
        }
    }

    /**
     * non-fair Sync 实现
     */
    static final class NonfairSync extends Sync {
      //非公平锁加锁,
        final void lock() {
            //如果当前state是0,且CAS更新state=1成功,表示lock成功
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //如果更新state失败,表示没有获取到锁,则进入尝试加锁流程
                acquire(1);
        }
        //非公平锁,尝试获取锁
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

    /**
     * fair Sync object
     */
    static final class FairSync extends Sync {
        //公平锁lock的实现,调用tryAcquire尝试去获取锁,
        final void lock() {
            acquire(1);
        }
        //公平锁尝试获取锁,如果state=0,可以加锁的时候,
        //   hasQueuedPredecessors()查看是不是有前继节点,如果没有,再开始CAS设置state
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

    /**
     * 默认的实现是非公平锁
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }
     //可以设置是公平锁还是非公平锁
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    //不响应中断的lock
    public void lock() {
        sync.lock();
    }
    //能够响应中断的lock
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }
    //通过AQS来的nonFairAcquire来尝试获取锁,
    public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }
    //带超时的tryLock,这个是很实用的方法,如果获取锁失败不会进入sync queue等待,而是返回false,业务逻辑可以处理失败的情况
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    //通过AQS来释放锁
    public void unlock() {
        sync.release(1);
    }
}

上面是ReentrantLock的部分源码,主要是Fair和NonFair锁的实现

执行逻辑:

  1. 尝试获取对象的锁,如果获取不到(以及有其他线程获得了锁,并且没有释放),那么它就会进入到AQS的sync queue中。
  2. 如果获取到,那么根据锁是公平锁Fair还是非公平锁NonFair
    1)如果是公平锁,那么线程会直接放到sync queue的末尾
    2)如果是非公平锁,那么线程会首先尝试CAS获取锁,如果失败,则与公平锁的处理方式一致,放到sync queue末尾。
  3. 当锁被释放时(unlock),那么底层会调用release方法对state成员变量进行减一,减一之后,如果state不为0,那么release操作执行完毕;如果state为0,则调用LockSupport的unpark方法唤醒该线程后的等待队列中的第一个后继线程(pthread_mutex_unlock),将其唤醒,是其能够获得对象的锁(release时,公平锁和非公平锁的处理逻辑是一致的)
    ReentrantLock所谓的上锁,本质就是AQS的state成员变量的操作,state+1表示上锁,state-1表示释放锁(区别与Synchronized使用对象头)

资源调度策略:

公平:从排队角度看,公平策略不允许插队,每次来争抢资源时会先判断sync queue是不是有等待的线程,即是不是有前继节点,如果有,则直接加入sync queue的末端。
非公平:允许插队,即资源持有者释放线程,sync queue中会有一个线程有机会被选中唤醒来申请资源,但是这个过程中有另一个线程也开始来抢占这个资源,这个新线程不会去看是不是等待队列有在等待的线程,而是直接参与申请资源,申请成功则获得资源,而等待队列的那个线程继续等待;申请失败则新线程会加入等待队列。非公平策略吞吐量比较高,缺点是极限情况下,会出现饥饿现象,即等待队列里的线程可能永远无法获得所需资源。

可中断的lockInterruptibly

acquireInterruptibly()方法开始的时候会判断线程是否中断。
doAcquireInterruptibly()的逻辑与lock()方法调用的acquireQueued()类似,只是acquireQueued不响应中断,而是通过返回值表示中断与否;而doAcquireInterruptibly内部会抛出InterruptedException。

public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
lock和condition的排队与插队
Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(()->{
            lock.lock();
            try{
                condition.await();
                System.out.println(Thread.currentThread().getName()+"--invoke");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "Thread1").start();
        Thread.sleep(100);
        new Thread(()->{
            lock.lock();
            try{
                condition.await();
                System.out.println(Thread.currentThread().getName()+"--invoke");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "Thread2").start();
        Thread.sleep(100);
        new Thread(()->{
            lock.lock();
            try{
                condition.await();
                System.out.println(Thread.currentThread().getName()+"--invoke");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "Thread3").start();
        Thread.sleep(100);
        new Thread(()->{
            lock.lock();
            try{
                Thread.sleep(50000);
                System.out.println(Thread.currentThread().getName()+"-------signal");
                condition.signalAll();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "Thread4").start();
        Thread.sleep(100);
        new Thread(()->{
            lock.lock();
            try{
                System.out.println(Thread.currentThread().getName()+"-------get lock");
                Thread.sleep(10000);
                System.out.println(Thread.currentThread().getName()+"-------finish");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "Thread5").start();
        Thread.sleep(100);
        new Thread(()->{
            lock.lock();
            try{
                System.out.println(Thread.currentThread().getName()+"-------get lock");
                Thread.sleep(10000);
                System.out.println(Thread.currentThread().getName()+"-------finish");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "Thread6").start();
---------------------
Thread4-------signal
Thread5-------get lock
Thread5-------finish
Thread6-------get lock
Thread6-------finish
Thread1--invoke
Thread2--invoke
Thread3--invoke

上面示例里面,Thread1, Thread2, Thread3都在await,进入condition queue,Thread4会持有比较长时间锁,所以Thread5, Thread6会进入sync queue,那么Thread4在持有锁结束之后进行singalAll,唤醒所有conditionQueue的等待线程,并且释放锁,会唤醒syncQueue里head的后继节点。signalAll方法会将Thread1, Thread2, Thread3从condition queue移到sync queue,等待
主要的逻辑还是要看AQS部分源码分析

相关文章

网友评论

    本文标题:14. 并发终结之ReentrantLock

    本文链接:https://www.haomeiwen.com/subject/vuowhktx.html