美文网首页
Java之线程池

Java之线程池

作者: zhglance | 来源:发表于2020-02-13 11:44 被阅读0次

    1.Executors

    Executors提供的常用线程池:
    线程池的构造方法具体如下:

    
        /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} or {@code handler} is null
         */
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 threadFactory, defaultHandler);
        }
    
        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  RejectedExecutionHandler handler) {
            this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                 Executors.defaultThreadFactory(), handler);
        }
    

    参数说明:

    a.corePoolSize: 线程池中常驻线程数;
    b.maximumPoolSize: 线程池能够容纳的最大线程数,当等待队列满了,才增大存活线程,最大为maximumPoolSize;
    **c.keepAliveTime: **多余空闲线程存活的时间,如果当线程池中线程数量超过corePoolSize时,当有空闲线程的时间超过keepAliveTime时间时,就会将多余的空闲线程销毁,只保留corePoolSize线程;
    d.unit: keepAliveTime的单位;
    e.workQueue: 任务队列,被提交但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种;
    **f. threadFactory: **线程工厂,用于创建线程,一般用默认即可;
    **g. handler: ** 拒绝策略,当任务太多来不及处理时,如何拒绝任务,主要有AbortPolicy,CallerRunsPolicy,DiscardOldestPolicy,DiscardPolicy,NewThreadRunsPolicy等.

    1.1 newFixedThreadPool线程池

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    newFixedThreadPool指定常驻线程数,线程池中的线程数是固定的,但是由于使用了阻塞new LinkedBlockingQueue<Runnable>(),默认最大值长度为Integer.MAX_VALUE(即2 147 483 647),现实业务场景中几乎不会用到这么大的容量,因此如果LinkedBlockingQueue没有设置大小,可以将其视为无界队列。newFixedThreadPool使用无界队列,没有拒绝策略,如果线程等待太多会导致LinkedBlockingQueue队列长度十分长,会导致JVM的OMM。

    1.2 newSingleThreadExecutor线程池

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    newSingleThreadExecutor指定常驻线程数为1,线程池中的线程数是固定的1,,但是由于使用了new LinkedBlockingQueue<Runnable>(),默认最大值长度为Integer.MAX_VALUE(即2 147 483 647),现实业务场景中几乎不会用到这么大的容量,因此如果LinkedBlockingQueue没有设置大小,可以将其视为无界队列。newSingleThreadExecutor使用无界队列,没有拒绝策略,如果线程等待太多会导致LinkedBlockingQueue队列长度十分长,会导致JVM的OMM。

    1.3 newCachedThreadPool线程池

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    newCachedThreadPool可以动态的调整线程池线程数,核心线程数被设置为0,最大线程数设置为Integer.MAX_VALUE,使用有界阻塞队列SynchronousQueue。由于最大线程数几乎无上限,当主线程提交的速度高于线程池处理的速度的时候,newCachedThreadPool会不断的创建新的线程,极端情况下会导致CPU和内存资源耗尽。

    1.4 newScheduledThreadPool周期性线程池

        /**
         * Creates a thread pool that can schedule commands to run after a
         * given delay, or to execute periodically.
         * @param corePoolSize the number of threads to keep in the pool,
         * even if they are idle
         * @return a newly created scheduled thread pool
         * @throws IllegalArgumentException if {@code corePoolSize < 0}
         */
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    

    newScheduledThreadPool可以指定延迟时长和周期性执行,使用无界队列DelayQueue。

    1.5 newWorkStealingPool(jdk8新增,ForkJoinPool的扩展)

        /**
         * Creates a work-stealing thread pool using all
         * {@link Runtime#availableProcessors available processors}
         * as its target parallelism level.
         * @return the newly created thread pool
         * @see #newWorkStealingPool(int)
         * @since 1.8
         */
        public static ExecutorService newWorkStealingPool() {
            return new ForkJoinPool
                (Runtime.getRuntime().availableProcessors(),
                 ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                 null, true);
        }
    

    具体原理待完成

    1.6 阻塞队列:

    阻塞队列:
    a.阻塞添加 当队列满了的时候插入队列,会使线程阻塞,当队列不再满时,再唤醒线程执行插入队列;
    b.阻塞删除 当队列为空的时候执行删除,删除线程将会被阻塞,直到队列不再为空的时候再唤起删除线程。

    1.7 线程池的拒绝策略:

    1.AbortPolicy 默认策略,即丢弃任务,抛出异常;

    1. DiscardPolicy 丢弃任务,不抛出异常;
    2. DiscardOldestPolicy 丢弃队列中等待最久的任务,并执行当前任务;
    3. CallerRunsPolicy 当队列满时,新线程提交的直接在主线程中执行(不在线程池中执行),相当于submit被阻塞,后续提交的任务被阻塞,只有主线程执行提交的线程之后,后续的线程才会被提交到线程池中。

    1.8 线程池submit()和execute()的区别

    execute()方法无返回值
    submit() 可以有返回值,如使用Future接收返回值。

    1.9 线程池可以catch到线程的Exception吗

    答案:不能

    1.10 线程池中线程的回收

    1.当从等待队列中获取不到线程的时候(即返回null),就会触发线程尝试回收,当线程数大于核心线程数,且线程空闲时间超过设置的空闲时间阈值,那么就会使用CAS乐观锁触发销毁线程操作,避免并发操作导致保留的线程数小于核心线程数。
    2.当线程池执行了shutdown时,拒绝接受新提交的线程,判断线程池中的线程是否处于空闲状态,如果处于空闲状态则使用CAS回收,否则等待执行完毕再回收。
    3.当线程池执行了shutdownNow时,拒绝接受新提交的线程,同时立马关闭线程池(不使用CAS),线程池里的任务不再执行。

    线程池个数的选择:

    1.计算密集型任务

    线程数 = CPU个数 + 1

    加1的原因是,不会因为某个线程异常或者暂停而导致CPU在执行周期内轮空;

    2.对于IO型或者是阻塞类型

    线程数 = CPU个数 * 2
    或者
    线程数 = CPU个数 * CPU利用率 (0~1之间)* (1 + (等待时间 / 计算时间))

    对于这种情况比较麻烦,需要考虑的因素很多,如线程要链接数据库时,那么数据库连接池可用的连接数限制了线程的个数大小。

    相关文章

      网友评论

          本文标题:Java之线程池

          本文链接:https://www.haomeiwen.com/subject/ovekfhtx.html