美文网首页并发(个人收集)
Java基础-AbstractQueuedSynchronize

Java基础-AbstractQueuedSynchronize

作者: tom_xin | 来源:发表于2019-09-28 09:16 被阅读0次

        AbstractQueuedSynchronizer类(简称AQS)依赖于内部的FIFO队列,提供了一个可以实现阻塞锁和同步机制的框架。依赖于AQS实现的具有同步机制和锁功能的类,需要实现它定义的protect方法(tryAcquire,tryRelease等方法),通过改变state字段的状态来保证同步机制。

    AQS的使用场景

    1、ReentrantLock类提供的同步锁的功能也是基于AQS类来实现的,ReentrantLock提供的公平锁(sync)和非公平锁(NofairSync)两种锁机制,都是通过重写tryAcquire和tryRelease方法来实现的。
    ReentrantLock重写的tryAcquire方法:

            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
            // 非公平的获取锁方法
            final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                // 提供了锁的可重入功能,当已经获取锁的线程再次请求获取锁时
                // 将state的值加一。
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    
    

    ReentrantLock重写的tryRelease方法

            protected final boolean tryRelease(int releases) {
                int c = getState() - releases;
                // 判断当前请求释放锁的线程与当前持有锁的线程是不是同一个线程
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
                // 因为提供了锁重入的功能,这里会判断当前线程是否已经释放完所有持有的锁
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    
    

    Reentrant基于AQS加锁的执行过程


    image.png

    2、CyclicBarrier类
    应用场景:创建一组任务,它们并行地执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成,它使得所有并行任务都将在栅栏处列队,因此可以一致的向前移动。
    CyclicBarrier类提供的功能也是基于AQS类来实现的,主要是利用AQS类中ConditionObject类提供的功能。

            // 唤醒所有等待的线程
            public final void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }
            // 唤醒所有线程的方法,循环队列中等待的线程,调用transferForSignal方法唤醒在等待中的线程。
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    

    transferForSignal方法,会对当前节点状态校验,然后调用LockSupport.unpark方法唤醒指定的线程。

        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    

    await方法:当CyclicBarrier类提供的计数器没有减到等于0时,提前进入的线程会调用AQS中ConditionObject类提供的await方法,将当前线程加入到等待队列中。

            public final void await() throws InterruptedException {
                // 如果当前线程被中断,会抛出异常
                if (Thread.interrupted())
                    throw new InterruptedException();
                // 将新节点添加到等待队列中去,节点的状态为 CONDITON。
                Node node = addConditionWaiter();
                // 释放当前线程持有的锁,如果失败则将当前节点的状态改为cancelled(表示当前节点失效)。
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                // 判断当前节点是否在CLH队列中,CONDITION状态的节点,只能在CONDITION队列中。
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // 出了while循环,说明线程被唤醒,并且已经将该node节点从CONDITION队列transfer到了CLH队列中,或者发生了中断。
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                // 处理异常或者中断的节点
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    

    下面来看一下CyclicBarrier类的执行过程


    image.png

    相关文章

      网友评论

        本文标题:Java基础-AbstractQueuedSynchronize

        本文链接:https://www.haomeiwen.com/subject/sbycyctx.html