美文网首页
8.AQS共享锁

8.AQS共享锁

作者: Junma_c631 | 来源:发表于2020-09-28 17:06 被阅读0次

一、Semaphore实现原理解析

1.1Semaphore实例

image.png
package executors;

import java.util.concurrent.Semaphore;

/**
 * Demonstrates {@link Semaphore} as a concurrency limiter: five "ticket buyer"
 * threads compete for three "windows" (permits), so at most three buyers are
 * served at the same time while the others wait for a permit to be released.
 */
public class SemphoreTest {
    /**
     * Starts five buyer threads that each acquire one permit, hold it for five
     * seconds and then release it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Three permits: at most three buyers can be served concurrently.
        Semaphore windows = new Semaphore(3);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    // Acquire OUTSIDE the try/finally that releases: if acquire() is
                    // interrupted, no permit was obtained and release() must not run,
                    // otherwise the semaphore would end up with more than 3 permits.
                    windows.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt flag for whoever inspects this thread next.
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    System.out.println(Thread.currentThread().getName() + "--开始买票");
                    Thread.sleep(5000L);
                    System.out.println(Thread.currentThread().getName() + "--结束买票");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                } finally {
                    // A permit is definitely held here, so it is always safe to return it.
                    windows.release();
                }
            }, "买票者" + i).start();
        }
    }
}

1.2Semaphore源码解析

new Semaphore(3)创建了一个非公平的同步器,并设置同步器维护的state=3

/**
 * Abridged excerpt of the Semaphore source showing only what the
 * single-argument constructor touches: it creates a NonfairSync and stores
 * the permit count in the synchronizer state.
 */
public class Semaphore implements java.io.Serializable {
    // The synchronizer this Semaphore delegates to.
    private final Sync sync;
    /**
     * Creates a semaphore backed by the non-fair synchronizer.
     *
     * @param permits the initial number of permits, stored as the synchronizer state
     */
    public Semaphore(int permits) {
        // Non-fair synchronizer.
        sync = new NonfairSync(permits);
    }
    /** Non-fair variant: shared acquisition is delegated to nonfairTryAcquireShared. */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        // Called by the AQS acquire path; nonfairTryAcquireShared is not shown in this excerpt.
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    /** Base synchronizer; its constructor initializes the AQS state to the permit count. */
    abstract static class Sync extends AbstractQueuedSynchronizer {
       Sync(int permits) {
            // state == number of currently available permits.
            setState(permits);
       }
    
    }
}

windows.acquire()分析
因为许可数设置为3(即state=3),最多允许三个线程同时占用资源;前三个线程可以顺利地获取资源,
每次成功获取后state减1。


image.png

从第四个线程开始,由于state已经减为0、没有可用资源,tryAcquireShared返回负值,线程会进入等待队列并阻塞


image.png
/**
 * Abridged excerpt of the Semaphore source showing the acquire() path:
 * acquire() -> AQS.acquireSharedInterruptibly(1) -> NonfairSync.tryAcquireShared
 * -> Sync.nonfairTryAcquireShared.
 */
public class Semaphore implements java.io.Serializable {
    // The synchronizer this Semaphore delegates to.
    private final Sync sync;
    public Semaphore(int permits) {
        // Non-fair synchronizer.
        sync = new NonfairSync(permits);
    }
    /**
     * Acquires one permit via the synchronizer's interruptible shared acquire.
     *
     * @throws InterruptedException propagated from acquireSharedInterruptibly
     */
    public void acquire() throws InterruptedException {
        // Step 1: call AbstractQueuedSynchronizer.acquireSharedInterruptibly(arg) asking for one permit.
        sync.acquireSharedInterruptibly(1);
    }
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        // Hook invoked by the AQS acquire path; simply forwards to the non-fair implementation.
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    abstract static class Sync extends AbstractQueuedSynchronizer {
      ...
      /**
       * Tries to take {@code acquires} permits without queueing.
       *
       * @param acquires number of permits requested
       * @return the permits that remain after this acquisition; a negative value
       *         means there were not enough permits and the state was left unchanged
       */
      final int nonfairTryAcquireShared(int acquires) {
            // Retry loop: a failed CAS means another thread changed the state, so re-read and try again.
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Short-circuit: when remaining < 0 the CAS is skipped and the negative value is returned.
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    

}

/**
 * Abridged excerpt of AbstractQueuedSynchronizer showing the interruptible
 * shared-acquire path: try to acquire, otherwise enqueue the thread, mark the
 * predecessor with SIGNAL and park.
 */
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
     ...
     /**
      * Acquires in shared mode, aborting if the thread is interrupted.
      *
      * @param arg the acquire argument passed through to tryAcquireShared
      * @throws InterruptedException if the thread is already interrupted on entry,
      *         or is interrupted while waiting in doAcquireSharedInterruptibly
      */
     public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        // Thread.interrupted() also clears the flag before the exception is thrown.
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) //Step 2: calls back into the subclass's tryAcquireShared (NonfairSync for Semaphore), which tries to lower state by arg
            //A negative RETURN VALUE (not a negative state) means the acquire failed; build the wait queue and block the current thread.
            doAcquireSharedInterruptibly(arg);
    }
    
