美文网首页
Java线程总结 之 ThreadPool 线程池

Java线程总结 之 ThreadPool 线程池

作者: TTLLong | 来源:发表于2020-07-27 19:02 被阅读0次

    线程脑图

    多线程.png

    ThreadPool 线程池

    线程池的构造参数
    1. corePoolSize:核心线程数
    2. maximumPoolSize:最大线程数
    3. keepAliveTime:当线程数,大于核心线程数时,多出来的线程所存活的时间,超过该时间将自动销毁
    4. unit:keepAliveTime的单位(TimeUnit.SECONDS等)
    5. workQueue:阻塞队列,用于存放超过核心线程数的Runnable。
    6. threadFactory:创建线程的工厂类,实现了ThreadFactory接口。线程池的创建线程的逻辑,就是用了该接口的newThread方法
    7. handler:拒绝策略,当超过队列的容量和最大线程数时。应当采取的策略。AbortPolicy,DiscardPolicy,DiscardOldPolicy,CallerRunsPolicy
    

    ---------------------------------------------------------------------线程池构造函数源码---------------------------------------------------------------------

        /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} or {@code handler} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
    ThreadPoolExecutor工作原理
    1. 如果工作线程数,小于设定的核心线程数则创建核心线程来执行任务
    2. 如果核心数达到上限,将Runnable加入创建ThreadPoolExecutor时的阻塞队列(workQueue)
    3. 阻塞队列达到容量上限,创建非核心线程,执行任务
    4. 工作线程数量达到最大值(maximumPoolSize),执行拒绝策略(handler)
    

    [图片上传失败...(image-aa80be-1595847718289)]

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
             
             //ctl 为 AtomicInteger,用来记录当前线程状态和线程数量
            int c = ctl.get();
            // 如果工作线程数,小于设定的核心线程数则创建核心线程来执行该任务
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))//添加任务到核心线程 创建线程在addWorker中通过Work类执行了ThreadFactory的newThread()方法
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {//能够添加任务到队列中。若队列中的元素达到队列上限,offer方法将返回false
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))//如果当前线程池的状态为不可运行状态,就将刚才的任务从队列中移除,并对该任务执行拒绝策略
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))//添加任务到非核心线程
                reject(command);//如果失败,执行设置好的拒绝策略
        }
    
    Executor框架:
    ThreadPoolExecutor:
    1. newSingleThreadExecutor:创建一个单线程的线程池,只有一个线程,存放任务的队列容量为Integer.Max
    2. newFixedThreadPool:创建指定线程数N的线程池,核心线程数和最大线程数都为N,即线程数限制为N,存放任务的队列容量为Integer.Max.
    3. newCachedThreadPool:创建一个线程池,该线程池根据需要创建新线程,但是将在可用时重新使用先前构造的线程。最大线程数为Integer.Max。存放任务的队列为,SynchronousQueue,它是一个不存储任务的阻塞队列,如果队列中有任务,则阻塞住请求的线程,直到创建新的线程,或者空闲的线程将这个任务取走执行。该线程池设置了keepAliveTime(60秒),空闲线程超出该规定时间就会自行销毁。
    
    ScheduledThreadPoolExecutor:
    可以理解未,能够执行延时任务的一个ThreadPoolExecutor。schedule方法可以设置执行任务时的延时时间。
    
    ForkJoinPool:
    ForkJoinPool采用工作窃取算法,将一个大任务根据阈值分割成很多个子任务,最后根据场景是否要合并子任务运算结果;
    
    线程池状态:
    阿里推荐的ThreadFactory是什么:
    1. ThreadFactory是一个接口,里面有newThread方法。线程池里线程的创建,最终对是调用该方法创建的。
    2. 线程池默认的DefaultThreadFactory:从代码中可以看出,线程池中默认的线程工厂类,给每个线程设置了group,name,非守护线程,以及线程优先级。我们可以模仿该类,实现自己的ThreadFactory
    
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    
    shutDown和shutDownNow:
    1. shutDown:停止所有未阻塞的线程。源码interruptIdleWorkers()中的w.tryLock()。能请求到这把锁时,执行interrup()方法中断。
        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(SHUTDOWN);
                interruptIdleWorkers();
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
        }
        
        private void interruptIdleWorkers() {
            interruptIdleWorkers(false);
        }
        
        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
        
        
        //Work中的 trylock方法
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    
    1. shutdownNow:停止所有正在执行的任务(interrupt)。返回所有还没有执行的任务,并将还没有执行的任务,从队列中删除。
        public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(STOP);
                interruptWorkers();
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            return tasks;
        }
        
        /**
         * Interrupts all threads, even if active. Ignores SecurityExceptions
         * (in which case some threads may remain uninterrupted).
         */
        private void interruptWorkers() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    w.interruptIfStarted();
            } finally {
                mainLock.unlock();
            }
        }
        
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    
    拒绝策略:
    概念:当队列的任务满了,运行的线程数也达到了最大线程数时,此时如果再来任务的话,就会按照用户的拒绝策略来执行。
    1. AbortPolicy: 拒绝任务,直接抛异常
    2. DiscardPolicy: 抛弃该请求任务
    3. DiscardOldPolicy: 抛弃最久的没被处理的请求任务
    4. CallerRunsPolicy: 由当前线程,执行此任务

    相关文章

      网友评论

          本文标题:Java线程总结 之 ThreadPool 线程池

          本文链接:https://www.haomeiwen.com/subject/vjrtrktx.html