美文网首页
AbstractQueuedSynchronizer

AbstractQueuedSynchronizer

作者: 奔向学霸的路上 | 来源:发表于2020-06-25 22:22 被阅读0次

    AQS简介

    JUC将AQS定位为一个模板方法,它是一个抽象类,不可被实例化,它的设计之处是为了让子类通过继承的方式实现多样的功能。

    AQS原理

    AQS无非关注点在于:状态(业务主要逻辑控制)、队列(等待队列)、CAS(安全set值)

    我们以ReentrantLock为例,来说明AQS提供的独占功能

    ReentrantLock的公平锁

    lock()
            final void lock() {
                acquire(1);
            }
    

    父类AQS的方法

      public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    tryAcquire交由子类去实现

    /**
             * Fair version of tryAcquire.  Don't grant access unless
             * recursive call or no waiters or is first.
             */
            protected final boolean tryAcquire(int acquires) {
                //当前线程
                final Thread current = Thread.currentThread();
                //获取同步状态
                int c = getState();
                if (c == 0) {
                    //如果等待队列中,当前线程前没有其他线程,CAS更新同步状态
                    if (!hasQueuedPredecessors() &&
                        compareAndSetState(0, acquires)) {
                        //设置锁的占有线程为当前线程
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                //可重入的情况
                else if (current == getExclusiveOwnerThread()) {
                    //同步状态加1
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
        }
    

    同步状态state含义如下:


    image.png

    接下来看AQS的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法
    首先看addWaiter(Node.EXCLUSIVE),它将当前线程包装成一个独占结点,并添加到队列尾部。
    每个Node需要标记是独占的还是共享的,由传入的mode决定,ReentrantLock自然是使用独占模式Node.EXCLUSIVE。

    /**
         * Creates and enqueues node for current thread and given mode.
         *
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            //如果队列不为空,采用CAS方式将新Node加入到队尾
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //如果CAS修改失败或者队列为空,进入enq方法
            enq(node);
            return node;
        }
    

    enq方法是个死循环,保证node一定能插入到队列,注意如果队列为空,会为头节点创建一个空的Node。

    /**
         * Inserts node into queue, initializing if necessary. See picture above.
         * @param node the node to insert
         * @return node's predecessor
         */
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    image.png

    下一步是调用acquireQueued阻塞线程

    /*
         * Various flavors of acquire, varying in exclusive/shared and
         * control modes.  Each is mostly the same, but annoyingly
         * different.  Only a little bit of factoring is possible due to
         * interactions of exception mechanics (including ensuring that we
         * cancel if tryAcquire throws exception) and other control, at
         * least not without hurting performance too much.
         */
    
        /**
         * Acquires in exclusive uninterruptible mode for thread already in
         * queue. Used by condition wait methods as well as acquire.
         *
         * @param node the node
         * @param arg the acquire argument
         * @return {@code true} if interrupted while waiting
         */
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                   // 1.如果正好是头节点,调用tryAcquire尝试,如果获取成功,修改头节点为自己
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    //2. 如果获取失败,线程可能等着下一次获取,也可能不想要了,Node变量waitState描述了线程的等待状态
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    2处会有4中状态

    static final int CANCELLED =  1;   //取消
    static final int SIGNAL    = -1;     //下个节点需要被唤醒
    static final int CONDITION = -2;  //线程在等待条件触发
    static final int PROPAGATE = -3; //(共享锁)状态需要向后传播
    

    shouldParkAfterFailedAcquire传入当前节点和前节点,根据前节点的状态,判断线程是否需要阻塞

    /**
         * Checks and updates status for a node that failed to acquire.
         * Returns true if thread should block. This is the main signal
         * control in all acquire loops.  Requires that pred == node.prev.
         *
         * @param pred node's predecessor holding status
         * @param node the node
         * @return {@code true} if thread should block
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    • 前节点状态是SIGNAL时,当前线程需要阻塞;
    • 前节点状态是CANCELLED时,通过循环将当前节点之前所有取消状态的节点移出队列;
    • 前节点状态是其他状态时,需要设置前节点为SIGNAL。

    如果线程需要阻塞,由parkAndCheckInterrupt方法进行操作。

     /**
         * Convenience method to park and then check if interrupted
         *
         * @return {@code true} if interrupted
         */
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    parkAndCheckInterrupt使用了LockSupport,和cas一样,最终使用UNSAFE调用Native方法实现线程阻塞,最后返回线程唤醒后的中断状态。

    todo 其他

    当然AQS不仅仅只有这些,其他内容以后再整理。

    相关文章

      网友评论

          本文标题:AbstractQueuedSynchronizer

          本文链接:https://www.haomeiwen.com/subject/gxdbxktx.html