美文网首页
java语言之二- Concurrent Lock源码解读

java语言之二- Concurrent Lock源码解读

作者: Wu杰语 | 来源:发表于2018-11-10 23:21 被阅读0次

    Lock所处的位置

    见上一节《java并发知识体系简要指南》知识体系图


    image.png

    在unsafe中提供了两个关键机制:

    • CAS,CPU提供的原子指令
    • park/unpark, 调用这个native函数会将线程调度和停止调度

    由于安全机制,unsafe不能被直接调用,但是可以被虚拟机代码调用,而LockSupport则封装了这两个机制,可以供程序员调用,而为了实现各种场景的ECF(异常控制流),Concurrence的作者 设计了AbstractQueueSychronizer这个中间类,继承这个中间类,就可以轻松实现各种场景的ECF语义。

    而这个中间类的代码是设计巧妙的,比较难懂的,最好的方法是懂得原理的情况下,使用各种ECF调试。

    理解AbstractQueueSychronizer的关键:

    • 数据结构,使用的最简单的双向链表
    • 操作系统底层,就是上面介绍的LockSupport封装
    • 控制逻辑,主要是通过state和Node中的waitState控制的

    各种异常流

    ReetrantLock

    需要理解:

    • 什么时候线程被加锁什么时候解锁
      对于线程不能用串行化的思想来考虑问题,否则就不容易理解了,lock()内部,需要了解的就是什么时候加锁解锁,关键代码如下:
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 添加节点在addwaiter
                selfInterrupt();
        }
    
    acquireQueued的关键代码
    for (;;) {    // 恢复后代码又在这里运行,这里是种递归的运行方式, node就是每一个新节点,这里有点玄妙, release后唤醒的必然是head的后继节点, 这时会在这里更新头节点,或者说进行节点 的删除操作,然后lock的代码退出,进入业务逻辑代码
                    
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {   # 2. 有可能需要重新加锁
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())    # 1. park后代码该线程运行就在这里停止了
                        interrupted = true;
                }
    

    如上,当线程调用lock的时候,该节点会被加入到queue中,如果lock已经被占用,则调用tryAcquire不成功,会调用parkAndCheckInterrupt,操作系统会将该线程挂起。

    当前线程unlock的时候, 会首先将后继节点unpark

    public final boolean release(int arg) {
       if (tryRelease(arg)) {
           Node h = head;
           if (h != null && h.waitStatus != 0)
               unparkSuccessor(h);   // release操作的时候会释放后继节点,让后继节点运行起来
           return true;
       }
       return false;
    }
    

    后继节点unpark后,会从上述“# 1. park后代码该线程运行就在这里停止了”标记处恢复运行,跳出死循环,跳出Lock()函数,执行线程本身的逻辑代码

    • 基类AbstractQueueSychronizer中的queue的运转
    image.png

    如上图,当有两个线程都同时调用lock()的时候,ReetrantLock的队列如上图,头结点的thread是null,这是已经在执行的当前线程。

    如上代码解释,当前线程执行unlock后,会执行unparkSuccessor, 其逻辑是当前节点的waitstate == -1,就找到后继节点,如上后继节点中thread = thread1,就执行unpark(thread1),恢复thread1的运行,queue如下:


    image.png

    如此往复执行,直到Node3的节点执行完毕,此时队列中就只剩下头结点了。

    ReadWriteReetrantLock

    关键代码:

    private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r); # 在这里加入到队列
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())   # 1. 执行了park
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
        
        
        private void setHeadAndPropagate(Node node, long propagate) {
            Node h = head; // Record old head for check below
            setHead(node);   #删除头节点
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    

    例如说一堆线程运行,加read锁或者加write锁,执行lock后,queue如下:


    image.png

    如上,thread为Exclusive表示是写锁,是独占的,thread是Shared表示是读锁,共享的。运行过程如下:

    1. 写锁运行时,后续的线程都被阻塞了。
    2. 当写锁完成,发现waitstate=-1,解锁后续线程,后续线程是shared的,它会一直解锁线程,直到所有连续的shared线程都解锁。
            protected final boolean tryReleaseShared(int unused) {
                Thread current = Thread.currentThread();
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                    if (firstReaderHoldCount == 1)
                        firstReader = null;
                    else
                        firstReaderHoldCount--;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    int count = rh.count;
                    if (count <= 1) {
                        readHolds.remove();
                        if (count <= 0)
                            throw unmatchedUnlockException();
                    }
                    --rh.count;
                }
                for (;;) {
                    int c = getState();
                    int nextc = c - SHARED_UNIT;
                    if (compareAndSetState(c, nextc))
                        // Releasing the read lock has no effect on readers,
                        // but it may allow waiting writers to proceed if
                        // both read and write locks are now free.
                        return nextc == 0;
                }
            }
    

    这段代码的关键是,Node2、Node3 unpark后执行跳出lock时会增加state计数,执行完成是执行unlock则会减少state计数,只有当state计数减少为0的时候,才会引发Node4的解锁

    1. Node4是独占锁,和1执行相同,如此往复。

    semaphore

    假定semaphore被初始化为2,有5个线程启动加semaphore


    image.png

    注意看这里的关键点state,state就是表示有几个可解锁的park。
    关键代码

    protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    

    当state 不为0的时候,获取锁总是能成功,此时有2个线程获取锁成功,其它线程获取不到锁会加入到queue中。
    正在运行某个线程运行完毕,则会将state加1,并且删除queue头结点,唤醒后续结点,如此往复,一直保持有2个线程可以运行
    由于只有2个线程在运行,因此state最大只能还原为2.

    Cond

    Cond需要有一把锁,用于保护condition对wait queue转移到queue。


    image.png

    如图,当有两个线程调用了wait()后,均被阻塞,在wait queue中有两个结点。
    当调用了signal后,讲产生结点转移



    Node1从wait queue转移到queue中,当锁unlock后,会去掉头结点,并去头结点的waitstate=-1,将会唤醒后续结点node1.
    如此往复,直至所有的节点执行完毕

    CountDownLanch

    这是数据运行最简单的一个队列了,假定初始化为3


    image.png

    如图,主线程调用await(),被park()加入到queue中。每次有线程调用countDown()时,会讲state减去1,并且检查state是否为0,如果为0,则会讲头结点删除,解锁后续结点,即将主线程解锁。

    相关文章

      网友评论

          本文标题:java语言之二- Concurrent Lock源码解读

          本文链接:https://www.haomeiwen.com/subject/xnzrxqtx.html