美文网首页
ReentrantLock源码分析,顺便看看AbstractQu

ReentrantLock源码分析,顺便看看AbstractQu

作者: keepSwiming | 来源:发表于2017-07-07 17:34 被阅读10次

    示例小demo

    public class reentrantlockTest {
    
        private ReentrantLock lock=new ReentrantLock();
        public void test1(){
            try {
                System.out.println("test1开始等待获取锁");
                lock.lock();
                System.out.println("test1已经获取锁");
                Thread.sleep(4000);
    
            }catch (InterruptedException e) {
                    Thread.interrupted();
            } finally {
                lock.unlock();
            }
        }
    
        public void test2(){
            try {
                System.out.println("test2开始等待获取锁");
                lock.lock();
                System.out.println("test2开始已经获取锁");
            }finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) {
            final reentrantlockTest test= new reentrantlockTest();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    test.test1();
                }
            },"t1").start();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    test.test2();
                }
            },"t2").start();
        }
    
    test1开始等待获取锁
    test1已经获取锁
    test2开始等待获取锁
    test2开始已经获取锁
    

    用起来很简单lock,unlock就可以了。当多个线程同时要获取这个锁时候到底发生了什么?

    简单描述

    在具体分析源码之前,先用语言简单描述一下.


    aqs队列简单图示,源自网络

    有一位足疗店技师活特别好,大家去消费都想点他服务,但是他只是一个人,所有来的人得排队等他服务,开始服务就相当于加锁,服务结束相当于释放锁资源。
    之后第一个人去叫第二个人进来服务,不断循环,直到都所有人都消费完成.

    还有个概念
    公平锁:大家都老老实实排队,先到的先被服务.
    非公平锁:在第一个人叫第二个人的这段时间内,来消费就先进屋看看,还没人进来,就直接让技师服务(就是抢个时间差,直接插队,不过这样就节省了第一个人叫第二人的时间,效率高些).

    那ReentrantLock和Synchronize锁有什么区别
    先说相同点:两者都是可以非公平锁.在最新的jdk版本下效率差不多
    不同点:
    Synchronize:使用简单一些,非特定场景可以用
    ReentrantLock:可以实现公平锁,可以在等资源时候可以中断.

    那么AQS在里面起什么作用?
    队列怎么排,怎么入队,出队,资源消息是怎么传递的.

    开始源码分析

    从lock开始分析

    public void lock() {
        sync.lock();
    }
    
    final void lock() {
        state是一个表示锁资源状态的参数
        1.上来就直接插队,用CAS算法看能不能获取锁资源
        if (compareAndSetState(0, 1))
            获取成功后,当前线程获得资源
            setExclusiveOwnerThread(Thread.currentThread());
        else
        2.如果获取不成功,正常排队等待
            acquire(1);
    }
    
    lock默认采用非公平锁,上来尝试获取锁,获取不到进入aqs队列排队
    

    acquire()

    public final void acquire(int arg) {
        1.尝试获取锁资源,直接抛出异常,必须由子类定义什么才是获取锁
        2.如果获取不到锁,创建一个新的队列节点,将节点放到队列里面
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            3.拿到锁之后,如果该线程是中断状态,直接中断线程
            selfInterrupt();
    }
    
    入队前在尝试一次获取锁,如果获取不到,添加一个节点,把节点放到队列里排队,阻塞知道被前节点unpark通知,恢复执行后检查是否需要中断
    

    tryAcquire()

    protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
            
    nonfairTryAcquire(acquires);
    
    ================================
    final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                1.获取锁的状态
                int c = getState();
                2.如果没有线程持有锁
                if (c == 0) {
                3.尝试自己获取锁
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                4.如果是当前已经是该线程持有锁,那么将state计数器+1.这里就体现了 可重入的特性
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    
    自定义了锁的获取形式state>0,表示锁已经被占有了,只有cas(0到1)是才获取成功,这样保证了独占模式。如果当前是该线程占有这个锁,那么state+1,unlock时-1,重入性就是这么来的.
    可以看到这里继承了aqs之后,自由的实现锁的获取方式,concurrent包里面不少类通过实现不同的锁的获取,来实现不同的特性.后续博文中会陆续介绍
    

    addWaiter(Node.EXCLUSIVE)

    这里添加了一个独占模式的节点
    先看addWaiter(Node.EXCLUSIVE)
    添加一个等待节点
    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            1.如果存在尾部节点,将新节点链接在其后,并且将新节点设置为尾部节点
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            2.有可能队列为空,则直接入队
            enq(node);
            return node;
        }
    =====================
        private Node enq(final Node node) {
            for (;;) {
            1.如果尾部为空
                Node t = tail;
                if (t == null) { // Must initialize
                2.先初始化一个空节点,将其设置为 头,尾 然后for循环
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                3.到这里肯定已经有尾部节点了,将我们的节点加在尾部
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        4.最后将节点返回
                        return t;
                    }
                }
            }
        }
    
    这里就是初始化队列(实际是双向链表),头尾都是傀儡节点,将节点链接到链表尾部
    

    现在有了一个节点,那么就开始入队了
    acquireQueued()

     final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                1.获取当前节点的前一个节点
                    final Node p = node.predecessor();
                    2.如果前节点是头节点,尝试一次获取锁
                    if (p == head && tryAcquire(arg)) {
                    3.如果获得了锁,把该节点设置为头节点,将其设置为傀儡节点
                        setHead(node);
                        4.原头节点后面的链表置为空,// help GC
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    5.如果获取锁失败,那么就需要挂起该线程,等待通知
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
            6.如果失败,做失败处理
                if (failed)
                7.后面进行分析
                    cancelAcquire(node);
            }
        }
        
    shouldParkAfterFailedAcquire()
    =============
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    1.这里又多了一个状态,waitStatus表示节点的等待状态
    2.先获取前节点状态
            int ws = pred.waitStatus;
            3.如果前节点处于通知状态,意味当前节点可以尝试去获取锁了 
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            4.如果前节点是cancel状态
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                 5.那就不断遍历找到不是取消状态的节点
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                 6.如果头节点是新创建的状态是0(PROPAGATE以后在讨论),(这里设置为0是因为Unlock时候处理的,具体细节看后面)那么将他设置为通知状态
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        } 
        
    parkAndCheckInterrupt()
    ===============
    private final boolean
    1.走到了这里说明,该线程是可以挂起的状态了
    parkAndCheckInterrupt() {
        2.这里用了park(park(),unpark(),这一对方法很有意思。wait,notify都应该知道,先wait,在sign才可以生效要不然就卡死了.这里park,unPark,作用是类似的,但是完全不需要顺序,可以先unpark,在Park.具体写个小demo就可以了解了)
        
       所以就避免了这种情况,head已经执行完了,也unpark了,但是当前线程还没有执行到挂起的地方,造成卡死
       
       最后线程阻塞到这里,等到前节点通知
        LockSupport.park(this);
        
        3.返回线程是否中断了(被自己或其他中断了)
        只有当节点被唤起后才能设置中断状态
        return Thread.interrupted();
    }
    
    
    

    上面入队之后进行阻塞,直到接到前节点发到的信号.

    以上就是lock的实现流程,下面在看看unlock的操作

    public void unlock() {
        sync.release(1);
    }
    
    public final boolean release(int arg) {
        1.尝试释放锁资源
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    tryRelease(arg) 同tryAcquire一样需要子类自定义获取锁的方式
    
    ====================================
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    
    子类的具体实现
    protected final boolean tryRelease(int releases) {
    1.锁当前状态-1 
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                2.如果状态等于0了,那么这个锁就被成功的释放了
                if (c == 0) {
                    free = true;
                     setExclusiveOwnerThread(null);
                }
                3.如果减了一次还不为0,那么当前这个线程多了lock了这个锁,这里也是可重入导致的
                setState(c);
                return free;
            }
    ==========================================  
    private void unparkSuccessor(Node node) {
            /*
             * If status is negative (i.e., possibly needing signal) try
             * to clear in anticipation of signalling.  It is OK if this
             * fails or if status is changed by waiting thread.
             */
            int ws = node.waitStatus;
            1.将头状态设置为0
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
             * Thread to unpark is held in successor, which is normally
             * just the next node.  But if cancelled or apparently null,
             * traverse backwards from tail to find the actual
             * non-cancelled successor.
             */
             2.找到后继节点,并且没有取消,通知该节点恢复 这也是为什么必须要在finaly里unlock
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
     
    

    以上是源码解析
    最后留一些思考
    1.ReentrantLock独占性,可重入性是怎么实现的
    2.AQS在里面起什么作用?
    3.公平性和非公平性是怎么实现的
    4.park(),unPark()

    相关文章

      网友评论

          本文标题:ReentrantLock源码分析,顺便看看AbstractQu

          本文链接:https://www.haomeiwen.com/subject/zblrhxtx.html