美文网首页
并发编程工具(四)Semaphore 信号量

并发编程工具(四)Semaphore 信号量

作者: 圆企鹅i | 来源:发表于2021-05-17 16:01 被阅读0次

    允许固定数量线程执行的锁

    小漏斗
    简而言之 ,就是能够控制固定数量线程同时执行的工具

    image.png
    demo and 方法说明
    package com.threads.thread.jdk.semaphore;
    
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Random;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.IntStream;
    
    /**
     * @author zhangxuecheng4441
     * @date 2021/5/17/017 19:45
     */
    @Slf4j
    public class SemaphoreExample {
        public static void main(String[] args) {
    
            //execute task count in the same time
            final int parallelCount = 3;
            //task task controller
            final SemaphoreController semaphoreController = new SemaphoreController(parallelCount, true);
            //task count
            IntStream allTask = IntStream.range(0, 10);
            allTask.forEach(i ->
                    new Thread(() -> {
                        try {
                            //todo debug 运行任务 其他任务拿不到凭证则
                            boolean isGetStamp = semaphoreController.executeTask(1, 10);
                            //获取凭证 则可以执行任务
                            if (isGetStamp) {
                                //执行任务
                                executeSomeTask(i);
                            } else {
                                log.warn("can not execute task {}", i);
                                //执行服务降级方案
                                return;
                            }
                            //todo debug 运行任务 如果拿不到凭证 则堵塞任务 等待可以获得凭证 则执行
                            //semaphoreController.executeTaskUtilSuccessOrInterrupted(1);
                            //executeSomeTask(i);
                        } catch (Exception e) {
                            log.error("execute error", e);
                            //其他任务发生异常时 return 不会执行下面的提交任务凭证
                            return;
                        }
                        //todo 因为semaphore是任务共享的 其他线程发送异常后
                        //todo 也会执行endTask(release) 会出现非执行任务线程 提交任务结束的bug
                        //拆分开 try-catch-finally 其他任务发生异常时 return 不会执行下面的提交任务凭证
                        try {
                            log.info("finish task");
                        } finally {
                            //任务声明结束 务必放在finally中 否则在排查问题上会有很多问题
                            semaphoreController.endTask(1);
                            log.info("NO.{} task end ", i);
                        }
                    }, "thread-" + i).start()
            );
    
    
        }
    
        private static void executeSomeTask(int taskNumber) {
            try {
                int taskSpendTime = new Random().nextInt(20);
                log.info("execute NO.[{}] task need spend {} s", taskNumber, taskSpendTime);
                TimeUnit.SECONDS.sleep(taskSpendTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    
        @Data
        @Builder
        @AllArgsConstructor
        private static class SemaphoreController {
            final int initCount = 2;
            private final Semaphore semaphore;
    
    
            /**
             * 定义Semaphore指定许可证数量,并且指定非公平的同步器,
             * 因此new Semaphore(n)实际上是等价于newSemaphore(n,false)的。
             *
             * @param customStampCount customStampCount
             * @param failQueue        定义Semaphore指定许可证数量的同时给定非公平或是公平同步器。
             */
            public SemaphoreController(int customStampCount, boolean failQueue) {
                this.semaphore = new Semaphore(customStampCount, failQueue);
            }
    
            /**
             * 在使用无参的tryAcquire时只会向Semaphore尝试获取一个许可证,
             * 但是该方法会向Semaphore尝试获取指定数目的许可证。
             * <p>
             * 该方法与tryAcquire无参方法类似,同样也是尝试获取一个许可证,但是增加了超时参数。
             * 如果在超时时间内还是没有可用的许可证,那么线程就会进入[阻塞状态],
             * 直到到达超时时间或者在超时时间内有可用的证书(被其他线程释放的证书),
             * 或者阻塞中的线程被其他线程执行了中断。
             *
             * @return isTaskStart
             */
            public boolean executeTask(int stampCount, int timeoutSeconds) {
                boolean isTaskStart = false;
                try {
                    isTaskStart = semaphore.tryAcquire(stampCount, timeoutSeconds, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return isTaskStart;
            }
    
            /**
             * 提交任务 返回标记
             *
             * @param stampCount stampCount
             */
            public void endTask(int stampCount) {
                try {
                    semaphore.release(stampCount);
                } catch (Exception e) {
                    log.error("error release");
                }
            }
    
            /**
             * acquire方法也是向Semaphore获取许可证,但是该方法比较偏执一些,
             * 获取不到就会一直等(陷入阻塞状态),Semaphore为我们提供了acquire方法的两种重载形式。
             * <p>
             * 直到Semaphore有可用的许可证为止,或者被其他线程中断。
             * 当然,如果有可用的许可证则会立即返回
             *
             * @param stampCount taskCount
             */
            public void executeTaskUtilSuccessOrInterrupted(int stampCount) {
                try {
                    semaphore.acquire(stampCount);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            /**
             * 该方法会向Semaphore获取一个许可证,如果获取不到就会一直等待,
             * 与此同时对该线程的任何中断操作都会被无视,
             * 直到Semaphore有可用的许可证为止。当然,如果有可用的许可证则会立即返回。
             *
             * @param stampCount stampCount
             */
            public void forceExecuteTaskUtilSuccess(int stampCount) {
                semaphore.acquireUninterruptibly(stampCount);
            }
    
            /**
             * 对Semaphore许可证的争抢采用公平还是非公平的方式,
             * 对应到内部的实现类为FairSync(公平)和NonfairSync(非公平)。
             *
             * @return isFair
             */
            public boolean isFair() {
                return semaphore.isFair();
            }
    
            /**
             * 当前的Semaphore还有多少个可用的许可证。
             *
             * @return 当前的Semaphore还有多少个可用的许可证。
             */
            public int availablePermits() {
                return semaphore.availablePermits();
            }
    
            /**
             * 排干Semaphore的所有许可证,以后的线程将无法获取到许可证,
             * 已经获取到许可证的线程将不受影响。
             *
             * @return 排干Semaphore的所有许可证
             */
            public int drainPermits() {
                return semaphore.drainPermits();
            }
    
            /**
             * 当前是否有线程由于要获取Semaphore许可证而进入阻塞?(该值为预估值。)
             *
             * @return 当前是否有线程由于要获取Semaphore许可证而进入阻塞?(该值为预估值。)
             */
            boolean hasQueuedThreads() {
                return semaphore.hasQueuedThreads();
            }
    
            /**
             * 如果有线程由于获取Semaphore许可证而进入阻塞,那么它们的个数是多少呢?(该值为预估值。)
             *
             * @return 如果有线程由于获取Semaphore许可证而进入阻塞,那么它们的个数是多少呢?(该值为预估值。)
             */
            int getQueueLength() {
                return semaphore.getQueueLength();
            }
        }
    }
    
    

    重写之后 安全的小漏斗
    package com.threads.thread.jdk.semaphore;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    import static java.lang.Thread.currentThread;
    
    /**
     * @author zhangxuecheng4441
     * @date 2021/5/17/017 12:00
     */
    @Slf4j
    public class MySemaphore extends Semaphore {
    
        private static final long serialVersionUID = 3184061773358565644L;
        /**
         * 定义线程安全的、存放Thread类型的队列 todo 此处可以是其他线程安全的集合
         */
        private final ConcurrentLinkedQueue<Thread> queue = new ConcurrentLinkedQueue<>();
    
        int parallelCount;
    
        public MySemaphore(int permits) {
            super(permits);
        }
    
        public MySemaphore(int permits, boolean fair) {
            super(permits, fair);
        }
    
        @Override
        public void acquire() throws InterruptedException {
            super.acquire();
            // 线程成功获取许可证,将其放入队列中
            this.queue.add(currentThread());
        }
    
        @Override
        public void acquireUninterruptibly() {
            super.acquireUninterruptibly();
            // 线程成功获取许可证,将其放入队列中
            this.queue.add(currentThread());
        }
    
        @Override
        public boolean tryAcquire() {
            final boolean acquired = super.tryAcquire();
            if (acquired) {
                // 线程成功获取许可证,将其放入队列中
                this.queue.add(currentThread());
            }
            return acquired;
        }
    
        @Override
        public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException {
            final boolean acquired = super.tryAcquire(timeout, unit);
            if (acquired) {
                // 线程成功获取许可证,将其放入队列中
                this.queue.add(currentThread());
            }
            return acquired;
        }
    
        @Override
        public void release() {
            final Thread currentThread = currentThread();
            // 当队列中不存在该线程时,调用release方法将会被忽略
            if (!this.queue.contains(currentThread)) {
                return;
            }
            super.release();
            // 成功释放,并且将当前线程从队列中剔除
            this.queue.remove(currentThread);
        }
    
        @Override
        public void acquire(int permits) throws InterruptedException {
            super.acquire(permits);
            // 线程成功获取许可证,将其放入队列中
            this.queue.add(currentThread());
        }
    
        @Override
        public void acquireUninterruptibly(int permits) {
            super.acquireUninterruptibly(permits);
            // 线程成功获取许可证,将其放入队列中
            this.queue.add(currentThread());
        }
    
        @Override
        public boolean tryAcquire(int permits) {
            boolean acquired = super.tryAcquire(permits);
            if (acquired) {
                // 线程成功获取许可证,将其放入队列中
                this.queue.add(currentThread());
            }
            return acquired;
        }
    
        @Override
        public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException {
            boolean acquired = super.tryAcquire(permits, timeout, unit);
            if (acquired) {
                // 线程成功获取许可证,将其放入队列中
                this.queue.add(currentThread());
            }
            return acquired;
        }
    
        @Override
        public void release(int permits) {
            final Thread currentThread = currentThread();
            // 当队列中不存在该线程时,调用release方法将会被忽略
            if (!this.queue.contains(currentThread)) {
                return;
            }
            super.release(permits);
            // 成功释放,并且将当前线程从队列中剔除
            this.queue.remove(currentThread);
        }
    
    
    }
    
    
    
    

    处理任务的方法
    package com.threads.thread.jdk.semaphore;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Function;
    
    /**
     * @author zhangxuecheng4441
     * @date 2021/5/17/017 20:10
     */
    @Slf4j
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class MySemaphoreHandler<T, R> {
        private static final long serialVersionUID = 3184061773358565644L;
        Semaphore semaphore;
        /**
         * 需要执行的函数
         */
        Function<T, R> executeTaskFunction;
        /**
         * 线程堵塞降级方法
         */
        Function<T, R> dropMethodFunction;
        /**
         * 一次任务凭证数量
         */
        int permits;
        /**
         * 超时时间
         */
        long timeout;
        /**
         * 超时时间单位
         */
        TimeUnit unit;
    
        /**
         * 执行任务 如果任务失败失败
         * 则执行降级方法
         */
        public R handle(T parameter) {
            R result = null;
            try {
                //运行任务 其他任务拿不到凭证则
                boolean isGetStamp = semaphore.tryAcquire(permits, timeout, unit);
                //获取凭证 则可以执行任务
                if (isGetStamp) {
                    //执行任务
                    result = executeTaskFunction.apply(parameter);
                } else {
                    result = dropMethodFunction.apply(parameter);
                    //执行服务降级方案
                    return result;
                }
            } catch (Exception e) {
                log.error("execute error", e);
                //其他任务发生异常时 return 不会执行下面的提交任务凭证
                return null;
            }
            //todo 因为semaphore是任务共享的 其他线程发送异常后
            //todo 也会执行endTask(release) 会出现非执行任务线程 提交任务结束的bug
            //todo 使用重写的 { MySemaphore } 可以直接避免问题
            //拆分开 try-catch-finally 其他任务发生异常时 return 不会执行下面的提交任务凭证
            try {
                log.info("No.{} finish task", parameter);
            } finally {
                //任务声明结束 务必放在finally中 否则在排查问题上会有很多问题
                semaphore.release(permits);
            }
            return result;
        }
    
        /**
         * 强行执行所有任务
         * 任务无法获得凭证 堵塞线程至能获得为止
         */
        public R forceHandle(T parameter) {
            R result = null;
            try {
                semaphore.acquire();
                result = executeTaskFunction.apply(parameter);
            } catch (Exception e) {
                log.error("execute error", e);
                //其他任务发生异常时 return 不会执行下面的提交任务凭证
                return null;
            }
            //todo 因为semaphore是任务共享的 其他线程发送异常后
            //todo 也会执行endTask(release) 会出现非执行任务线程 提交任务结束的bug
            //todo 使用重写的 { MySemaphore } 可以直接避免问题
            //拆分开 try-catch-finally 其他任务发生异常时 return 不会执行下面的提交任务凭证
            try {
                log.info("No.{} finish task", parameter);
            } finally {
                //任务声明结束 务必放在finally中 否则在排查问题上会有很多问题
                semaphore.release(permits);
            }
            return result;
        }
    
    }
    
    

    执行自定义的小漏斗的demo
    package com.threads.thread.jdk.semaphore;
    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.stream.IntStream;
    
    /**
     * @author zhangxuecheng4441
     * @date 2021/5/17/017 21:45
     */
    @Slf4j
    public class MySemaphoreExample {
        /**
         * execute task count in the same time
         */
        final static int PARALLEL_COUNT = 3;
        /**
         * 一次任务获取凭证数量
         */
        final static int ONCE_PERMITS = 1;
        /**
         * 一共十个任务
         */
        final static IntStream ALL_TASK = IntStream.range(0, 10);
        /**
         * 任务获取超时时间10s
         */
        final static int OUT_TIME = 10;
    
        public static void main(String[] args) {
            MySemaphoreHandler<Integer, Integer> semaphoreHandler = new MySemaphoreHandler<>(new MySemaphore(PARALLEL_COUNT),
                    MySemaphoreExample::executeSomeTaskFunction, MySemaphoreExample::dropMethodFunction,
                    ONCE_PERMITS, OUT_TIME, TimeUnit.SECONDS);
            //执行任务
            ALL_TASK.forEach(i ->
                    new Thread(//() -> semaphoreHandler.handle(i)
                            () -> semaphoreHandler.forceHandle(i)
                            , "thread-" + i).start()
    
            );
        }
    
        /**
         * 主要任务执行逻辑
         *
         * @param taskNumber
         * @return
         */
        public static Integer executeSomeTaskFunction(Integer taskNumber) {
            try {
                int taskSpendTime = new Random().nextInt(20);
                log.info("execute NO.[{}] task need spend {} s", taskNumber, taskSpendTime);
                TimeUnit.SECONDS.sleep(taskSpendTime);
                return taskSpendTime;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        /**
         * 降级方法
         *
         * @param taskNumber
         * @return
         */
        private static Integer dropMethodFunction(Integer taskNumber) {
            try {
                int taskSpendTime = new Random().nextInt(20);
                log.info("execute dropMethodFunction NO.[{}] task need spend {} s", taskNumber, taskSpendTime);
                TimeUnit.SECONDS.sleep(taskSpendTime);
                return taskSpendTime;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    
    }
    
    

    感谢:《java并发变成详解:深入理解并发核心库》汪文军

    相关文章

      网友评论

          本文标题:并发编程工具(四)Semaphore 信号量

          本文链接:https://www.haomeiwen.com/subject/ylpjjltx.html