    /*
    1. Builds the wait queue if necessary and enqueues the current thread.
    2. Sets the predecessor's waitStatus to Node.SIGNAL.
    3. Parks (blocks) the current thread.
    Throws InterruptedException if the thread is interrupted while parked; in
    that case the finally block cancels the node.
    */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //Enqueue a node for the current thread; Node.SHARED is the marker stored in nextWaiter to flag shared mode
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //Fetch the predecessor node
                final Node p = node.predecessor();
                //Only the node directly behind head attempts to acquire
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //Make this node the new head and propagate the release if warranted
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&//returns true only once the predecessor is already SIGNAL; otherwise it updates the predecessor and the loop retries
                    parkAndCheckInterrupt())//calls LockSupport.park(this) to block the current thread; true means it was interrupted
                    throw new InterruptedException();
            }
        } finally {
            // Reached with failed == true only when leaving via an exception.
            if (failed)
                cancelAcquire(node);
        }
    }

    /**
     * Creates a node for the current thread in the given mode and appends it to
     * the queue, returning the new node.
     */
    private Node addWaiter(Node mode) {
        /*
        Node constructor used below, quoted for reference:
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        */
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // prev is linked before the CAS; next is linked only after the CAS succeeds.
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //Slow path: tail was null or the CAS lost a race; enq (not shown here) performs the enqueue
        enq(node);
        return node;
    }
    /*
    Decides whether the current thread may park after a failed acquire.
    Returns true only if the predecessor's waitStatus is already Node.SIGNAL.
    Otherwise it either unlinks predecessors whose waitStatus is > 0
    (presumably cancelled nodes, confirm against the JDK Node documentation)
    or tries to set the predecessor to Node.SIGNAL, and returns false so the
    caller retries the acquire before parking.
    */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            // Walk backwards past every predecessor with waitStatus > 0 and relink around them.
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //Try to mark the predecessor as SIGNAL; the result of the CAS is not checked here
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    /**
     * Parks the current thread, then reports whether it was interrupted.
     * Thread.interrupted() clears the interrupt flag as a side effect.
     */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
}

windows.release()分析

public class Semaphore implements java.io.Serializable {
    //同步器
    private final Sync sync;
    public Semaphore(int permits) {
        //非公平同步器
        sync = new NonfairSync(permits);
    }
    public void release() {
        sync.releaseShared(1);
    }
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }
    abstract static class Sync extends AbstractQueuedSynchronizer {
      ...
      protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
      }
}

/**
 * Abridged excerpt of AbstractQueuedSynchronizer showing the shared-release
 * path: update the state, then wake the waiter queued behind the head.
 */
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
     ...
     /**
      * Releases in shared mode.
      *
      * @param arg the release argument passed through to tryReleaseShared
      * @return the value returned by tryReleaseShared
      */
     public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//subclass hook; for Semaphore this adds arg back to state
            doReleaseShared();//wake the waiter behind the head, if any
            return true;
        }
        return false;
    }
    /*
    Release action for shared mode. Looks at the current head:
    - if its waitStatus is SIGNAL, resets it to 0 and unparks its successor;
    - if its waitStatus is 0, tries to mark it PROPAGATE.
    The loop repeats whenever a CAS fails or the head changed while the
    iteration was running, and exits once the head is stable.
    */
    private void doReleaseShared() {
      
        for (;;) {
            Node h = head;
            // h != tail: there is at least one node queued behind the head.
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//wake the head's successor
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
   
    /**
     * Unparks the thread of the node that follows {@code node}, if there is one.
     */
    private void unparkSuccessor(Node node) {
       
        int ws = node.waitStatus;
        // A negative status is reset to 0; the CAS result is intentionally not checked here.
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
            
        Node s = node.next;//the node directly behind `node` (behind the head when called from doReleaseShared)
        if (s == null || s.waitStatus > 0) {
            // next is missing or has waitStatus > 0: scan backwards from tail and keep
            // the node with waitStatus <= 0 that is closest to `node`.
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //Wake the chosen successor's thread
        if (s != null)
            LockSupport.unpark(s.thread);
    }

}

被唤醒后出队逻辑

/**
 * Abridged excerpt of AbstractQueuedSynchronizer showing what a parked thread
 * does after it is woken: it retries the acquire and, on success, becomes the
 * new head (i.e. leaves the wait queue).
 */
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
     ...
    /*
    Dequeue logic after the thread has been woken up: the loop runs again, and
    if this node is directly behind head and tryAcquireShared succeeds, the
    node replaces the old head.
    */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
       
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                 
                final Node p = node.predecessor();
                if (p == head) {
                    //Retry the acquire; r >= 0 means it succeeded
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        //Make this node the new head, dropping the old head from the queue.
                        //setHeadAndPropagate then calls doReleaseShared() when its propagation
                        //conditions hold, so the next shared waiter can be woken as well.
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())//the thread resumes here when unparked, then goes around the loop again
                    throw new InterruptedException();
            }
        } finally {
            // Reached with failed == true only when leaving via an exception.
            if (failed)
                cancelAcquire(node);
        }
    }
    /**
     * Installs {@code node} as the head and, if any of the propagation
     * conditions below hold and the next node is absent or shared, calls
     * doReleaseShared().
     *
     * @param node the node that just acquired
     * @param propagate the value returned by tryAcquireShared
     */
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        
        // Checks both the old head and, via (h = head), the new head for a negative waitStatus.
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    //Makes the given node the head and clears its thread and prev references, unlinking the old head
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
   
}

相关文章

网友评论

      本文标题:8.AQS共享锁

      本文链接:https://www.haomeiwen.com/subject/bgcduktx.html