美文网首页
java线程池学习

java线程池学习

作者: 夏天嘚花花 | 来源:发表于2019-01-17 16:43 被阅读0次

    1.java中线程池的结构

    image.png

    1.Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型。
    2.ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等
    3.抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法
    4.ThreadPoolExecutor继承了类AbstractExecutorService,对一些方法进行了重写,并提供了getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法。

    2.为什么要使用线程池

    • 使用线程池可以减少创建和销毁线程的次数,例如一个任务的执行时间包括T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。当多个任务时,使用线程池可以减少T1,T3的时间
    • 可以根据系统的承受能力,调整线程池中工作线程的数量,防止因为消耗过多内存导致服务器崩溃

    3.线程池的原理

    #线程池的创建
    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) 
    #ThreadPoolExecutor threadPoolExecutor = = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<Runnable>());
    

    参数详解

    corePoolSize - 池中所保存的线程数,包括空闲线程。
    maximumPoolSize - 池中允许的最大线程数。
    keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
    unit - keepAliveTime 参数的时间单位。
    workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务。
    threadFactory - 执行程序创建新线程时使用的工厂。
    handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。

    线程池执行任务的流程

    image.png
    //核心线程数
        private static int corePoolSize = 3;
        // 最大线程数量
        private static int maxPoolSize = 5;
        // 线程存活时间:当线程数量超过corePoolSize时,10秒钟空闲即关闭线程
        private static int keepAliveTime = 10000;
        
        private static ThreadPoolExecutor threadPoolExecutor = null;
    
        static {
            threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<Runnable>(5));
        }
    

    如图创建了一个核心线程数为3,最大线程数量为5,队列长度为5的线程池。
    0.一个任务进来时候
    1.判断核心线程数是否已经都有任务在执行,没有则在核心线程数上执行,执行完后核心线程数空闲(核心线程不会关闭);核心线程数都已经有任务在执行则进入步骤2。
    2.判断当前队列是否满,没有则加入到队列中,等待核心线程有空闲时会从队列中取出执行;有满则执行步骤3
    3.判断当前是否已经达到最大线程数的数量,没有则新建一个额外的线程执行(这个额外的线程的任务执行完后,如果keepAliveTime时间内没有新的任务则会关闭,如果有则会继续执行);有满则执行handler饱和策咯处理。

    keepAliveTime和maximumPoolSize及BlockingQueue的类型均有关系。如果BlockingQueue是无界的,那么永远不会触发maximumPoolSize,自然keepAliveTime也就没有了意义

    线程池的队列详解

    BlockingQueue的实现类如下


    image.png

    queue上的三种类型。

    排队有三种通用策略:

    直接提交。工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

    无界队列。使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。

    有界队列。当使用有限的 maximumPoolSizes时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

    handler饱和策略

    当队列和线程池都满了,说明线程池处于饱和状态,那么必须对新提交的任务采用一种特殊的策略来进行处理。这个策略默认配置是AbortPolicy,表示无法处理新的任务而抛出异常。JAVA提供了4中策略:
    1、AbortPolicy:直接抛出异常
    2、CallerRunsPolicy:只用调用所在的线程运行任务
    3、DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    4、DiscardPolicy:不处理,丢弃掉。

    4.Executors类

    1.newSingleThreadExecutor

     public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    #创建一个3个核心线程数的线程池,前三个任务会进入核心线程进行执行,后面7个放到队列中
    #因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。
    public static void newFixedThreadPoolTest(){
            ExecutorService executorService = Executors.newFixedThreadPool(3);
            for(int i = 0 ;i <=10 ; i++){
                final int index = i;
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(index);
                        try {
                            TimeUnit.SECONDS.sleep(2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            executorService.shutdown();
        }
    

    2.newSingleThreadExecutor

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行

    #创建一个核心线程数为1,最大线程数为1的线程池,每次只有一个任务可以执行
    #输出结果为每隔2秒输出
    public static void newSingleThreadExecutorTest(){
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            for(int i = 0 ;i <=10 ; i++){
                final int index = i;
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(index);
                        try {
                            TimeUnit.SECONDS.sleep(2);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            executorService.shutdown();
        }
    

    3.newCachedThreadPool

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
    注意此线程池的核心线程数为0,最大线程数为无限,空闲超时时间60秒,队列采用SynchronousQueue。这里有几个注意点,SynchronousQueue属于直接提交的队列,在某次添加元素必须等待其他线程取走后才能继续添加。核心线程数为0表示每次都会创建一个新的额外线程去执行,keeptime为60秒表示,新的额外线程空闲60秒后自动收回。捋一捋工作流程。假如现在有10个任务,当第一个任务来时候发现核心线程数为0,则启动一个额外线程去执行。此时第二个任务过来有两种情况:1.第一个任务已经结束,则第二个任务直接使用执行第一个任务时候创建的额外线程。2,第一个任务没结束,则又创建一个额外的线程。这就是可灵活回收空闲线程,若无可回收则新建。
    使用newCachedThreadPool有两个好处:1.可灵活回收空闲线程2.能保证任务的顺序,因为SynchronousQueue必须等到恰线程取走后才能继续添加。如果你的任务A1,A2有内部关联,A1需要先运行,那么先提交A1,再提交A2,当使用SynchronousQueue我们可以保证,A1必定先被执行,在A1么有被执行前,A2不可能添加入queue中

    #线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。
    public static void newCachedThreadPoolTest(){
            ExecutorService executorService = Executors.newCachedThreadPool();
            for(int i = 0 ; i < 10 ; i++){
                final int index = i;
                try{
                    TimeUnit.SECONDS.sleep(i);
                }catch (Exception e){
                    e.printStackTrace();
                }
                executorService.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(index);
                    }
                });
            }
        }
    

    4.newScheduledThreadPool

     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    

    创建一个定长线程池,支持定时及周期性任务执行

    线程池 Future

    import com.google.common.collect.Lists;
    import org.apache.commons.lang3.RandomUtils;
    import java.util.List;
    import java.util.concurrent.*;
    public class ExecutorFuture {
        private static final ExecutorService executorService = Executors.newFixedThreadPool(5);
        private static final List<Future<String>> futureList = Lists.newArrayList();
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            for(int i = 0 ; i < 100 ; i++){
                Mycallable mycallable = new Mycallable();
                futureList.add(executorService.submit(mycallable));
            }
            executorService.shutdown();
            for(Future<String> future : futureList){
                String value = future.get(); //会阻塞获取每个线程返回的值
                System.out.println(value);
            }
        }
        public static class Mycallable implements Callable<String>{
            @Override
            public String call() throws Exception {
                TimeUnit.SECONDS.sleep(1);
                return String.valueOf(RandomUtils.nextInt(1,100));
            }
        }
    }
    

    选择线程池并发线程数

    O密集型=2Ncpu(可以测试后自己控制大小,2Ncpu一般没问题)(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)
    计算密集型=Ncpu(常出现于线程中:复杂算法)
    java中:Ncpu=Runtime.getRuntime().availableProcessors()

    参考

    Java-线程池专题
    java线程池技术(一):ThreadFactory与BlockingQueue
    Java并发编程:线程池的使用

    相关文章

      网友评论

          本文标题:java线程池学习

          本文链接:https://www.haomeiwen.com/subject/mthmdqtx.html