美文网首页
AbstractQueuedSynchronizer

AbstractQueuedSynchronizer

作者: 奔向学霸的路上 | 来源:发表于2020-06-25 22:22 被阅读0次

AQS简介

JUC将AQS定位为一个模板方法,它是一个抽象类,不可被实例化,它的设计之处是为了让子类通过继承的方式实现多样的功能。

AQS原理

AQS无非关注点在于:状态(业务主要逻辑控制)、队列(等待队列)、CAS(安全set值)

我们以ReentrantLock为例,来说明AQS提供的独占功能

ReentrantLock的公平锁

lock()
        final void lock() {
            acquire(1);
        }

父类AQS的方法

  public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire交由子类去实现

/**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            //当前线程
            final Thread current = Thread.currentThread();
            //获取同步状态
            int c = getState();
            if (c == 0) {
                //如果等待队列中,当前线程前没有其他线程,CAS更新同步状态
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    //设置锁的占有线程为当前线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //可重入的情况
            else if (current == getExclusiveOwnerThread()) {
                //同步状态加1
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

同步状态state含义如下:


image.png

接下来看AQS的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法
首先看addWaiter(Node.EXCLUSIVE),它将当前线程包装成一个独占结点,并添加到队列尾部。
每个Node需要标记是独占的还是共享的,由传入的mode决定,ReentrantLock自然是使用独占模式Node.EXCLUSIVE。

/**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //如果队列不为空,采用CAS方式将新Node加入到队尾
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果CAS修改失败或者队列为空,进入enq方法
        enq(node);
        return node;
    }

enq方法是个死循环,保证node一定能插入到队列,注意如果队列为空,会为头节点创建一个空的Node。

/**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
image.png

下一步是调用acquireQueued阻塞线程

/*
     * Various flavors of acquire, varying in exclusive/shared and
     * control modes.  Each is mostly the same, but annoyingly
     * different.  Only a little bit of factoring is possible due to
     * interactions of exception mechanics (including ensuring that we
     * cancel if tryAcquire throws exception) and other control, at
     * least not without hurting performance too much.
     */

    /**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
               // 1.如果正好是头节点,调用tryAcquire尝试,如果获取成功,修改头节点为自己
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //2. 如果获取失败,线程可能等着下一次获取,也可能不想要了,Node变量waitState描述了线程的等待状态
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

2处会有4中状态

static final int CANCELLED =  1;   //取消
static final int SIGNAL    = -1;     //下个节点需要被唤醒
static final int CONDITION = -2;  //线程在等待条件触发
static final int PROPAGATE = -3; //(共享锁)状态需要向后传播

shouldParkAfterFailedAcquire传入当前节点和前节点,根据前节点的状态,判断线程是否需要阻塞

/**
     * Checks and updates status for a node that failed to acquire.
     * Returns true if thread should block. This is the main signal
     * control in all acquire loops.  Requires that pred == node.prev.
     *
     * @param pred node's predecessor holding status
     * @param node the node
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
  • 前节点状态是SIGNAL时,当前线程需要阻塞;
  • 前节点状态是CANCELLED时,通过循环将当前节点之前所有取消状态的节点移出队列;
  • 前节点状态是其他状态时,需要设置前节点为SIGNAL。

如果线程需要阻塞,由parkAndCheckInterrupt方法进行操作。

 /**
     * Convenience method to park and then check if interrupted
     *
     * @return {@code true} if interrupted
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

parkAndCheckInterrupt使用了LockSupport,和cas一样,最终使用UNSAFE调用Native方法实现线程阻塞,最后返回线程唤醒后的中断状态。

todo 其他

当然AQS不仅仅只有这些,其他内容以后再整理。

相关文章

  • AQS

    AbstractQueuedSynchronizer源码 AbstractQueuedSynchronizer(A...

  • 1.AbstractQueuedSynchronizer源码分析

    AbstractQueuedSynchronizer AbstractQueuedSynchronizer是什么,...

  • AbstractQueuedSynchronizer

    理解多线程的并发锁,可结合多进程的分布式锁(如Zookeeper的互斥锁、读写锁的实现原理),本质是相通的 介绍 ...

  • AbstractQueuedSynchronizer

    AbstractQueuedSynchronizer ReentrantLock主要内部通过Sync来完成锁的实现...

  • AbstractQueuedSynchronizer

    AQS简介 JUC将AQS定位为一个模板方法,它是一个抽象类,不可被实例化,它的设计之处是为了让子类通过继承的方式...

  • AbstractQueuedSynchronizer

    提供了实现阻塞锁和相关同步器依靠先入先出(FIFO)等待队列(信号量,事件等)的框架. 此类设计对于大多数种类的依...

  • AbstractQueuedSynchronizer

    AQS简介 什么是AQS(抽象队列同步器) 详细的可以看源码中的类注释 acquire方法执行逻辑(独占锁的获取)...

  • AbstractQueuedSynchronizer

    1.概述 AbstractQueuedSynchronizer(抽象队列同步器,以下简称AQS)内部维护一个CLH...

  • AbstractQueuedSynchronizer

    AbstractQueuedSynchronizer 学习AQS的必要性 队列同步器AbstractQueuedS...

  • AbstractQueuedSynchronizer

    AbstractQueuedSynchronizerAbstractQueuedSynchronizer,队列同步...

网友评论

      本文标题:AbstractQueuedSynchronizer

      本文链接:https://www.haomeiwen.com/subject/gxdbxktx.html