美文网首页Java面试程序员工作生活
多线程之Semaphore (限流Java 版)

多线程之Semaphore (限流Java 版)

作者: Unyielding_L | 来源:发表于2019-10-13 17:47 被阅读0次

    概念

    计数信号量。从概念上讲,信号量维护一组许可证。

    举一个例子

    某银行分店只有三个窗口,所以同一时间最多只有三个人办理业务,其它人只能等待。可以把办理业务的人比作成线程,三个窗口就相当于三个许可证。此时来了4个人,先到的三个领到人许可证然后办理业务,第四个人呢只有等待,等待其中一个先办好业务释放许可证之后,然后再办理业务。

    简单的用法

    调用aquire方法是阻塞的直到有一个许可可用然后返回。每次调用release方法就会增加一个许可,隐式地释放一个阻塞获取者(调用aquire方法阻塞的线程)。当然没有所谓实际的许可对象Semaphore仅仅是维护了一个数字而已,然后执行相应的减加操作而已。

    一个demo?

    public class SemaphoreDemo {
    
        public static final int THREAD_SIZE = 10;
    
        public static void runSomething() throws InterruptedException {
            //模拟处理什么事
            Thread.sleep(1000);
            System.out.println(String.format("current threadId is %d,current time is %d",
                    Thread.currentThread().getId(), System.currentTimeMillis() / 1000));
        }
    
        public static void main(String[] args) throws InterruptedException {
            //创建一个包含4个许可证的信号量实例
            Semaphore semaphoreDemo = new Semaphore(4);
            for (int i = 0; i < THREAD_SIZE; i++) {
                //获取许可
                Thread demoThread = new Thread(() -> {
                    try {
                        //获取许可
                        semaphoreDemo.acquire();
                        //操作资源
                        runSomething();
                    } catch (InterruptedException e) {
                        //抛出InterruptedException 会将该线程的中断标志设置为false
                        Thread.currentThread().interrupt();
                    } finally {
                        semaphoreDemo.release();
                    }
                });
                //开启demo线程
                demoThread.start();
    
            }
    
        }
    }
    

    👆程序大家有认真看嘛,给大家一分钟的时间看下。。。。。。大家发现有没有啥问题?考虑一下?
    👆程序在调用aquire方法时会阻塞住,如果此时该线程被中断了finally 还执行release 方法.So,we can optimize it !
    Semaphore 类提供了一个好用的方法
    tryAcquire 方法,调用该方法在调用的时间如果有许可的话则会获取到许可并返回true,但是如果当前没有许可的话则会立即返回false
    整改之后的代码如下:

    /**
     * @author 梁自强
     * @date 2019.09.20
     */
    public class SemaphoreAdvancedDemo {
    
        public static void main(String[] args) {
            //新建一个拥有4个许可的信号量
            Semaphore semaphoreAdvance = new Semaphore(4);
            for (int i = 0; i < THREAD_SIZE; i++) {
                Thread demoThread = new Thread(() -> {
                    boolean isAcquire = false;
                    try {
                        //tryAcquire 如果 没有许可会立即返回false,否则会通过CAS 去修改被volatile修饰的许可总数即state
                        while (!(isAcquire = semaphoreAdvance.tryAcquire())) {
                            Thread.sleep(100);
                        }
                        runSomething();
                    } catch (InterruptedException e) {
                        System.out.println(String.format("threadId:%s interrupt", Thread.currentThread().getId()));
                        Thread.currentThread().interrupt();
                    } finally {
                        if (isAcquire) {
                            semaphoreAdvance.release();
                            System.out.println(String.format("current threadId:%s released a permit", Thread.currentThread().getId()));
                        }
                    }
                });
                demoThread.start();
                //随机调用interrupt 方法模仿实际被中断
                if (ThreadLocalRandom.current().nextInt(THREAD_SIZE) > THREAD_SIZE / 2) {
                    demoThread.interrupt();
                }
            }
        }
    }
    

    上面有个while 循环感觉很不爽有木有?我们再来优化一下?
    下面在看一个神奇的方法,再也不害怕中断了
    acquireUninterruptibly() :从信号量实例获取许可,直到有一个许可可用否则一直阻塞。若阻塞中的线程被调用了interrupt方法,该线程会一直等待,当获取到许可返回时中断状态会被设置为true
    测试方法:

     public static void main(String[] ar) throws InterruptedException {
            
            /**
             *
             * {@link Semaphore#acquireUninterruptibly} 方法 获取许可,如有许可则返回,
             * 若么有阻塞,在阻塞的过程中线程调用中断方法也不会影响线程的等待获取许可,但是在返回时该线程的中断状态会被设置为true
             * 测试步骤:
             * 1.创建一个拥有一个许可的信号量实例
             * 2.在主线程中acquire一个许可
             * 3.创建一个线程a去获取许可
             * 4.调用a.interrupt方法
             * 5.主线程释放许可
             */
            Semaphore semaphore = new Semaphore(1);
            semaphore.acquire();
            Thread testThread = new Thread(()->{
                semaphore.acquireUninterruptibly();
                //测试线程是否是中断状态
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("pass");
                }else {
                    System.out.println("get error");
                }
            });
            //启动测试线程
            testThread.start();
            //中断测试线程
            testThread.interrupt();
            //释放许可
            semaphore.release();
        }
    结果:
    pass
    

    下面来使用下这个 acquireUninterruptibly 方法来改造一下我们的增加版demo,下面我们来看下,做出一些调整使用了策略模式修改了我们上面的类,使其易于扩展,因为需要传到Thread里执行,这里我就偷个懒,直接使用java 的Runnable 为策略基类,实现了两个子类实现run 接口,类图关系如下:

    Semaphore 使用策略图
    并且在方法调用上还特别设计了一下,在调用的时候 传的是Supplier 实现类对象,相当于函数式调用大家不仅学习了多线程的知识,还学习如何使用java 8 的新姿势,此时不点个赞,有点对不住我这个博主φ(>ω<*) 。
    代码如下:
    public class SemaphoreAdvancedDemo {
    
        public static void main(String[] args)  {
            //新建一个拥有4个许可的信号量
            Semaphore semaphoreAdvance = new Semaphore(4);
    //        testSemaphore(() -> new Advance(semaphoreAdvance));
    
            testSemaphore(()->new Final(semaphoreAdvance));
        }
    
        private static void testSemaphore(Supplier<Runnable> supplier) {
    
            for (int i = 0; i < THREAD_SIZE; i++) {
                Thread demoThread = new Thread(supplier.get());
    
                demoThread.start();
                //随机调用interrupt 方法模仿实际被中断
                if (ThreadLocalRandom.current().nextInt(THREAD_SIZE) > THREAD_SIZE / 2) {
                    demoThread.interrupt();
                }
            }
        }
        static class Advance implements Runnable {
    
             Advance(Semaphore semaphoreAdvance) {
                this.semaphoreAdvance = semaphoreAdvance;
            }
    
            private Semaphore semaphoreAdvance;
    
            @Override
            public void run() {
                boolean isAcquire = false;
                try {
                    //tryAcquire 如果 没有许可会立即返回false,否则会通过CAS 去修改被volatile修饰的许可总数即state
                    while (!(isAcquire = semaphoreAdvance.tryAcquire())) {
                        Thread.sleep(100);
                    }
                    runSomething();
                } catch (InterruptedException e) {
                    System.out.println(String.format("threadId:%s interrupt", Thread.currentThread().getId()));
                    Thread.currentThread().interrupt();
                } finally {
                    if (isAcquire) {
                        semaphoreAdvance.release();
                        System.out.println(String.format("current threadId:%s released a permit", Thread.currentThread().getId()));
                    }
                }
            }
        }
    
        static class Final implements Runnable {
    
             Final(Semaphore semaphoreFinal) {
                this.semaphoreFinal = semaphoreFinal;
            }
            private Semaphore semaphoreFinal;
    
            @Override
            public void run() {
                try {
                    semaphoreFinal.acquireUninterruptibly();
                    if (Thread.currentThread().isInterrupted()) {
                        System.out.println(String.format("[final] %s have interrupt", Thread.currentThread().getName()));
                        throw new InterruptedException();
                    }
                    runSomething();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphoreFinal.release();
                    System.out.println(String.format("%s released a permit", Thread.currentThread().getName()));
                }
            }
        }
    }
    

    使用场景

    通例:某类资源同时限定n 个线程访问

    实际使用场景

    • 信号量可以用来限制一次数据库连接的数量
    • 可以实现java 版的限流工具
    • 信号量持有一个许可证的时侯可以当做同步资源来使用,不过使用需要小心因为信号量即使一个线程没有获取许可证,也可以释放许可证,这就是和排它锁的区别,但是如果你使用得当,它还可以解决线上死锁的问题(大家可以思考下怎么设计在评论区讨论)
      最后实现一个信号量版的令牌桶算法
      令牌桶示意图
      上源码
    /**
     * 限流:令牌桶算法
     */
    public class RateLimiterOnSemaphore {
        private static final int DEFAULT_REQUEST_PER_SECOND = 200;
        private static final int SECOND_MILLIONS = 1000;
        private ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1);
        private final Semaphore tokenContainer;
    
        private final int requestPerSecond;
    
        public RateLimiterOnSemaphore() {
            this(DEFAULT_REQUEST_PER_SECOND);
        }
    
        public RateLimiterOnSemaphore(int requestPerSecond) {
            this.requestPerSecond = requestPerSecond;
            tokenContainer = new Semaphore(requestPerSecond);
            //定时任务往container 匀速的存放token
            //计算 定时任务执行的间隔时间
            long period = SECOND_MILLIONS / requestPerSecond;
    
            schedule.scheduleAtFixedRate(this::putToken, 0, period, TimeUnit.MILLISECONDS);
        }
    
        public static RateLimiterOnSemaphore create(int tokensPerSecond) {
            return new RateLimiterOnSemaphore(tokensPerSecond);
        }
    
        /**
         * 获取token
         */
        public void acquire() {
            tokenContainer.acquireUninterruptibly();
        }
    
        /**
         * 尝试获取token
         *
         * @return true 如果获取成功 否则返回false
         */
        public boolean tryAcquire() {
            return tokenContainer.tryAcquire();
        }
    
        /**
         * 往容器里存放token
         */
        private void putToken() {
            //判断是否达到每秒上限
            if (tokenContainer.availablePermits() < requestPerSecond) {
                tokenContainer.release();
            }
        }
    }
    

    小结
    这篇主要讲信号量的用法,下一篇讲jdk 是如何实现的。请大家点赞关注下吧,动动你的小拇指是对我最大的帮助。

    相关文章

      网友评论

        本文标题:多线程之Semaphore (限流Java 版)

        本文链接:https://www.haomeiwen.com/subject/rnxjuctx.html