Semaphore

作者: 奔向学霸的路上 | 来源:发表于2020-06-26 16:06 被阅读0次

    Semaphore概念

    semaphore,又名信号量,类似于“许可证”的概念,它实际上是维护了一些“许可证”,用来控制同时允许共享资源的最大线程数。比如疫情期间的图书馆,每天只允许一定数量的人进入,其他人来了之后就需要等待。

    应用场景

    Semaphore用作流量控制,特别是资源有限的情况下。常用来举例的一个场景是说,数据库的连接,比如我们要读取的数据量比较大,启动几十个线程并发读取,但是由于数据库连接数只有10个,所以这个时候需要Semaphore来控制最多10个线程来请求数据库。

    代码示例

    public class SemaphoreTest {
    
        private static final int THREAD_COUNT = 30;
    
        private static ExecutorService threadPool = Executors
                .newFixedThreadPool(THREAD_COUNT);
    
        private static Semaphore s = new Semaphore(10);
    
        public static void main(String[] args) {
            for (int i = 0; i < THREAD_COUNT; i++) {
                threadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            s.acquire();
                            System.out.println("save data");
                            s.release();
                        } catch (InterruptedException e) {
                        }
                    }
                });
            }
    
            threadPool.shutdown();
        }
    }
    

    虽然有30个线程在执行,但是只允许10个并发的执行。Semaphore的构造方法Semaphore(int permits) 接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用Semaphore的acquire()获取一个许可证,使用完之后调用release()归还许可证。还可以用tryAcquire()方法尝试获取许可证。

    Semaphore还提供一些其他方法

    int availablePermits() :返回此信号量中当前可用的许可证数。
    int getQueueLength():返回正在等待获取许可证的线程数。
    boolean hasQueuedThreads() :是否有线程正在等待获取许可证。
    void reducePermits(int reduction) :减少reduction个许可证。是个protected方法。
    Collection getQueuedThreads() :返回所有等待获取许可证的线程集合。是个protected方法。

    Semaphore原理

    Semaphore底层也是基于AQS分别实现了公平与非公平策略,需注意semaphore的锁是共享锁。

    Semaphore源码分析

    semaphore的两种构造函数

    获取锁

    1. 接受一个许可数量的构造,默认是非公平
    /**
         * Creates a {@code Semaphore} with the given number of
         * permits and nonfair fairness setting.
         *
         * @param permits the initial number of permits available.
         *        This value may be negative, in which case releases
         *        must occur before any acquires will be granted.
         */
        public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
    1. 两个参数,其中一个是“许可证”数量,true代表公平策略
    /**
         * Creates a {@code Semaphore} with the given number of
         * permits and the given fairness setting.
         *
         * @param permits the initial number of permits available.
         *        This value may be negative, in which case releases
         *        must occur before any acquires will be granted.
         * @param fair {@code true} if this semaphore will guarantee
         *        first-in first-out granting of permits under contention,
         *        else {@code false}
         */
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
    

    构造一个公平策略Semaphore,state设置为允许的最大许可量

            //FairSync类
            FairSync(int permits) {
                super(permits);
            }
            //Sync类(FairSync类的父类)
            Sync(int permits) {
                setState(permits);
            }
    
    Semaphore公平策略源码解读

    当线程调用acquire()方法时,入参"1"表示尝试获取1个许可

    /**
         * Acquires a permit from this semaphore, blocking until one is
         * available, or the thread is {@linkplain Thread#interrupt interrupted}.
         *
         * <p>Acquires a permit, if one is available and returns immediately,
         * reducing the number of available permits by one.
         *
         * <p>If no permit is available then the current thread becomes
         * disabled for thread scheduling purposes and lies dormant until
         * one of two things happens:
         * <ul>
         * <li>Some other thread invokes the {@link #release} method for this
         * semaphore and the current thread is next to be assigned a permit; or
         * <li>Some other thread {@linkplain Thread#interrupt interrupts}
         * the current thread.
         * </ul>
         *
         * <p>If the current thread:
         * <ul>
         * <li>has its interrupted status set on entry to this method; or
         * <li>is {@linkplain Thread#interrupt interrupted} while waiting
         * for a permit,
         * </ul>
         * then {@link InterruptedException} is thrown and the current thread's
         * interrupted status is cleared.
         *
         * @throws InterruptedException if the current thread is interrupted
         */
        public void acquire() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    

    acquireSharedInterruptibly,线程中断,抛中断异常;尝试获取锁小于0,代表获取失败,加入等待队列

    /**
         * Acquires in shared mode, aborting if interrupted.  Implemented
         * by first checking interrupt status, then invoking at least once
         * {@link #tryAcquireShared}, returning on success.  Otherwise the
         * thread is queued, possibly repeatedly blocking and unblocking,
         * invoking {@link #tryAcquireShared} until success or the thread
         * is interrupted.
         * @param arg the acquire argument.
         * This value is conveyed to {@link #tryAcquireShared} but is
         * otherwise uninterpreted and can represent anything
         * you like.
         * @throws InterruptedException if the current thread is interrupted
         */
        public final void acquireSharedInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //获取失败,加入等待队列
            if (tryAcquireShared(arg) < 0)
                doAcquireSharedInterruptibly(arg);
        }
    

    tryAcquireShared尝试获取锁,先判断队列中是否存在比当前线程等待时间长的线程;available获取可用的许可证数;remaining申请acquires数量后,剩余的可用许可证数;

    protected int tryAcquireShared(int acquires) {
                for (;;) {
                    if (hasQueuedPredecessors())//先判断队列中是否存在比当前线程等待时间长的线程
                        return -1;
                    int available = getState();//获取可用的许可证数
                    int remaining = available - acquires;//申请acquires数量后,剩余的可用许可证数
                    //两种情况,1,是可用许可证没有了,那么返回剩余许可证数量;2,是许可证还有,CAS尝试更新成功后,返回剩余许可证数
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    

    当tryAcquireShared(arg) < 0 获取锁失败,加入AQS等待队列,执行doAcquireSharedInterruptibly(arg)方法。

    /**
         * Acquires in shared interruptible mode.
         * @param arg the acquire argument
         */
        private void doAcquireSharedInterruptibly(int arg)
            throws InterruptedException {
            //添加一个节点到队列尾,这块的逻辑可以参考AbstractQueuedSynchronizer里的介绍
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();//找当前节点的上一个节点
                    if (p == head) {//如果上一个节点正好是头节点
                        int r = tryAcquireShared(arg);//获取共享锁
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);//将node节点设置为head节点,r>0说明还有机会获取到锁,唤醒后面的先从,称之为传播
                            p.next = null; // help GC
                            failed = false;
                            return;
                        }
                    }
                    //如果不是头节点,就不能获取
                    //对节点状态进行检查并更新状态,如果线程应该阻塞,返回true
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())//中断阻塞,并返回当前线程是否阻塞boolean值
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    //取消获取
                    cancelAcquire(node);
            }
        }
    

    释放锁
    release()方法用于锁的释放

    /**
         * Releases a permit, returning it to the semaphore.
         *
         * <p>Releases a permit, increasing the number of available permits by
         * one.  If any threads are trying to acquire a permit, then one is
         * selected and given the permit that was just released.  That thread
         * is (re)enabled for thread scheduling purposes.
         *
         * <p>There is no requirement that a thread that releases a permit must
         * have acquired that permit by calling {@link #acquire}.
         * Correct usage of a semaphore is established by programming convention
         * in the application.
         */
        public void release() {
            sync.releaseShared(1);
        }
    

    releaseShared方法,如果锁释放成功,唤醒AQS等待队列中的head节点

    /**
         * Releases in shared mode.  Implemented by unblocking one or more
         * threads if {@link #tryReleaseShared} returns true.
         *
         * @param arg the release argument.  This value is conveyed to
         *        {@link #tryReleaseShared} but is otherwise uninterpreted
         *        and can represent anything you like.
         * @return the value returned from {@link #tryReleaseShared}
         */
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    tryReleaseShared方法其实是对state做加法运算

        protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;//加上释放的
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
        }
    

    doReleaseShared方法唤醒后续线程节点可以来争取信号量

      /**
         * Release action for shared mode -- signals successor and ensures
         * propagation. (Note: For exclusive mode, release just amounts
         * to calling unparkSuccessor of head if it needs signal.)
         */
        private void doReleaseShared() {
            /*
             * Ensure that a release propagates, even if there are other
             * in-progress acquires/releases.  This proceeds in the usual
             * way of trying to unparkSuccessor of head if it needs
             * signal. But if it does not, status is set to PROPAGATE to
             * ensure that upon release, propagation continues.
             * Additionally, we must loop in case a new node is added
             * while we are doing this. Also, unlike other uses of
             * unparkSuccessor, we need to know if CAS to reset status
             * fails, if so rechecking.
             */
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    //获取head的状态
                    int ws = h.waitStatus;
                    ///头节点线程状态为SIGNAL唤醒后续线程节点
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        //唤醒下一个节点
                        unparkSuccessor(h);
                    }
                    //成功设置成 0 之后,将 head 状态设置成传播状态
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    Semaphore非公平策略源码解读

    与公平获取的区别是,无需判断队列,其他部分大致相同

            protected int tryAcquireShared(int acquires) {
                return nonfairTryAcquireShared(acquires);
            }
    
    final int nonfairTryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    

    相关文章

      网友评论

          本文标题:Semaphore

          本文链接:https://www.haomeiwen.com/subject/sxdbxktx.html