美文网首页
java并发编程基础 —AQS框架(基于1.8源码分析)

java并发编程基础 —AQS框架(基于1.8源码分析)

作者: Gxgeek | 来源:发表于2017-12-18 17:56 被阅读0次
AQS其实就是java.util.concurrent.locks.AbstractQueuedSynchronizer这个类

AQS框架都是基于父类AbstractQueuedSynchronizer实现的,

  • 它的实现类包括ReentrantLock(独占锁),
  • ReentrantReadAndWriteLock(独占锁与共享锁),
  • Condition(看上面表的对比就知道作用类似于Object的wait和notify),
  • 还有CountDownLatch类,
  • 以及java的线程池ThreadPoolExecuter的内部类Worker;

AQS简核心是通过一个共享变量来同步状态,变量的状态由子类去维护,而AQS框架做的是:

  • 线程阻塞队列的维护
  • 线程阻塞和唤醒

共享变量的修改都是通过Unsafe类提供的CAS操作完成的。AbstractQueuedSynchronizer类的主要方法是acquire和release,典型的模板方法,
下面这4个方法由子类去实现:

//尝试获取独占锁,锁竞争时不一定能获取成功,成功则返回true,否则返回false
protected boolean tryAcquire(int arg)
//尝试释放独占锁,锁竞争时不一定能释放成功,成功则返回true,否则返回false
protected boolean tryRelease(int arg)
//尝试获取共享锁,锁竞争时不一定能获取成功,成功则返回true,否则返回false
protected int tryAcquireShared(int arg)
//尝试释放共享锁,锁竞争时不一定能释放成功,成功则返回true,否则返回false
protected boolean tryReleaseShared(int arg)

2.类的内部类

AbstractQueuedSynchronizer类有两个内部类,分别为Node类与ConditionObject类。下面分别做介绍。

1.Node类

static final class Node {
        // 模式,分为共享与独占
        // 共享模式
        static final Node SHARED = new Node();
        // 独占模式
        static final Node EXCLUSIVE = null;        
        // 结点状态
        // CANCELLED,值为1,表示当前的线程被取消
        // SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
        // CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
        // PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
        // 值为0,表示当前节点在sync队列中,等待着获取锁
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;        

        // 结点状态
        volatile int waitStatus;        
        // 前驱结点
        volatile Node prev;    
        // 后继结点
        volatile Node next;        
        // 结点所对应的线程
        volatile Thread thread;        
        // 下一个等待者
        Node nextWaiter;
        
        // 结点是否在共享模式下等待
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        
        // 获取前驱结点,若前驱结点为空,抛出异常
        final Node predecessor() throws NullPointerException {
            // 保存前驱结点
            Node p = prev; 
            if (p == null) // 前驱结点为空,抛出异常
                throw new NullPointerException();
            else // 前驱结点不为空,返回
                return p;
        }
        
        // 无参构造函数
        Node() {    // Used to establish initial head or SHARED marker
        }
        
        // 构造函数
         Node(Thread thread, Node mode) {    // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        
        // 构造函数
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

从Node结构prev和next节点可以看出它是一个双向链表,waitStatus存储了当前线程的状态信息
waitStatus

  • CANCELLED,值为1,表示当前的线程被取消;
  • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
  • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
  • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
    值为0,表示当前节点在sync队列中,等待着获取锁。
  1. ConditionObject类

3.类的核心方法

下面我们通过以下五个方面来介绍AQS是怎么实现的锁的获取和释放的

  • 独占式获得锁
  • 独占式释放锁
  • 共享式获得锁
  • 共享式释放锁
  • 独占超时获得锁
  1. acquire函数
    该函数以独占模式获取(资源),忽略中断,即线程在aquire过程中,中断此线程是无效的。源码如下
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
}

<img src="http://ovg6pv08q.bkt.clouddn.com/%E5%B1%8F%E5%B9%95%E5%BF%AB%E7%85%A7%202017-12-13%20%E4%B8%8B%E5%8D%884.05.44.png" width = "300" height = "290" alt="方法流程图" align=center />

首先执行tryAcquire方法,尝试获得锁。
如果获取失败则进入addWaiter方法,
构造同步节点(独占式Node.EXCLUSIVE),
将该节点添加到同步队列尾部,并返回此节点,
进入acquireQueued方法。
acquireQueued方法,这个新节点死是循环的方式获取同步状态,如果获取不到则阻塞节点中的线程,阻塞后的节点等待前驱节点来唤醒或阻塞线程被中断。

addWaiter方法代码如下:
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //尾节点
        Node pred = tail;
        如果尾节点不为空   将node 节点设置成为尾节点
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果更新失败(存在并发竞争更新),则进入enq方法进行添加
        enq(node);
        return node;
    }

