AQS框架

作者: 何甜甜在吗 | 来源:发表于2018-04-02 21:09 被阅读0次

    AQS(AbstractQueuedSynchronized)队列同步器
    理解AQS需要知道1.队列结构,2.同步的对象
    皮皮甜觉得它是并发的基础,因为比如我们常用的ReentrantLock、CountDownLatch底层实现都是基于AQS框架的,所以深入理解AQS非常有必要
    废话不多说,进入主题。
    在分析源码之前,先说说AQS使用的设计模式:模板方法模式,什么是模板方法模式可以参考之前写的设计模式---模板方法模式
    AQS中可重写的方法:
    1.protected boolean tryAcquire(int arg)
    2.protected boolean tryRelease(int arg)
    3.protected boolean tryAcquireShared(int arg)
    4.protected boolean tryReleaseShared(int arg)
    5.protected boolean isHeldExclusively()
    其中1、2为独占式获得锁和释放锁
    3、4为共享式获得锁和释放锁
    5判断锁是否被当前线程所占用
    独占式和共享式的区别:是否允许多个线程获取锁

    AQS中的模板方法:
    1.void acquire(int arg)
    2.void acquireInterruptibly(int arg)
    3.boolean tryAcquireNanos(int arg, long nanos)
    4.void acquireShared(int arg)
    5.boolean releaseShared(int arg)
    等等,模板方法有很多,这里只举例一些,看到3、4方法就知道为什么ReentrantLock支持超时获取锁、以及可中断了

    因为是基于模板方法的,所以我们自己使用AQS框架时,会重写可重写的方法,模板方法中调用的是可重写的方法。其中在可重写方法中,我们通过调用setState(int newState)getState()compareAndSetState(int expect, int update)方法对同步状态进行修改

    还是举个例子来说明AQS框架的使用:
    TwinsLock.java

    public class TwinsLock implements Lock {
        //自定义同步器
        private static class Sync extends AbstractQueuedLongSynchronizer {
            //重写AQS中的一些方法
    
            public Sync(int count) {
                if (count <= 0) {
                    throw new IllegalArgumentException("count must large than zero.");
                }
                setState(count);  //设置同步状态,表示同时允许多少线程访问共享资源
            }
    
            //重写共享式获取锁和释放锁的方法
            @Override
            protected long tryAcquireShared(long reduceCount) {
                for (;;) {
                    int current = (int)getState();  //获得此时的同步状态
                    int newCount = (int) (current - reduceCount);
    
                    //获取同步状态失败
                    if (newCount < 0 || compareAndSetState(current, newCount)) {
                        return newCount;
                    }
                }
            }
    
            @Override
            protected boolean tryReleaseShared(long returnCount) {
                for (;;) {
                    int current = (int)getState();
                    int newCount = (int)(current + returnCount);
                    if (compareAndSetState(current, newCount));  //重新设置共享状态
                }
            }
        }
        private final Sync sync = new Sync(2);  //设置同步状态为2
        //Lock中的接口
        @Override
        public void lock() {
            sync.acquireShared(1);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
    
        }
    
        @Override
        public boolean tryLock() {
            return false;
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    
        @Override
        public void unlock() {
            sync.releaseShared(1);
        }
    
        @Override
        public Condition newCondition() {
            return null;
        }
    }
    
    

    这里定义了一种锁,允许有两个线程获得锁,因为允许两个线程,所以使用共享式模式。
    一般会将继承AQS框架的类作为内部类,即这里的Sync.java
    独占式模式核心方法源码分析:
    重写了tryAcquireShared(long reduceCount)tryReleaseShared(long returnCount)这两个方法,在TwinsLock.java中,我们通过sync调用模板方法sync.acquireShared(1)sync.releaseShared(1)

    1.独占式获得锁

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    可以看到,在模板方法acquire(int arg)里调用了可重写方法tryAcquire(arg)
    1)成功获得锁,即tryAcquire(arg)返回true
    2)获得锁失败,就要将当前线程和同步状态构造成一个节点加入队列中,同时阻塞当前线程并进行自旋判断是否可以获得锁,这几步主要是通过addWaiter(Node mode)acquireQueued(final Node node, int arg)方法实现的
    addWaiter(Node mode)方法往队列中添加节点

    private Node addWaiter(Node mode) {
            //1.将当前线程和同步状态构造成为一个节点Node
            Node node = new Node(Thread.currentThread(), mode);
            //2.获得队列的尾节点 
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                //3.cas安全的将当前节点添加到队列末尾
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //添加失败(有其他节点抢先添加到队列末尾)或此时队列为空
            enq(node);
            return node;
        }
    

    enq(node)方法用来处理阻塞线程并发插入内置队列的情况

    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    通过死循环保证节点被正确的添加到队列末尾,同时每次进入循环时会拿到最新的尾节点

    acquireQueued(final Node node, int arg)方法节点自旋获得同步状态

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    通过死循环做到自旋
    如果前驱节点是头节点并且成功获得同步状态,则当前节点将变成头节点

    2.独占式释放锁

     public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    模板方法release(int arg)调用可重写方法tryRelease(arg),并且释放是从头节点开始释放的,释放通过调用LockSupport中的unparkSuccessor(h)方法实现的

    共享式模式核心方法源码分析
    1.共享式获得锁

    public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    

    如果获得锁失败,将调用doAcquireShared(arg)方法将线程和同步状态构成节点加入到队列中,tryAcquireShared(arg)为可重写的方法,如果返回值大于0则代表成功获得了锁

    private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                //进行自旋获得同步状态
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    共享式获得锁的实现思路和独占式获得锁的实现思路类似,有部分代码是公用的,不同处为1)独占式通过boolean值来判断线程是否获得同步状态,而共享式通过返回值是否大于0来判断是否获得锁 2)独占式中通过acquireQueued方法来实现自旋,而在共享式中将实现直接写在了doAcquireShared

    2.共享式释放锁

     public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    doReleaseShared()方法的实现

    private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    共享式释放锁的步骤可以分为两步
    1)调用tryReleaseShared方法释放同步状态
    2)调用doReleaseShared释放头节点
    共享式中为了保证安全释放节点,通过for循环和cas来实现节点的释放

    独占式超时获取同步状态

     private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            //1.计算deadline时间
            final long deadline = System.nanoTime() + nanosTimeout;
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                    //2.每次进入循环执行相应的操作后在计算一下剩余的时间
                    nanosTimeout = deadline - System.nanoTime();
                   //如果小于0,则超时获取锁失败 
                    if (nanosTimeout <= 0L)
                        return false;
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    相关文章

      网友评论

          本文标题:AQS框架

          本文链接:https://www.haomeiwen.com/subject/prgxhftx.html