美文网首页
Java基础 -- 线程拒绝策略

Java基础 -- 线程拒绝策略

作者: tom_xin | 来源:发表于2018-12-02 13:52 被阅读0次

    什么是线程拒绝策略

        当线程池达到饱和状态时,新提交的任务需要一种处理方法,这也就出现了拒绝策略。Java中提供了四种默认的拒绝策略,CallerRunsPolicy ,AbortPolicy,DiscardPolicy,DiscardOldestPolicy。我们也可以通过实现RejectedExecutionHandler接口来定义自己的拒绝策略。


    拒绝策略发生在什么时候

        /**
         * Executes the given task sometime in the future.  The task
         * may execute in a new thread or in an existing pooled thread.
         *
         * If the task cannot be submitted for execution, either because this
         * executor has been shutdown or because its capacity has been reached,
         * the task is handled by the current {@code RejectedExecutionHandler}.
         *
         * @param command the task to execute
         * @throws RejectedExecutionException at discretion of
         *         {@code RejectedExecutionHandler}, if the task
         *         cannot be accepted for execution
         * @throws NullPointerException if {@code command} is null
         */
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                // 当检测到线程池不是在运行状态时,将会执行拒绝策略。
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 当添加线程任务失败时,将会执行拒绝策略。
            else if (!addWorker(command, false))
                reject(command);
        }
    

    从代码中可以看到reject(command)方法,是有两处地方被调用的。代码中也有标注:
    1、当检测到线程池不是在运行状态时,将会执行拒绝策略。
    2、当检测到线程池不是在运行状态时,将会执行拒绝策略。


    四种拒绝策略

    CallerRunsPolicy

        /**
         * A handler for rejected tasks that runs the rejected task
         * directly in the calling thread of the {@code execute} method,
         * unless the executor has been shut down, in which case the task
         * is discarded.
         */
        public static class CallerRunsPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code CallerRunsPolicy}.
             */
            public CallerRunsPolicy() { }
    
            /**
             * Executes task r in the caller's thread, unless the executor
             * has been shut down, in which case the task is discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                // 判断线程池当前的状态,如果不是关闭状态,则直接运行该方法。
                if (!e.isShutdown()) {
                    r.run();
                }
            }
        }
    
    

    AbortPolicy

        /**
         * A handler for rejected tasks that throws a
         * {@code RejectedExecutionException}.
         */
        public static class AbortPolicy implements RejectedExecutionHandler {
            /**
             * Creates an {@code AbortPolicy}.
             */
            public AbortPolicy() { }
    
            /**
             * Always throws RejectedExecutionException.
             * 
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                // 直接抛出异常
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    
    

    DiscardPolicy :不做任何处理

        /**
         * A handler for rejected tasks that silently discards the
         * rejected task.
         */
        public static class DiscardPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardPolicy}.
             */
            public DiscardPolicy() { }
    
            /**
             * Does nothing, which has the effect of discarding task r.
             * 
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            }
        }
    
    

    DiscardOldestPolicy:判断当前线程池的状态,如果不是shutdown,则将队列中最老的任务弹出,加入信息任务。

        /**
         * A handler for rejected tasks that discards the oldest unhandled
         * request and then retries {@code execute}, unless the executor
         * is shut down, in which case the task is discarded.
         */
        public static class DiscardOldestPolicy implements RejectedExecutionHandler {
            /**
             * Creates a {@code DiscardOldestPolicy} for the given executor.
             */
            public DiscardOldestPolicy() { }
    
            /**
             * Obtains and ignores the next task that the executor
             * would otherwise execute, if one is immediately available,
             * and then retries execution of task r, unless the executor
             * is shut down, in which case task r is instead discarded.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                if (!e.isShutdown()) {
                    e.getQueue().poll();
                    e.execute(r);
                }
            }
        }
    

    其他拒绝策略

        我们来看一下Dubbo中,拒绝策略是怎么实现的,可以参考到我们的项目中。

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    package org.apache.dubbo.common.threadpool.support;
    
    import org.apache.dubbo.common.Constants;
    import org.apache.dubbo.common.URL;
    import org.apache.dubbo.common.logger.Logger;
    import org.apache.dubbo.common.logger.LoggerFactory;
    import org.apache.dubbo.common.utils.JVMUtil;
    
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.Executors;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * Abort Policy.
     * Log warn info when abort.
     */
    public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    
        protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
    
        private final String threadName;
    
        private final URL url;
    
        private static volatile long lastPrintTime = 0;
    
        private static Semaphore guard = new Semaphore(1);
    
        public AbortPolicyWithReport(String threadName, URL url) {
            this.threadName = threadName;
            this.url = url;
        }
        
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            // 打印了线程的名字,Pool Size:线程池的大小,active:活跃数,core:核心线程数,max:最大线程数,largest:线程池中达到过的最大值。线程池当前的状态,shutdown,isTerminated,isTerminating。请求URL地址。
            String msg = String.format("Thread pool is EXHAUSTED!" +
                            " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                            " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                    threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                    e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                    url.getProtocol(), url.getIp(), url.getPort());
            logger.warn(msg);
            // 打印线程堆栈信息
            dumpJStack();
            throw new RejectedExecutionException(msg);
        }
    
        private void dumpJStack() {
            long now = System.currentTimeMillis();
    
            //dump every 10 minutes  限制两个堆栈信息打印的时间不能超过10分钟。
            if (now - lastPrintTime < 10 * 60 * 1000) {
                return;
            }
            // 通过Semaphore来控制同一时间只有一个线程在打印信息。
            if (!guard.tryAcquire()) {
                return;
            }
            // 将打印堆栈信息,放到一个单独的线程池中来执行。
            Executors.newSingleThreadExecutor().execute(new Runnable() {
                @Override
                public void run() {
                    String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home"));
    
                    SimpleDateFormat sdf;
    
                    String OS = System.getProperty("os.name").toLowerCase();
    
                    // window system don't support ":" in file name
                    if(OS.contains("win")){
                        sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
                    }else {
                        sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss");
                    }
    
                    String dateStr = sdf.format(new Date());
                    FileOutputStream jstackStream = null;
                    try {
                        // 设置输出打印文件的路径,文件名称。
                        jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr));
                        // 将堆栈信息写入文件中。
                        JVMUtil.jstack(jstackStream);
                    } catch (Throwable t) {
                        logger.error("dump jstack error", t);
                    } finally {
                        guard.release();
                        if (jstackStream != null) {
                            try {
                                jstackStream.flush();
                                jstackStream.close();
                            } catch (IOException e) {
                            }
                        }
                    }
    
                    lastPrintTime = System.currentTimeMillis();
                }
            });
    
        }
    
    }
    
    

    我们再来看一下线程的堆栈信息是怎么获取的?

    public class JVMUtil {
        public static void jstack(OutputStream stream) throws Exception {
            ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean();
           // 通过ThreadMxBean获的所有线程信息,方法中的两个true,true分别代表
    Monitor锁:通常synchronized(this)类型的锁,Synchronized锁:ReentrantLock锁和ReentrantReadWriteLock锁。
            for (ThreadInfo threadInfo : threadMxBean.dumpAllThreads(true, true)) {
                stream.write(getThreadDumpString(threadInfo).getBytes());
            }
        }
        该方法主要是将ThreadInfo中的线程信息打印出来,线程状态,线程名称,线程持有锁的状态,线程堆栈信息。
        private static String getThreadDumpString(ThreadInfo threadInfo) {
            StringBuilder sb = new StringBuilder("\"" + threadInfo.getThreadName() + "\"" +
                    " Id=" + threadInfo.getThreadId() + " " +
                    threadInfo.getThreadState());
            if (threadInfo.getLockName() != null) {
                sb.append(" on " + threadInfo.getLockName());
            }
            if (threadInfo.getLockOwnerName() != null) {
                sb.append(" owned by \"" + threadInfo.getLockOwnerName() +
                        "\" Id=" + threadInfo.getLockOwnerId());
            }
            if (threadInfo.isSuspended()) {
                sb.append(" (suspended)");
            }
            if (threadInfo.isInNative()) {
                sb.append(" (in native)");
            }
            sb.append('\n');
            int i = 0;
    
            StackTraceElement[] stackTrace = threadInfo.getStackTrace();
            MonitorInfo[] lockedMonitors = threadInfo.getLockedMonitors();
            for (; i < stackTrace.length && i < 32; i++) {
                StackTraceElement ste = stackTrace[i];
                sb.append("\tat " + ste.toString());
                sb.append('\n');
                if (i == 0 && threadInfo.getLockInfo() != null) {
                    Thread.State ts = threadInfo.getThreadState();
                    switch (ts) {
                        case BLOCKED:
                            sb.append("\t-  blocked on " + threadInfo.getLockInfo());
                            sb.append('\n');
                            break;
                        case WAITING:
                            sb.append("\t-  waiting on " + threadInfo.getLockInfo());
                            sb.append('\n');
                            break;
                        case TIMED_WAITING:
                            sb.append("\t-  waiting on " + threadInfo.getLockInfo());
                            sb.append('\n');
                            break;
                        default:
                    }
                }
    
                for (MonitorInfo mi : lockedMonitors) {
                    if (mi.getLockedStackDepth() == i) {
                        sb.append("\t-  locked " + mi);
                        sb.append('\n');
                    }
                }
            }
            if (i < stackTrace.length) {
                sb.append("\t...");
                sb.append('\n');
            }
    
            LockInfo[] locks = threadInfo.getLockedSynchronizers();
            if (locks.length > 0) {
                sb.append("\n\tNumber of locked synchronizers = " + locks.length);
                sb.append('\n');
                for (LockInfo li : locks) {
                    sb.append("\t- " + li);
                    sb.append('\n');
                }
            }
            sb.append('\n');
            return sb.toString();
        }
    }
    
    

    相关文章

      网友评论

          本文标题:Java基础 -- 线程拒绝策略

          本文链接:https://www.haomeiwen.com/subject/kyyzfqtx.html