//enq方法代码如下:
private Node enq(final Node node) {

        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                //如果队列为空,则通过CAS把当前Node设置成头节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                //如果队列不为空,则向队列尾部添加Node
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
 //   该方法使用CAS自旋的方式来保证向队列中添加Node(同步节点简写Node)   
//  如果队列为空,则把当前Node设置成头节点  
// 如果队列不为空,则向队列尾部添加Node

acquireQueued 代码:
  • 通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。
  • 进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。
  • 在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            
                //找到当前节点的前驱节点
                final Node p = node.predecessor();
 9             //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
                if (p == head && tryAcquire(arg)) {
                //如果p节点是头节点且tryAcquire方法返回true。那么将当前节点设置为头节点。
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;//返回等待过程中是否被中断过
                }
                
                //意味着 不是 老二,自己可以休息了,就进入waiting状态,直到被unpark()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;//设置被中断过
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    //  此方法主要用于检查状态,看看自己是否真的可以去休息了(进入waiting状态,

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
             /*
              * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常 等待的状态,并排在它的后边。
              * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
              */

            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
        //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!

            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
//如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//调用park()使线程进入waiting状态
        return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。

    }

整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。

acquireQueued()方法总结

  • 结点进入队尾后,检查状态,找到安全休息点;
  • 调用park()进入waiting状态,等待unpark()或interrupt()唤醒自己;
  • 被唤醒后,看自己是不是有资格能拿到号。如果拿到,head指向当前结点,并返回从入队到拿到号的整个过程中是否被中断过;如果没拿到,继续流程1。

acquire() 方法总结

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  • 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  • 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  • acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  • 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
方法流程图

release(int)

acquire()的反操作 release(),此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

    public final boolean release(int arg) {
        // tryReease由子类实现,通过设置state值来达到同步的效果。
        if (tryRelease(arg)) {
            Node h = head;
         // waitStatus为0说明是初始化的空队列
            if (h != null && h.waitStatus != 0)
            // 唤醒后续的结点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    
    //  此方法用于唤醒等待队列中下一个线程
    private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    //这里,node一般为当前线程所在的结点。
    int ws = node.waitStatus;
    if (ws < 0)//置零当前线程所在的结点状态,允许失败。
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;//找到下一个需要唤醒的结点s
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒

}

总结:用unpark()唤醒等待队列中最前边的那个未放弃线程

acquireShared(int)

doAcquireShared(int) ->  此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。下面是doAcquireShared()的源码:
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);//加入队列尾部
        boolean failed = true;//是否成功标志

        try {
            boolean interrupted = false;//等待过程中是否被中断过的标志

            for (;;) {
                final Node p = node.predecessor();//前节点
                if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
                    int r = tryAcquireShared(arg);//尝试获取资源

                    if (r >= 0) {//成功
                        setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
                        p.next = null; // help GC
                        if (interrupted)//如果等待过程中被打断过,此时将中断补上。
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
         //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:
负值代表获取失败;0代表获取成功,但没有剩余资源;
正数表示获取成功,还有剩余资源,其他线程还可以去获取。

所以这里acquireShared()的流程就是:
  • tryAcquireShared()尝试获取资源,成功则直接返回;
  • 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。
setHeadAndPropagate(Node, int)
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);//head指向自己
     //如果还有剩余量,继续唤醒下一个邻居线程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}
此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!

releaseShared()

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//尝试释放资源
            doReleaseShared();//唤醒后继结点
            return true;
        }
        return false;
    }

此方法 --> 释放掉资源后,唤醒后继。

    private void doReleaseShared() {

        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//唤醒后继
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)  // head发生变化

                break;
        }
    }

实例:

Mutex(互斥锁)

public class Mutex implements Lock {
    // 静态内部类,自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -4387327721959839431L;

        // 是否处于占用状态
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 当状态为0的时候获取锁
        @Override
        public boolean tryAcquire(int acquires) {
            assert acquires == 1;// 这里限定只能为1个量
            if (compareAndSetState(0, 1)) {//state为0才设置为1,不可重入!
                setExclusiveOwnerThread(Thread.currentThread());//设置为当前线程独占资源
                return true;
            }
            return false;
        }

        // 释放锁,将状态设置为0
       // 尝试释放资源,立即返回。成功则为true,否则false。
        @Override
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // 返回一个Condition,每个condition都包含了一个condition队列
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    // 仅需要将操作代理到Sync上即可
    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

相关文章

网友评论

      本文标题:java并发编程基础 —AQS框架(基于1.8源码分析)

      本文链接:https://www.haomeiwen.com/subject/inimwxtx.html