美文网首页
Java线程池

Java线程池

作者: BlackNeko | 来源:发表于2017-01-02 13:58 被阅读144次

Java线程池分析。

创建线程池

ThreadPoolExecutor threadPoolExecutor =
                new ThreadPoolExecutor(2, 4, 1L,
                        TimeUnit.SECONDS, new LinkedBlockingDeque<>(3));

构造函数:

public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue)

参数分析:

  • corePoolSize : 核心线程数
  • maximumPoolSize : 最大线程数
  • keepAliveTime : 非核心线程闲置时的超时时长,如果设置 ThreadPoolExecutor#allowCoreThreadTimeOut(true),则核心线程也会在执行完任务后,在等待keepAliveTime时间后终止
  • unit : TimeUnit枚举
  • workQueue : 线程池中的任务队列

任务队列实现类:

  • ArrayBlockingQueue : 基于数组结构,FIFO(先进先出)
  • LinkedBlockingQueue : 链表,FIFO
  • SynchronousQueue : 不存储元素,每个插入操作必须等到另一个线程调用移除操作,否则一直处于阻塞状态。Executors.newCachedThreadPool()使用。
  • PriorityBlockingQueue : 有优先级,无阻塞。

此构造函数调用了重载的构造函数,使用了默认的 ThreadFactory 和 RejectedExecutionHandler,最终构造函数:

public ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler) 
  • ThreadFactory : 创造线程工厂
  • RejectedExecutionHandler : 饱和策略,线程池满后,新任务添加后的处理

饱和策略:他们都实现RejectedExecutionHandler接口

  • AbortPolicy : 默认实现,直接抛出异常 RejectedExecutionException
  • CallerRunsPolicy : 只用调用者所在的线程来执行任务
  • DiscardOldestPolicy : 丢弃队列里最近的一个任务,并执行当前任务
  • DiscardPolicy : 不处理,丢弃

创建饱和策略实现类:

new ThreadPoolExecutor.AbortPolicy();
new ThreadPoolExecutor.CallerRunsPolicy();
new ThreadPoolExecutor.DiscardOldestPolicy();
new ThreadPoolExecutor.DiscardPolicy();

例如:

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.SECONDS, new LinkedBlockingDeque<>(3),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        //create thread 创造线程
                        return new Thread(r, "thread#name");
                    }
                }, new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        //do something
                        //线程池满后,新任务添加后的处理
                    }
        });

最多任务数 = 核心线程数 + 非核心线程数 + 任务队列长度

非核心线程数 = 最大线程数 - 核心线程数

即:

最多任务数 = 最大线程数 + 任务队列长度

线程池中的线程开启顺序

核心线程 -> 任务队列 -> 非核心线程 -> 饱和策略

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    //线程数小于核心线程数,创建线程并执行
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //加入任务队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //开启非核心线程
    else if (!addWorker(command, false)){
        //开启失败,饱和策略
        reject(command);
    }
}

线程池分类

  • FixedThreadPool : 数量固定,只有核心线程,或者说是全是核心线程,没有超时机制,任务队列没有限制
Executors.newFixedThreadPool( int nThreads);

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}
  • CachedThreadPool : 无核心线程,非核心线程可看做无限大,每个线程无任务60s闲置会被回收,任务队列没有限制,适合大量耗时较少的任务。
Executors.newCachedThreadPool();

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}
  • ScheduledThreadPool : 核心线程数固定,非核心线程可看做无限大,任务队列没有限制,适合执行定时任务何具有固定周期的重复任务。
Executors.newScheduledThreadPool( int corePoolSize);

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
            new DelayedWorkQueue());
}
  • SingleThreadExecutor : 保持所有任务都在同一个线程中执行。
Executors.newSingleThreadExecutor();

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

提交任务

无返回结果:

threadPoolExecutor.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("running " + Thread.currentThread().getName());
    }
});

有返回结果:

Future<String> future = threadPoolExecutor.submit(new Callable<String>() {
    @Override
    public String call() throws Exception {
        String result = "hello bitch!";
        Thread.sleep(3000);
        return result;
    }
});

try {
    String result = future.get();
    System.out.println(result);
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

Future#get() 是阻塞的,在结果返回前会一直等待。

关闭线程池

threadPoolExecutor.shutdown();
threadPoolExecutor.shutdownNow();
threadPoolExecutor.isShutdown();
threadPoolExecutor.isTerminated();
threadPoolExecutor.isTerminating();

shutdown,shutdownNow:遍历工作线程,逐个调用线程的interrupt。

shutdownNow首先将线程池状态设置为stop,尝试停止正在执行的或是暂停任务的线程,返回等待执行任务的列表。

shutdown只是把线程状态设置为SHUTDOWN,中断没有正在执行的线程。

调用shutdown,shutdownNow任意一个方法,isShutdown都会返回true。所有任务都关闭后,isTerminated返回true。

线程池配置

//获取当前设备CPU个数
Runtime.getRuntime().availableProcessors();
  • CPU密集型配置尽可能小的线程,N(cpu) + 1
  • IO 密集型配置尽可能多的线程,N(cpu) * 2
  • 使用有界队列,防止线程开太多

线程池的监控

//线程池中需要执行的任务数量,总任务数量
threadPoolExecutor.getTaskCount();

//已完成
threadPoolExecutor.getCompletedTaskCount();

//曾经创建过的最大线程数量,可用来判断线程池是否满过
threadPoolExecutor.getLargestPoolSize();

//线程池中的线程数量
threadPoolExecutor.getPoolSize();

//获取活动的线程数
threadPoolExecutor.getActiveCount();

参考:
《Java并发编程的艺术》
《Android开发艺术探索》

相关文章

网友评论

      本文标题:Java线程池

      本文链接:https://www.haomeiwen.com/subject/wduwvttx.html