美文网首页JVM
AbstractQueuedSynchronizer实现分析

AbstractQueuedSynchronizer实现分析

作者: 黑小鹰 | 来源:发表于2019-01-08 15:20 被阅读17次

    1.什么是AbstractQueuedSynchronizer?
    2.同步队列中的节点(Node)
    3.独占式同步状态获取与释放
    4.并发问题
    5.挂起等待线程
    6.一个例子

    什么是AbstractQueuedSynchronizer?

    AbstractQueuedSynchronizer即是同步器(简称AQS),同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

    AQS的核心思想是基于volatile int state这样的一个属性同时配合Unsafe工具对其原子性的操作来实现对当前锁的状态进行修改。当state的值为0的时候,标识改Lock不被任何线程所占有。

    同步队列中的节点(Node)

    同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点,节点的属性类型与名称以及描述如下表所示


    节点是构成同步队列(等待队列)的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的基本结构如下图所示


    同步器队列基本结构

    同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。试想一下,当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

    同步器将节点加入到同步队列的过程


    节点加入到同步队列.jpg

    同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,该过程


    首节点的设置

    独占式同步状态获取与释放

    当多个线程同时去竞争锁的时候发生了什么?

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    多个线程同时进来,他们会首先会通过CAS去修改state的状态,如果修改成功,那么竞争成功,因此这个时候多个线程只有一个CAS成功,其他两个线程失败,也就是tryAcquire返回false。

    接下来,addWaiter会把将当前线程关联的EXCLUSIVE类型的节点入队列:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    如果队尾节点不为null,则说明队列中已经有线程在等待了,那么直接入队尾。对于我们举的例子,这边的逻辑应该是走enq,也就是开始队尾是null,其实这个时候整个队列都是null的。

    private Node enq(Node node) {
            for (;;) {
                Node oldTail = tail;
                if (oldTail != null) {
                    node.setPrevRelaxed(oldTail);
                    if (compareAndSetTail(oldTail, node)) {
                        oldTail.next = node;
                        return oldTail;
                    }
                } else {
                    initializeSyncQueue();
                }
            }
        }
    

    如果Thread2和Thread3同时进入了enq,同时t == null,则同时进入initializeSyncQueue()方法;initializeSyncQueue方法通过HEAD.compareAndSet(this, null, (h = new Node()创建同步器tail,第二次同步器尾节点不为空则都进入if代码块,进行CAS加入队尾,这个时候只有一个线程能够成功,然后其他继续进入循环,因此这个时候又是只有一个线程成功,我们假设是Thread2成功,哈哈,Thread2开心的返回了,Thread3失落的再进行下一次的循环,最终入队列成功,返回自己。

    并发问题

    基于上面两段代码,他们是如何实现不进行加锁,当有多个线程,或者说很多很多的线程同时执行的时候,怎么能保证最终他们都能够乖乖的入队列而不会出现并发问题的呢?

    这也是这部分代码的经典之处,多线程竞争,热点、单点在队列尾部,多个线程都通过【CAS+死循环】这个free-lock黄金搭档来对队列进行修改,每次能够保证只有一个成功,如果失败下次重试,如果是N个线程,那么每个线程最多loop N次,最终都能够成功。

    挂起等待线程

    上面只是addWaiter的实现部分,那么节点入队列之后会继续发生什么呢?那就要看看acquireQueued是怎么实现的了,为保证文章整洁,代码我就不贴了,同志们自行查阅,我们还是以上面的例子来看看,Thread2和Thread3已经被放入队列了,进入acquireQueued之后:

    对于Thread2来说,它的prev指向HEAD,因此会首先再尝试获取锁一次,如果失败,则会将HEAD的waitStatus值为SIGNAL,下次循环的时候再去尝试获取锁,如果还是失败,且这个时候prev节点的waitStatus已经是SIGNAL,则这个时候线程会被通过LockSupport挂起。

    对于Thread3来说,它的prev指向Thread2,因此直接看看Thread2对应的节点的waitStatus是否为SIGNAL,如果不是则将它设置为SIGNAL,再给自己一次去看看自己有没有资格获取锁,如果Thread2还是挡在前面,且它的waitStatus是SIGNAL,则将自己挂起。

    如果Thread1死死的握住锁不放,那么Thread2和Thread3现在的状态就是挂起状态啦,而且HEAD,以及Thread的waitStatus都是SIGNAL,尽管他们在整个过程中曾经数次去尝试获取锁,但是都失败了,失败了不能死循环呀,所以就被挂起了。当前状态如下:


    一个例子

    在上述对同步器AbstractQueuedSynchronizer进行了实现层面的分析之后,我们通过一个例子来加深对同步器的理解:

    设计一个同步工具,该工具在同一时刻,只能有两个线程能够并行访问,超过限制的其他线程进入阻塞状态。
    对于这个需求,可以利用同步器完成一个这样的设定,定义一个初始状态,为2,一个线程进行获取那么减1,一个线程释放那么加1,状态正确的范围在[0,1,2]三个之间,当在0时,代表再有新的线程对资源进行获取时只能进入阻塞状态(注意在任何时候进行状态变更的时候均需要以CAS作为原子性保障)。由于资源的数量多于1个,同时可以有两个线程占有资源,因此需要实现tryAcquireShared和tryReleaseShared方法

    public class TwinsLock implements Lock {
        private final Sync sync = new Sync(2);
    
        private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = -7889272986162341211L;
    
            Sync(int count) {
                if (count <= 0) {
                    throw new IllegalArgumentException("count must large than zero.");
                }
                setState(count);
            }
    
            public int tryAcquireShared(int reduceCount) {
                for (;;) {
                    int current = getState();
                    int newCount = current - reduceCount;
                    if (newCount < 0 || compareAndSetState(current, newCount)) {
                        return newCount;
                    }
                }
            }
    
            public boolean tryReleaseShared(int returnCount) {
                for (;;) {
                    int current = getState();
                    int newCount = current + returnCount;
                    if (compareAndSetState(current, newCount)) {
                        return true;
                    }
                }
            }
    
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
        }
    
        public void lock() {
            sync.acquireShared(1);
        }
    
        public void unlock() {
            sync.releaseShared(1);
        }
    
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        public boolean tryLock() {
            return sync.tryAcquireShared(1) >= 0;
        }
    
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
        }
    
        @Override
        public Condition newCondition() {
            return sync.newCondition();
        }
    }
    
    

    测试用例

    public class TwinsLockTest {
    
        public void test() {
            final Lock lock = new TwinsLock();
            class Worker extends Thread {
                public void run() {
                    while (true) {
                        lock.lock();
                        try {
                            SleepUtils.second(1);
                            System.out.println(Thread.currentThread().getName());
                            SleepUtils.second(1);
                        } finally {
                            lock.unlock();
                        }
                    }
                }
            }
            // 启动10个线程
            for (int i = 0; i < 10; i++) {
                Worker w = new Worker();
                w.setDaemon(true);
                w.start();
            }
            // 每隔1秒换行
            for (int i = 0; i < 10; i++) {
                SleepUtils.second(1);
                System.out.println();
            }
        }
        
        public static void main(String[] args) {
            new TwinsLockTest().test();
        }
    }
    
    

    参考文献:
    [1] AbstractQueuedSynchronizer的介绍和原理分析
    [2] 扒一扒ReentrantLock以及AQS实现原理 原
    [3] 分析ReentrantLock的实现原理
    [4] 《Java并发编程艺术》

    相关文章

      网友评论

        本文标题:AbstractQueuedSynchronizer实现分析

        本文链接:https://www.haomeiwen.com/subject/ileprqtx.html