美文网首页
JAVA 线程池Executors

JAVA 线程池Executors

作者: 折剑游侠 | 来源:发表于2020-01-03 13:30 被阅读0次

Executors类提供的四种创建线程池方法

private val fix = Executors.newFixedThreadPool(4)
private val cache = Executors.newCachedThreadPool()
private val single = Executors.newSingleThreadExecutor()
private val scheduled = Executors.newScheduledThreadPool(4)

调用的都是ThreadPoolExecutor构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
}
//corePoolSize 核心线程数 不会被回收释放
//maximumPoolSize 线程池最大线程数
//keepAliveTime 空闲线程等待时间
//unit keepAliveTime时间单位
//workQueue 任务队列,保存Runnable
//threadFactory 创建线程的工厂 默认Executors.defaultThreadFactory()
//handler 线程池阻塞或达到队列容量时的执行策略 默认AbortPolicy()
1. FixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

nThreads参数既是核心线程数也是最大线程数

线程不会被回收一直保存在线程池,一般设置为cpu核心数+1

private val fix = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1)

workQueue:LinkedBlockingQueue

public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
}

public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
}

内部维护了一个循环链表

2. CachedThreadPool
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

核心线程为0,最大线程数Integer.MAX_VALUE无限大,默认等待时间60秒。

意味着线程池可以无限添加线程,60秒内未执行任务的线程会被回收释放,以此实现缓存策略。

workQueue:SynchronousQueue

public SynchronousQueue() {
        this(false);
}

public SynchronousQueue(boolean fair) {
        transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

fair表示公平非公平,公平线程按照FIFO顺序执行。这里默认实现是false,非公平策略。

3. SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}

private static class FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
}

FinalizableDelegatedExecutorService是个代理类,内部通过new ThreadPoolExecutor创建线程池。

核心线程数、最大线程数皆为1。用来执行单个任务。

这里workQueue是LinkedBlockingQueue。

4. ScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE,
              DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
              new DelayedWorkQueue());
}

private static final long DEFAULT_KEEPALIVE_MILLIS = 10L;

corePoolSize核心线程数,最大线程数Integer.MAX_VALUE,默认等待时间10S,与CachedThreadPool相似。

不同的是workQueue,这里是DelayedWorkQueue,顾名思义延迟执行。内部实现了定时执行任务的逻辑,这里就不深究了。

Executors类还有几个生成ExecutorService的静态方法
  • newSingleThreadScheduledExecutor
public static ScheduledExecutorService newSingleThreadScheduledExecutor(){
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
}

核心线程数为1的ScheduledThreadPoolExecutor。

  • unconfigurableExecutorService
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
        if (executor == null)
            throw new NullPointerException();
        return new DelegatedExecutorService(executor);
}

传入ExecutorService生成代理类DelegatedExecutorService

  • newWorkStealingPool
public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}

//parallelism并行级别 默认为cpu核心数
public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
}

//ForkJoinPool构造方法
public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
}

private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

//parallelism 并行级别
//factory 创建线程的工厂
//handler 处理出错而终止无法恢复的线程
//asyncMode true队列,false栈,默认为true

WorkStealingPool JDK1.8新增,拆分任务到多线程执行,最后合并返回。

相关文章

网友评论

      本文标题:JAVA 线程池Executors

      本文链接:https://www.haomeiwen.com/subject/ikqzoctx.html