美文网首页
AbstractQueuedSynchronizer分析

AbstractQueuedSynchronizer分析

作者: YocnZhao | 来源:发表于2019-03-06 17:55 被阅读0次

    相信大家看过AQS的代码的话大家对它应该会有基本的认识。它保存了两个Node,head跟tail。先看Node,Node是个双向链表,存储了pre跟next,两个有参构造方法存储Thread。

    static final class Node {
            static final Node SHARED = new Node();
            static final Node EXCLUSIVE = null;
    
            static final int CANCELLED = 1;
            static final int SIGNAL = -1;
            static final int CONDITION = -2;
            static final int PROPAGATE = -3;
    
            volatile int waitStatus;
            volatile Node prev;
            volatile Node next;
            volatile Thread thread;
            Node nextWaiter;
    
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            //前任
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    

    如果硬看AQS的话就很晦涩了,我们可以先写一个最简单的AQS的例子,允许线程获取锁,释放锁。

    public class Mutex implements Lock {
        private Sync sync = new Sync();
    
        class Sync extends AbstractQueuedSynchronizer {
            public boolean tryAcquire(int acquire) {
                assert acquire == 1;
                if (compareAndSetState(0, 1)) {//如果是0 -> 变成1
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int release) {
                assert release == 1;
                if (getState() == 0) throw new IllegalMonitorStateException();
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            Condition getNewCondition() {
                return new ConditionObject();
            }
        }
    
        @Override
        public void lock() {
            sync.acquire(1);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        @Override
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        @Override
        public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }
    
        @Override
        public void unlock() {
            sync.release(1);
        }
    
        public int getQueueLength() {
            return sync.getQueueLength();
        }
    
        @NotNull
        @Override
        public Condition newCondition() {
            return sync.getNewCondition();
        }
    }
    

    我们用这个自定义的锁来对AQS初窥门径,Mutex就相当于synchronized包裹下的notify跟wait,AQS其实看来就相当于object 的锁池。可以看出来Mutex这个类的所有操作其实都是用Sync来做的操作。相当于是AQS的代理类。
    直接看lock

    sync.acquire(1);
    

    acquire Sync并没有重写acquire,我们看AQS的acquire

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    我们看Sync实现了tryAcquire,看上面Mutex的代码 CAS 尝试获取锁,如果能获取到就把当前线程设置为当前线程,否则返回false。
    我们继续回去看acquire,如果没有获取到线程的话会继续走acquireQueued(addWaiter(Node.EXCLUSIVE), arg),先看addWaiter

    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure 快速设置尾节点
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;//新节点的pre设置为当前的tail节点
                if (compareAndSetTail(pred, node)) {//尝试设置到tail的尾部
                    pred.next = node;//成功的话把当前tail的next设置尾新节点,完成尾节点的add
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    看相关代码:首先是

    private final boolean compareAndSetTail(Node expect, Node update) {
            return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
        }
    

    如果tailOffset位置的Node等于expect,就把update改为tail,这个操作是原子的,如果两个线程进来只有一个会成功,第二个比较的时候tailOffest就不是expect了。然后就是enq

    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    循环把node放到尾部,返回尾部之前的node,也就是predecessor。

    相关文章

      网友评论

          本文标题:AbstractQueuedSynchronizer分析

          本文链接:https://www.haomeiwen.com/subject/mpynuqtx.html