美文网首页
ReentrantLock实现分析

ReentrantLock实现分析

作者: 写代码的杰西 | 来源:发表于2020-06-14 22:35 被阅读0次

    Lock接口是jdk1.5以后加入的解决线程安全的除了synchronized关键字以外的另一个选择。相对于synchronized更加灵活。今天根据ReentrantLock的源码来分析一下他的实现原理。


    ReentrantLock.png

    上图简单画了一下ReentrantLock类图的关系。
    下面写一段有线程安全问题的代码,用lock进行解决

    public class LockDemo {
    
        public int sum=0;
        private Lock lock  = new ReentrantLock();
    
        public void add(){
            lock.lock();
            try {
                sum+=1;
            }finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) {
            LockDemo lockDemo = new LockDemo();
            for (int i = 0; i < 100000; i++) {
                Thread t = new Thread(()->{
                    lockDemo.add();
                });
                t.start();
            }
            System.out.println(lockDemo.sum);
        }
    }
    
    

    跟进lock()方法的实现

        public void lock() {
            sync.lock();
        }
    

    直接调用了sync成员变量的lock方法,看看sync的lock方法怎么实现的
    有两种实现,公平和非公平,先看看默认的非公平

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

    静态内部类NonfairSync继承自Sync,看看Sync是何方妖孽

    abstract static class Sync extends AbstractQueuedSynchronizer 
    

    继承自AbstractQueuedSynchronizer 也就是大名鼎鼎的aqs。aqs是个同步器,是1.5以后各种同步工具的基本工具。
    aqs中有个变量state,可以用来标示当前工具的一个状态。在lock中,他表示重入次数,如果没有线程占用锁,那么state就是0.如果有,就是大于等于1.在抽象类Sync中lock是抽象方法,需要子类去实现。先看看非公平锁的实现。

    final void lock() {
                //非公平锁线程进入方法以后直接尝试获取一次锁
                if (compareAndSetState(0, 1)) 
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    

    线程进入方法以后直接尝试获取锁。通过compareAndSetState方法来设置state的值。看看compareAndSetState的实现,方法位于aqs类中

    protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    

    这里调用了本地方法unsafe的compareAndSwapInt,采用cas的方式设置state。cas就是对比并且交换,三个参数stateOffset是当前锁的state的值锁存放的内存偏移量,expect是期望当前state的值,update是要设置的值。在这个方法执行时,拿传入的expect去和state比较,如果一致,就设置为update。如果不一致,就返回false。在lock方法执行时,compareAndSetState(0, 1),期望state在内存中为0,如果为0,说明当前锁未被其他线程占用,就设置为1.如果不是0,说明当前锁有被其他线程占用,返回false。
    这里如果成功获取锁,会将当前线程设置为锁的持有者。如果不成功,进入 acquire(1);方法。
    acquire的英文翻译过来是获取。

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    这里分为2个步骤,tryAcquire(arg) 和acquireQueued(addWaiter(Node.EXCLUSIVE), arg))。先看第一个步骤

    tryAcquire方法在NonfairSync中实现。

    protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
    
     final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();//获取当前的state
                if (c == 0) {//如果没有锁
                    if (compareAndSetState(0, acquires)) {//尝试获取锁
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //如果是自己进入了,增加重入次数
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
              //未能获取锁或者啥也没干,返回false
                return false;
            }
    
    

    如果tryAcquire方法返回了false,就会继续执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    先看addWaiter方法,由aqs实现

    private Node addWaiter(Node mode) {
          //新建一个节点,把当前线程包裹进去
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            //如果尾巴有节点
            if (pred != null) {
                node.prev = pred;
                //把当前线程节点设置为尾巴
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
          //如果走到这一步,说明尾巴没节点,或者设置尾巴节点失败了
            enq(node);
            return node;
        }
    

    进一步看看enq方法,enq,我理解为enter queue

    private Node enq(final Node node) {
            //一顿自旋,看来不完成一件事不罢休
            for (;;) {
                Node t = tail;
              //如果尾巴节点没有,就初始化链表,进入第二次自旋
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else { //尾巴节点有东西了,有可能是本来就有,有可能是第一次自旋初始化的
                    node.prev = t;
                    //这一步如果失败,继续自旋,直到成功把当前节点设置为尾巴节点。
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    至此addWaiter方法就把竞争锁失败的线程包裹成一个node放入了队列中,看看下一步acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                //又自旋  在这自旋是因为要挂起线程,当线程重新被唤醒的时候,还是在这继续执行代码
                for (;;) {
                    //刚刚加入队列节点的前一个节点
                    final Node p = node.predecessor();
                    //如果前一个节点是头节点,说明没排队的了,尝试获取一次锁
                    if (p == head && tryAcquire(arg)) {
                        //获取锁后,设置当前节点为头节点
                        setHead(node);
                        //把原本的头节点移除链表
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    //如果走到这,说明没获取到锁,尝试挂起当前线程
                    //should方法,判断一下当前节点应不应该挂起来,park
                    if (shouldParkAfterFailedAcquire(p, node) &&
                    //调用locksupport挂起当前线程,并且
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    非公平锁的lock方法就此结束了
    看看公平锁的lock方法实现

    final void lock() {
                acquire(1);
            }
    

    区别就是直接acquire,进去看看tryAcquire的实现

    protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                  //如果前面没有节点了,直接获取,否则返回false
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    后面的都一样。

    相关文章

      网友评论

          本文标题:ReentrantLock实现分析

          本文链接:https://www.haomeiwen.com/subject/xhmfxktx.html