美文网首页
【多线程与并发】:线程池与Executor框架

【多线程与并发】:线程池与Executor框架

作者: maxwellyue | 来源:发表于2017-06-28 22:41 被阅读108次

    为什么要用线程池

    关于为什么要使用多线程,请参考【多线程与并发】:线程的创建、状态、方法中的最后一点。

    那为什么要使用线程池呢?

    ①降低资源消耗:对象的创建和销毁是非常耗时的操作(线程也是一个对象)。通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
    ②提高响应速度:当任务到达时,任务可以不需要等待线程创建就能立即执行;
    ③提高线程的可管理性:线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。


    线程池工作原理

    这里是指jdk中的线程池实现。

    1、线程池的主要处理流程
    线程池处理流程.png
    2、创建线程池
    //ThreadPoolExecutor的构造器
    public ThreadPoolExecutor(int corePoolSize,                                
                              int maximumPoolSize,                             
                              long keepAliveTime,                              
                              TimeUnit unit,                                   
                              BlockingQueue<Runnable> workQueue,               
                              ThreadFactory threadFactory,                     
                              RejectedExecutionHandler handler) {              
        if (corePoolSize < 0 ||                                                
            maximumPoolSize <= 0 ||                                            
            maximumPoolSize < corePoolSize ||                                  
            keepAliveTime < 0)                                                 
            throw new IllegalArgumentException();                              
        if (workQueue == null || threadFactory == null || handler == null)     
            throw new NullPointerException();                                  
        this.corePoolSize = corePoolSize;                                      
        this.maximumPoolSize = maximumPoolSize;                                
        this.workQueue = workQueue;                                            
        this.keepAliveTime = unit.toNanos(keepAliveTime);                      
        this.threadFactory = threadFactory;                                    
        this.handler = handler;                                                
    }                                                                          
    

    参数解释

    • corePoolSize(线程池的基本大小):当提交一个任务时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,直到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreTreads()方法,线程池就会提前创建并启动所有基本线程。

    • maximumPoolSize(线程最大数量):线程池允许创建的最大线程数。如果队列已满,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。但如果使用了无解的任务队列,该参数没有效果。

    • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。如果任务很多,且每个任务执行时间较短,可调大该值。

    • TimeUnit(线程活动保持时间的单位):keepAliveTime的时间度量单位。可选天、小时、分钟、毫秒、微妙、纳秒。

    • BlockingQueue<Runnable>(任务队列):用于保存等待执行的任务的阻塞嘟列,可以选择以下几个阻塞队列

    • ArrayBlockingQueue:基于数组结构的有界阻塞队列

    • LinkedBlockingQueue:基于链表机构的阻塞队列,吞吐量通常高于ArrayBlockingQueue,静态工厂方法Executors.newFixedThreadPool()使用该队列。

    • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除,否则插入操作一直处于阻塞状态,吞吐量通常高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool()使用该队列。

    • PriorityBlockingQueue:具有优先级的无限阻塞队列。

    • ThreadFactory:创建线程的工厂。

    • RejectedExecutionHandler:饱和策略,即队列和线程池都满了,对于新提交的任务无法执行,这时采取的处理新来的任务的方法,有4种策略可选(也可以自定义策略---实现RejectedExecutionHandler接口,如记录日志或持久化不能处理的任务):

    • CallerRunsPolicy:使用调用者所在的线程来运行任务。

    • AbortPolicy:直接抛出RejectedExecutionException异常。(默认策略)

    • DiscardPolicy:对新任务直接丢弃,不做任何事情

    • DiscardOldestPolicy:丢掉队列里最近(the oldest unhandled)的一个任务,并执行当前新任务。

    3、向线程池提交任务

    有两种方式将任务提交给线程池来执行

    • execute()
      用于提交不需要返回值的任务,所以无法判断任务是否被线程执行成功。
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 20,
                    30, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>());
    threadPool.execute(new Runnable() {
                public void run() {
                    //do something
                }
    });
    
    • submit()
      提交需要返回值的任务。线程池会返回一个Future类型的对象,通过这个对象可以判断任务是否执行成功,并且可以通过Future对象的get()方法来获取返回值。但get()方法会阻塞当前线程直到任务完成,使用get(long timeout, TimeUnit unit)方法会阻塞当前线程一段时间后立即返回(此时任务可能还没有执行完)。
     //有结果的任务
    class TaskWithResult implements Callable<String>{
            @Override
            public String call() throws Exception {
                return "返回:我是实现有结果的任务";
            }
    }
    @org.junit.Test
    public void test() throws ExecutionException, InterruptedException {
            ExecutorService threadPool = Executors.newFixedThreadPool(3);
            Future<String> future = threadPool.submit(new TaskWithResult());
            System.out.println(future.isDone() ? "执行完了" : "没执行完呢");
            System.out.println(future.get());
            System.out.println(future.isDone() ? "执行完了" : "没执行完呢");
    }
    //输出为:
    没执行完呢
    返回:我是实现有结果的任务
    执行完了
    
    4、关闭线程池

    调用线程池的两个方法来关闭shutdown()或者shutdownNow():遍历线程池中的工作线程,然后逐个调用线程的interupt()方法中断线程,所以无法响应中断的任务可能永远无法终止。

    • shutdownNow()
      不允许添加新的任务。立刻关闭线程池。不管池中是否还存在正在运行的任务。关闭顺序是先尝试关闭当前正在运行的任务。然后返回待完成任务的清单。已经运行的任务则不返回。(首先将线程池的状态设置为STOP,然后尝试终止所有的线程(包括正在执行任务或暂停任务的),并返回等待执行任务的列表;)
    • shutdown()
      不允许添加新的任务,等池中所有的任务执行完毕之后再关闭线程池。
      (只是将线程池的状态设置为SHUTDOWN,然后中断所有没有正在执行任务的线程。)

    //todo 有待验证shutdown()和shutdownNow()的区别


    线程池框架Executor

    1、简介

    Executor是(since)JDK1.5实现的线程池技术。

    先看Executor框架的主要类与接口

    Executor主要类与接口.png Future.png

    Executor主要可以分为3个部分:

    • 任务对象的创建:实现Runnable接口或实现Callable接口
    • 任务的执行:接口ExecutorService、两个实现类ThreadPoolExecutorScheduledThreadPoolExecutor
    • 异步计算的结果:接口Future以及实现类FutureTask

    2、任务对象
    • 两种方式创建任务对象:实现Runnable接口或实现Callable接口。
    • 两者的区别:Runnable不会返回结果,Callable可以返回结果;Callable的call方法可抛出异常,而Runnable的run方法不能抛出异常
    • Runnable可以包装为Callable:通过Executors工具类提供的两个方法
    public static <T> Callable<T> callable(Runnable task, T result);
    public static Callable<Object> callable(Runnable task);
    

    3、任务的执行

    任务的执行是由两个实现类完成的:ThreadPoolExecutorScheduledThreadPoolExecutor
    前面介绍线程池工作过程就是以ThreadPoolExecutor为例进行的。在实际使用中,通常使用工具类Executors创建不同类型的ThreadPoolExecutor
    ScheduledThreadPoolExecutorThreadPoolExecutor类的子类,相当于特定功能的扩展:在给定的延迟之后运行任务或者定期执行任务。它与Timer的功能类似,但更强大、更灵活。Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以指定多个线程数。

    • ThreadPoolExecutor
      工具类Executors可以创建3中类型的ThreadPoolExecutor,分别如下。
    • FixedThreadPool:可重用、固定线程数的线程池。
    public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
    }
    

    可以看出,
    ①FixedThreadPool的corePoolSize和maxmumPoolSize都被设置为
    nThreads。
    ②keepAliveTime设为0,表示某工作线程一旦空闲,就立即关闭该工作线程。
    ③使用无界队列LinkedBlockingQueue,当线程池中的线程数达到corePoolSize后,新任务将会在无界队列中等待,因此线程数永远不会超过corePoolSize。
    FixedThreadPool适用于为了满足资源管理的需求,而需要限制当前线程数量的场景,比如负载较重的服务器。

    • SingleThreadExecutor:只有一个线程的线程池。
    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    参数与FixedThreadPool的区别仅在于corePoolSize和maxmumPoolSize均为1,keepAliveTime和使用的阻塞队列都一样,特性类似,可以概括为:当有新任务时,如果线程池中没有线程,则创建一个线程,之后来的任务都存储在无界队列LinkedBlockingQueue中。该线程一直从队列中取任务执行。假如任务都执行完毕,立即终止该线程。
    SingleThreadExecutor适用于需要保证顺序地执行各个任务,并且在任意时间点,不会有多个线程是活动的场景。

    • CachedThreadPool:根据需要创建新的线程
    public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
    }
    

    ①使用无容量队列SynchronousQueue,但maxmumPoolSize无界。如果提交任务的速度大于线程处理任务的速度,将会不断创建新线程,极端情况会因为创建过多线程而耗尽CPU资源。
    ②keepAliveTime为60s,空闲线程超过该时间将会终止。
    ③执行完任务的某线程会执行SynchronousQueue.poll()从队列中取任务,这个取的动作会持续60s,如果在60s内有新的任务,则执行新的任务,没有任务则终止线程。因此长时间保持空闲的CachedThreadPool不会占用任何资源。
    ④当有任务提交时,a.如果当前线程池为空或者已创建的线程都正在处理任务,则CachedThreadPool会创建新线程来执行该任务。b.如果当前线程池有空闲的线程(正在执行阻塞方法SynchronousQueue.poll()),则将任务交给该等待任务的空闲线程来执行。
    CachedThreadPool适用于执行很多的短期异步任务的小程序或者是负载较轻的服务器。

    • ScheduledThreadPoolEecutor
      先记住它是用来执行定期任务或者在给定延迟时间之后执行任务。其他待深入。

    4、异步结果的获取

    主要是通过接口Future和实现类FutureTask。

    • Future
      Future代表了一个异步计算的结果。
    public interface Future<V> {
    //取消当前任务,如果任务已经完成,就会取消失败,返回false;
    //如果取消成功,并且在调用该方法之前对应的任务还没有开始,
    //则该任务永远也不会执行。如果任务正在执行,
    //参数mayInterruptIfRunning设为true则表示将正在执行该任务的线程终止,
    //参数mayInterruptIfRunning设为false则表示会等待任务完成。
    //该方法返回true或false之后,之后的isDone()方法会返回true;
    //如果该方法返回true,之后的isCancelled()方法也会返回true,
    boolean cancel(boolean mayInterruptIfRunning);
    //
    //对应的任务是否在完成之前就取消了
    boolean isCancelled();
    //任务是否完成
    boolean isDone();
    //
    //获取计算结果,如果任务还没执行完成,则会阻塞当前线程(调用该方法所在的线程),直到任务完成
    V get() throws InterruptedException, ExecutionException;
    //最多等待timeout就尝试取回结果
    V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    JDK文档上给出的示例:

    interface ArchiveSearcher { String search(String target); }
    class App {
           ExecutorService executor = ...
           ArchiveSearcher searcher = ...
           public void showSearch(final String target)throws InterruptedException {
                 Future<String> future= executor.submit(new Callable<String>() {
                      public String call() {
                          return searcher.search(target);
                      }
                  });
                 displayOtherThings(); // do other things while searching
                 try {
                      displayText(future.get()); // use future
                 } catch (ExecutionException ex) { cleanup(); return; }
           }
    }
    
    • FutureTask
      FutureTask继承关系图.png

    FutureTask就像它的名字一样,既有Future的特点(实现Future接口),又具有任务的特点(实现Runnable接口)。更直白的理解是,FutureTask就是一种特殊的任务的描述类,利用FutureTask创建的任务可以获取计算结果。
    FutureTask表示一个可取消的异步计算,并通过实现Future接口来开始或取消一个计算、查看计算是否完成、获取计算结果。如果计算还没有完成,调用FutureTask的get方法会阻塞当前线程(调用get方法所在的线程)。
    FutureTask可以用来包装Callable和Runnable对象,因为实现了Runnable接口,所以FutureTask可以提交给Executor来执行(不提交就调用自己的run方法,也可以执行计算)。

    FutureTask<String> future = new FutureTask<String>(new Callable<String>() {
           public String call() {
           return searcher.search(target);
    }});
    executor.execute(future);
    

    作为一个独立的类,该类提供了很多protected的方法,以便创建你自己的定制任务类。

    //todo 具体实现


    说明:内容大多数摘抄自《Java并发编程的艺术》

    相关文章

      网友评论

          本文标题:【多线程与并发】:线程池与Executor框架

          本文链接:https://www.haomeiwen.com/subject/jzlycxtx.html