美文网首页
Semaphore信号量源码解析

Semaphore信号量源码解析

作者: 白龙6 | 来源:发表于2019-11-11 15:51 被阅读0次

    简介

    Semaphore(信号量)是juc包下的一个工具类,主要是用来控制同时访问公共资源的线程数,这个数量在Semaphore中叫做凭证数(acquires),内部是采用AbstractQueuedSynchronizer去实现的,AbstractQueuedSynchronizer是一个同步队列,juc包下有许多工具类都依赖于该类去实现功能,例如ReentrantReadWriteLock、CountDownLatch、Semaphore等,那么我们接下来就具体看下Semaphore的具体使用方法。

    案例

    首先我们设定一个背景,假如现在我们去银行取钱,银行的窗口有四个,然后现在有很多人都在等着办理业务,但是窗口只有四个,所以同时能进行业务办理的只有四个,每办理完一个释放一个可用窗口然后才能进行下一个的业务办理,这个场景就可以用Semaphore来实现。

    public class SemaphoreTest {
    
        private static final int MAX_COUNT = 4;
    
        public static void main(String[] args) throws InterruptedException {
    
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            Semaphore semaphore = new Semaphore(MAX_COUNT, true);
    
            int personNumber = 20;
    
            for (int i = 0; i < personNumber; i++) {
                executorService.execute(() -> {
                    try {
                        //尝试获取资源
                        semaphore.acquire(1);
    
                        //....办理业务...//
                        LockSupport.parkNanos(1000 * 1000 * 1000);
    
                        //释放资源
                        semaphore.release(1);
                        System.out.println("办理完毕");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
        }
    }
    

    源码分析(acquire方法)

    接下来我们具体分享下Semaphore的原理,Semaphore主要就是两个方法一个是获取凭证一个是释放凭证资源,我们先来分析下acquire方法。

        public void acquire(int permits) throws InterruptedException {
            if (permits < 0) throw new IllegalArgumentException();
            sync.acquireSharedInterruptibly(permits);
        }
    
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (tryAcquireShared(arg) < 0)//尝试获取许可
                doAcquireSharedInterruptibly(arg);//获取许可失败进入该方法
        }
    

    这里我们可以看出acquire方法内部调用的是sync类的acquireSharedInterruptibly方法,而acquireSharedInterruptibly方法中存在两个方法的调用,首先调用的是tryAcquireShared方法,该方法功能是尝试获取凭证,如果方法返回值小于0则进行acquireSharedInterruptibly方法。

        protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())
                        return -1;
                    int available = getState();//获取当前可用的凭证数,state是由validate修饰的,保证变量的可见性。
                    int remaining = available - acquires;//计算出剩余凭证数量
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))//凭证数小于零或者是cas替换凭证数成功则直接结束该方法。
                        return remaining;
                }
            }
    

    这里我看到tryAcquireShared方法,内部是采用自旋重试方式进行锁获取的,单次循环的逻辑可以看上述代码,先是调用hasQueuedPredecessors方法(这个方法的功能是查看当前是否存在排队的线程,如果存在排队的线程且不是当前线程则返回true,当前循环直接终止并返回-1),这个方法只存在公平模式中,非公平模式没有这段判断逻辑。
    然后再获取state值也就是可用凭证数,判断其state是否可用,可用进行cas原子替换操作,将计算后的可用凭证数值赋值给state,执行完毕返回remaining值。

        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            final Node node = addWaiter(Node.SHARED);//创建SHARED类型node节点
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();//获取当前节点的上一个节点
                    if (p == head) {//如果上一个节点为head,进入if业务逻辑
                        int r = tryAcquireShared(arg);//尝试获取凭证
                        if (r >= 0) {//获取成功,并且剩余凭证数大于零进入if业务逻辑
                            setHeadAndPropagate(node, r);//通知并且重置head节点
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    当tryAcquireShared方法获取凭证数失败后,则会进入上述方法doAcquireSharedInterruptibly中,首先为当前线程创建一个node等待节点,并且添加至aqs队列中,如果当前node节点的上一个节点非head,且上一个节点也是等待状态,则当前线程执行LockSupport.park进行长时间等待直到有可用凭证数为止,如果上一个节点是head节点则进行再一次的尝试获取凭证数,如果获取成功则直接终止循环并且将node节点状态置为已取消,如果获取失败则进行再次循环执行相同的业务逻辑。

    这里核心注意下两个方法setHeadAndPropagate、shouldParkAfterFailedAcquire。

    private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                return true;
            if (ws > 0) {
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    setHeadAndPropagate这个方法只会存在Share模式下,如果是独占则不会存在该方法,该方法的作用就是当可用凭证数大于0的时候进行往后唤醒等待的共享节点。
    shouldParkAfterFailedAcquire该方法是判断当前节点是否应该被挂起,如果前驱节点为signal则直接挂起当前线程,如果前驱节点为cancelled则一直往上查找直至找到状态不为cancelled为止并且将前置节点的状态cas替换为signal。

    waitStatus 5 个状态值的含义:

    cancelled(1):该节点的线程可能由于超时或被中断而处于被取消状态,一旦处于这个状态,节点状态将一直处于cancelled,因此应该从队列中移除。
    signal(-1):当前节点为 signal 时,后继节点会被挂起,因此在当前节点释放锁或被取消之后必须被唤醒(unparking)其后继结点。
    condition(-2):该节点的线程处于等待条件状态,不会被当作是同步队列上的节点,直到被唤醒(signal),设置其值为 0,重新进入阻塞状态。
    propagate(-3) :下一个 acquireShared 无条件传播。
    0:初始化状态。

    源码分析(release方法)

    acquire分析完之后我们再看下release方法。

    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    releaseShared方法是属于sync的内部方法,由semaphore.release进行调用,方法内部先调用tryReleaseShared方法,如果成功则调用doReleaseShared方法,我们先看下releaseShared方法内部的tryReleaseShared方法。

    protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    

    tryReleaseShared方法功能很简单,该方法就是将state加上释放的releases值然后再cas赋值给state
    变量。

     private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    doReleaseShared该方法主要功能就是尝试唤醒head的后继节点,如果后继节点存在且是signal状态则进行唤醒,这里需要关注的是unparkSuccessor方法,unparkSuccessor方法内部对head节点的后继节点进行了二次判断,如果后继节点不存在或是waitStatus为取消状态则从队列尾部开始向前查找知道节点状态不是取消为止,然后进行唤醒。

    总结

    至此我们已经将Semaphore的acquire方法以及release方法的源码简单的进行了分析,Semaphore可以用来控制访问公共资源的线程数,其内部是基于aqs实现的,aqs内部维护的state变量表示当前还可获取许可证的资线程数,感兴趣的同学可以去深入看下其他的方法,例如带超时时间的tryAcquire方法。

    相关文章

      网友评论

          本文标题:Semaphore信号量源码解析

          本文链接:https://www.haomeiwen.com/subject/eteibctx.html