美文网首页
线程池的拒绝策略示例

线程池的拒绝策略示例

作者: overflowedstack | 来源:发表于2021-06-10 23:18 被阅读0次

    Java的线程池中,如果不断往线程池提交任务,最终会发生什么?
    如果work queue是一个有界队列,队列放满,线程数量达到maxsize,且没有空闲线程时,再往线程池提交任务会触发线程池的拒绝策略。

    线程池有哪些拒绝策略呢?

    1. AbortPolicy 丢弃任务并抛出异常 —— 默认策略

    定义
        /**
         * The default rejected execution handler. It is an {@code AbortPolicy}
         * instance, i.e. the policy whose {@code rejectedExecution} throws a
         * {@code RejectedExecutionException}.
         */
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    
        /**
         * A handler for rejected tasks that throws a
         * {@code RejectedExecutionException}. The task itself is never run
         * or queued by this handler.
         */
        public static class AbortPolicy implements RejectedExecutionHandler {
            /**
             * Creates an {@code AbortPolicy}.
             */
            public AbortPolicy() { }
    
            /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                // The message embeds the string forms of both the task and the
                // executor, so the failure report identifies which task was
                // refused and by which pool.
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    
    示例
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates the default rejection policy ({@code AbortPolicy}).
     *
     * <p>The pool has 1 core thread, at most 2 threads and a bounded queue of 5,
     * so it can hold at most 7 tasks at once. Eight tasks are submitted; when the
     * pool is saturated the extra submission fails with a
     * {@code RejectedExecutionException} thrown from {@code submit}. Because the
     * tasks return immediately, whether the pool is still saturated at the eighth
     * submission depends on timing.
     */
    public class ThreadPoolRejectPolicyTest {

        /**
         * Submits eight trivial tasks and always shuts the pool down afterwards.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));

            try {
                for (int i = 0; i < 8; i++) {
                    Runnable task = new Runnable() {
                        @Override
                        public void run() {
                            System.out.println("Thread is submitted.");
                        }
                    };

                    threadPool.submit(task);
                }
            } finally {
                // Worker threads are non-daemon and the core thread never times out,
                // so without shutdown() the JVM would stay alive forever -- including
                // when submit() throws. shutdown() still lets queued tasks finish.
                threadPool.shutdown();
            }
        }
    }
    
    Thread is submitted.
    Thread is submitted.
    Thread is submitted.
    Thread is submitted.
    Thread is submitted.
    Thread is submitted.
    Thread is submitted.
    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@6bc7c054 rejected from java.util.concurrent.ThreadPoolExecutor@232204a1[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 7]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
        at demo.multithread.ThreadPoolRejectPolicyTest.main(ThreadPoolRejectPolicyTest.java:21)
    
    

    2. DiscardPolicy 直接丢弃任务

    定义
        /**
         * A handler for rejected tasks that silently discards the
         * rejected task. No exception is thrown, so the submitter is not
         * notified that the task was dropped.
         */
        public static class DiscardPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardPolicy}.
             */
            public DiscardPolicy() { }
    
            /**
             * Does nothing, which has the effect of discarding task r.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                // Intentionally empty: returning without running or queueing r
                // is what drops the task.
            }
        }
    
    示例

    一共提交8个任务,其中有一个默默被丢弃。

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates {@code DiscardPolicy}.
     *
     * <p>The pool has 1 core thread, at most 2 threads and a bounded queue of 5.
     * Eight long-running tasks are submitted, so the pool can accept only 7 of
     * them; the remaining one is dropped by the policy without any exception.
     */
    public class ThreadPoolRejectPolicyTest {

        /**
         * Submits eight sleeping tasks, prints the pool size and queue length, and
         * then initiates an orderly shutdown.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 2, 60,
                    TimeUnit.SECONDS, new ArrayBlockingQueue<>(5), new ThreadPoolExecutor.DiscardPolicy());

            for (int i = 0; i < 8; i++) {
                Runnable task = new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("Thread is submitted.");
                        try {
                            // Keep the worker busy so that the pool stays saturated
                            // while the remaining tasks are being submitted.
                            Thread.sleep(60000L);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                            // Thread.sleep cleared the interrupt status; restore it so
                            // the pool worker can still observe the interruption.
                            Thread.currentThread().interrupt();
                        }
                    }
                };

                threadPool.submit(task);
            }

            // threadPool is already declared as ThreadPoolExecutor, no cast needed.
            int threadCount = threadPool.getPoolSize();
            System.out.println("threadCount " + threadCount);

            // Number of tasks waiting in the work queue (label kept as printed originally).
            int queuedTaskCount = threadPool.getQueue().size();
            System.out.println("threadCntInWorkQueue " + queuedTaskCount);

            threadPool.shutdown();
        }
    }
    
    Thread is submitted.
    threadCount 2
    threadCntInWorkQueue 5
    Thread is submitted.
    

    3. DiscardOldestPolicy 丢弃最旧的任务,再试着处理

    定义
        /**
         * A handler for rejected tasks that discards the oldest unhandled
         * request and then retries {@code execute}, unless the executor
         * is shut down, in which case the task is discarded.
         */
        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardOldestPolicy}. The constructor takes no
             * arguments; the executor is supplied on each call to
             * {@code rejectedExecution}.
             */
            public DiscardOldestPolicy() { }
    
            /**
             * Obtains and ignores the next task that the executor
             * would otherwise execute, if one is immediately available,
             * and then retries execution of task r, unless the executor
             * is shut down, in which case task r is instead discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    // poll() removes the head of the work queue without blocking;
                    // its return value is ignored, so that task is dropped.
                    e.getQueue().poll();
                    // Hand r back to the executor now that a queue slot may be free.
                    e.execute(r);
                }
            }
        }
    
    示例

    线程池1个核心线程,max线程数为2,work queue大小为5.
    可以看到,提交8个任务后,第2个任务被丢弃了。因为第2个任务是oldest,第一个被放进queue的任务。

    package demo.multithread;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demo of {@code DiscardOldestPolicy}: eight {@code MyTask} instances are
     * submitted to a pool with 1 core thread, at most 2 threads and a bounded
     * queue of 5, after which the pool size and queue length are printed.
     */
    public class ThreadPoolRejectPolicyTest {

        /**
         * Runs the demo and then requests an orderly shutdown of the pool.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 2, 60, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(5),
                    new ThreadPoolExecutor.DiscardOldestPolicy());

            // Task ids run from 0 to 7 in submission order.
            int nextId = 0;
            while (nextId < 8) {
                pool.submit(new MyTask(nextId));
                nextId++;
            }

            System.out.println("threadCount " + pool.getPoolSize());
            System.out.println("threadCntInWorkQueue " + pool.getQueue().size());

            pool.shutdown();
        }
    }
    
    /**
     * Demo task that prints its id and then sleeps for one second, which keeps
     * the executing thread busy long enough for the pool to fill up.
     */
    class MyTask implements Runnable {
        private int taskId;

        /**
         * @param i the id reported by this task when it runs
         */
        MyTask(int i) {
            taskId = i;
        }

        /**
         * Prints the task id, then sleeps for one second. If the sleep is
         * interrupted, the stack trace is printed and the thread's interrupt
         * status is restored before returning.
         */
        @Override
        public void run() {
            System.out.println("Running thread - id " + taskId);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Thread.sleep clears the interrupt status when it throws; set it
                // again so the owning thread can still see the interruption.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * @return the id of this task
         */
        public int getTaskId() {
            return taskId;
        }

        /**
         * @param taskId the new id of this task
         */
        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    }
    
    Running thread - id 0
    threadCount 2
    threadCntInWorkQueue 5
    Running thread - id 6
    Running thread - id 2
    Running thread - id 3
    Running thread - id 4
    Running thread - id 5
    Running thread - id 7
    

    4. CallerRunsPolicy 由调用者线程执行任务

    用这种拒绝策略时要注意,被拒绝的任务会由提交任务的调用者线程(本例中是主线程)直接执行。调用者线程既要负责提交任务,又要执行任务,在任务执行完之前无法继续提交后面的任务,会造成性能问题。

    定义
        /**
         * A handler for rejected tasks that runs the rejected task
         * directly in the calling thread of the {@code execute} method,
         * unless the executor has been shut down, in which case the task
         * is discarded.
         */
        public static class CallerRunsPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code CallerRunsPolicy}.
             */
            public CallerRunsPolicy() { }
    
            /**
             * Executes task r in the caller's thread, unless the executor
             * has been shut down, in which case the task is discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    // Plain synchronous call: r runs on the current thread, which
                    // therefore cannot do anything else until r.run() returns.
                    r.run();
                }
            }
        }
    
    示例

    在输出中,能看出,主线程号为1,而提交的任务中,其中一个任务(最后一个被提交的任务)就是由主线程来执行的。

    package demo.multithread;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demo of {@code CallerRunsPolicy}: prints the id of the main thread, submits
     * eight {@code MyTask} instances to a pool with 1 core thread, at most 2
     * threads and a bounded queue of 5, then prints the pool size and queue length.
     */
    public class ThreadPoolRejectPolicyTest {

        /**
         * Runs the demo and then requests an orderly shutdown of the pool.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 2, 60, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(5),
                    new ThreadPoolExecutor.CallerRunsPolicy());

            // Printed so the output shows which task ran on the submitting thread.
            long mainThreadId = Thread.currentThread().getId();
            System.out.println("Main thread is " + mainThreadId);

            int nextId = 0;
            while (nextId < 8) {
                pool.submit(new MyTask(nextId));
                nextId++;
            }

            System.out.println("threadCount " + pool.getPoolSize());
            System.out.println("threadCntInWorkQueue " + pool.getQueue().size());

            pool.shutdown();
        }
    }
    
    /**
     * Demo task that prints its id and the id of the thread executing it, then
     * sleeps for one second to keep that thread busy.
     */
    class MyTask implements Runnable {
        private int taskId;

        /**
         * @param i the id reported by this task when it runs
         */
        MyTask(int i) {
            taskId = i;
        }

        /**
         * Prints the task id and the executing thread's id, then sleeps for one
         * second. If the sleep is interrupted, the stack trace is printed and the
         * thread's interrupt status is restored before returning.
         */
        @Override
        public void run() {
            System.out.println("Running thread - id " + taskId);
            System.out.println("The thread id is " + Thread.currentThread().getId());
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Thread.sleep clears the interrupt status when it throws; set it
                // again so the owning thread (a pool worker, or the caller under
                // CallerRunsPolicy) can still see the interruption.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * @return the id of this task
         */
        public int getTaskId() {
            return taskId;
        }

        /**
         * @param taskId the new id of this task
         */
        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    }
    
    Main thread is 1
    Running thread - id 0
    The thread id is 9
    Running thread - id 7
    The thread id is 1
    Running thread - id 6
    The thread id is 10
    Running thread - id 1
    The thread id is 9
    threadCount 2
    threadCntInWorkQueue 3
    Running thread - id 2
    The thread id is 10
    Running thread - id 3
    The thread id is 9
    Running thread - id 4
    The thread id is 10
    Running thread - id 5
    The thread id is 9
    

    5. 自定义拒绝策略

    了解了前四种拒绝策略,发现:
    abort,discard,discardOldest都会丢弃任务;
    callerRun虽然执行了任务,但是会影响主线程性能。

    若将work queue设置为无界队列,或者将maxsize设置为最大整数,都有可能造成out of memory。

    那么可以通过自定义拒绝策略,在线程池饱和时让提交任务的线程阻塞等待,直到work queue有空位再把任务放入队列。这样可以让每一个任务都得到执行。

    package demo.multithread;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demo of a user-defined rejection policy: eight {@code MyTask} instances are
     * submitted to a pool with 1 core thread, at most 2 threads and a bounded
     * queue of 5, using {@code CustomRejectPolicy} as the rejection handler.
     */
    public class ThreadPoolRejectPolicyTest {

        /**
         * Runs the demo and then requests an orderly shutdown of the pool.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1, 2, 60, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<>(5),
                    new CustomRejectPolicy());

            // Task ids run from 0 to 7 in submission order.
            int nextId = 0;
            while (nextId < 8) {
                pool.submit(new MyTask(nextId));
                nextId++;
            }

            pool.shutdown();
        }
    }
    
    /**
     * Rejection handler that applies back-pressure instead of dropping work: the
     * thread that submitted the task blocks until the executor's work queue has
     * room, and the task is then placed in the queue.
     *
     * <p>The task is inserted directly into the queue rather than through
     * {@code execute}, so this handler does not start a worker thread for it.
     */
    class CustomRejectPolicy implements RejectedExecutionHandler {

        /**
         * Blocks the calling thread until {@code r} can be added to the work queue.
         *
         * @param r the runnable task requested to be executed
         * @param executor the executor attempting to execute this task
         * @throws RejectedExecutionException if the executor is shut down, or if the
         *         calling thread is interrupted while waiting for queue space
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // A shut-down executor may have no workers left to drain the queue, so
            // enqueueing would strand the task; report the rejection instead.
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                        "Task " + r + " rejected from " + executor + ": executor is shut down");
            }

            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                // put() cleared the interrupt status; restore it and tell the
                // submitter that the task was not accepted rather than losing it.
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(
                        "Interrupted while waiting to enqueue task " + r, e);
            }

            // The executor may have been shut down while this thread was blocked in
            // put(). If the task is still in the queue, take it back out and reject it;
            // if remove() fails, a worker has already picked it up and it will run.
            if (executor.isShutdown() && executor.remove(r)) {
                throw new RejectedExecutionException(
                        "Task " + r + " rejected from " + executor + ": executor is shut down");
            }
        }

    }
    
    /**
     * Demo task that prints its id and then sleeps for one second, which keeps
     * the executing thread busy long enough for the pool to fill up.
     */
    class MyTask implements Runnable {
        private int taskId;

        /**
         * @param i the id reported by this task when it runs
         */
        MyTask(int i) {
            taskId = i;
        }

        /**
         * Prints the task id, then sleeps for one second. If the sleep is
         * interrupted, the stack trace is printed and the thread's interrupt
         * status is restored before returning.
         */
        @Override
        public void run() {
            System.out.println("Running thread - id " + taskId);
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Thread.sleep clears the interrupt status when it throws; set it
                // again so the owning thread can still see the interruption.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * @return the id of this task
         */
        public int getTaskId() {
            return taskId;
        }

        /**
         * @param taskId the new id of this task
         */
        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    }
    
    Running thread - id 0
    Running thread - id 6
    Running thread - id 1
    Running thread - id 2
    Running thread - id 3
    Running thread - id 4
    Running thread - id 5
    Running thread - id 7
    

    相关文章

      网友评论

          本文标题:线程池的拒绝策略示例

          本文链接:https://www.haomeiwen.com/subject/jwjpeltx.html