美文网首页爱编程,爱生活
Java Consurrency 《Thread Pool》

Java Consurrency 《Thread Pool》

作者: 熬夜的猫头鹰 | 来源:发表于2018-06-16 21:48 被阅读10次

    线程池优点

    Java是支持多线程的,而线程在应用当中是稀缺资源,所以在编写程序的时候需要特别注意合理的利用线程。

    • 降低资源的消耗
    • 提高响应速度
    • 线程可管理 可复用

    Java的线程管理是在java.util.concurrent下,
    接口Executor提供了执行已提交的 Runnable 任务的对象的方法。此接口解耦了任务提交和每个任务将如何运行的机制(包括线程使用的细节、调度等)。通常使用 Executor 而不是显式通过 new Thread(new(RunnableTask())).start()创建线程。

    Executor executor = anExecutor;
    executor.execute(new RunnableTask1());
    executor.execute(new RunnableTask2());
    
    

    不过, Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用者的线程中立即运行已提交的任务。如:

     class DirectExecutor implements Executor {
         public void execute(Runnable r) {
             r.run();
         }
     }
     
    

    更常见的是,任务是在某个不是调用者线程的线程中执行的。以下执行程序将为每个任务生成一个新线程

      class ThreadPerTaskExecutor implements Executor {
         public void execute(Runnable r) {
             new Thread(r).start();
         }
     }
     
    

    Executor已知的子接口以及实现类架构图:

    [图片上传失败...(image-7ee135-1529156779808)]

    下面这件介绍一下子接口、子类的用途:

    ExecutorService

    ExecutorService 提供了管理终止的方法,以及可为跟踪一个或多个异步任务执行状况而生成 Future 的方法。

    可以关闭 ExecutorService,这将导致其拒绝新任务。提供两个方法来关闭 ExecutorService。shutdown() 方法在终止前允许执行以前提交的任务,而 shutdownNow() 方法阻止等待任务启动并试图停止当前正在执行的任务。在终止时,执行程序没有任务在执行,也没有任务在等待执行,并且无法提交新任务。应该关闭未使用的 ExecutorService 以允许回收其资源。

    通过创建并返回一个可用于取消执行和/或等待完成的 Future,方法 submit 扩展了基本方法 Executor.execute(Runnable)。方法 invokeAny 和 invokeAll 是批量执行的最常用形式,它们执行任务 collection,然后等待至少一个,或全部任务完成(可使用 ExecutorCompletionService 类来编写这些方法的自定义变体)

    • 方法列表
    ExecutorService#shutdown
    ExecutorService#shutdownNow
    ExecutorService#isShutdown
    ExecutorService#isTerminated
    ExecutorService#awaitTermination
    ExecutorService#submit(Callable<T>)
    ExecutorService#submit(Runnable, T)
    ExecutorService#submit(Runnable)
    ExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>)
    ExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>, long, TimeUnit)
    ExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>)
    ExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>, long, TimeUnit)
    
    

    *** submit方法返回一个Future对象,对这对象可以设置超时时间 ***

    • 实现一个线程支持执行的DEMO
    public class Main {
    
        static ExecutorService executorService = Executors.newFixedThreadPool(5);
    
        public static void main(String[] args) {
            executorService.execute(new OpenTV());
        }
    
        static class OpenTV implements Runnable{
    
            @Override
            public void run() {
                System.err.println("i have open the tv");
            }
        }
    
    }
    
    

    Doc上列举的实例:

    class NetworkService implements Runnable {
        private final ServerSocket serverSocket;
        private final ExecutorService pool;
    
        public NetworkService(int port, int poolSize)
            throws IOException {
          serverSocket = new ServerSocket(port);
          pool = Executors.newFixedThreadPool(poolSize);
        }
     
        public void run() { // run the service
          try {
            for (;;) {
              pool.execute(new Handler(serverSocket.accept()));
            }
          } catch (IOException ex) {
            pool.shutdown();
          }
        }
      }
    
      class Handler implements Runnable {
        private final Socket socket;
        Handler(Socket socket) { this.socket = socket; }
        public void run() {
          // read and service request on socket
        }
     }
     
    

    下列方法分两个阶段关闭 ExecutorService。第一阶段调用 shutdown 拒绝传入任务,然后调用 shutdownNow(如有必要)取消所有遗留的任务:

     void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
         // Wait a while for existing tasks to terminate
         if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
               System.err.println("Pool did not terminate");
         }
       } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
       }
     }
     
    

    整体的测试类

    public class ExecutorServiceMain {
    
        //创建一个线程池
        static ExecutorService pool = Executors.newFixedThreadPool(5);
    
        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
    //        pool.execute(new OpenTV());
            Future<String> openTvFuture = pool.submit(new OpenDoor());
            System.err.println(openTvFuture.get(6L,TimeUnit.SECONDS));
            shutDown(pool);
        }
    
        static class OpenTV implements Runnable{
    
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.err.println("I have open the tv");
            }
        }
    
        static class OpenDoor implements Callable<String>{
    
            @Override
            public String call() throws Exception {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return "I love java";
            }
        }
    
        private static void shutDown(ExecutorService pool){
            pool.shutdown();
            try {
                if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
                    pool.shutdownNow();
                }
                if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
                    System.err.println("something is wrong");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                pool.shutdownNow();
                Thread.interrupted();
    
            }
    
        }
    
    
    }
    
    

    ScheduledExecutorService

    一个 ExecutorService,可安排在给定的延迟后运行或定期执行的命令

    schedule 方法使用各种延迟创建任务,并返回一个可用于取消或检查执行的任务对象。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法创建并执行某些在取消前一直定期运行的任务。

    用 Executor.execute(Runnable) 和 ExecutorService 的 submit 方法所提交的命令,通过所请求的 0 延迟进行安排。schedule 方法中允许出现 0 和负数延迟(但不是周期),并将这些视为一种立即执行的请求。

    所有的 schedule 方法都接受相对 延迟和周期作为参数,而不是绝对的时间或日期。将以 Date 所表示的绝对时间转换成要求的形式很容易。例如,要安排在某个以后的 Date 运行,可以使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。但是要注意,由于网络时间同步协议、时钟漂移或其他因素的存在,因此相对延迟的期满日期不必与启用任务的当前 Date 相符。 Executors 类为此包中所提供的 ScheduledExecutorService 实现提供了便捷的工厂方法。

    • 方法列表
    ScheduledExecutorService#schedule(Runnable, long, TimeUnit)
    ScheduledExecutorService#schedule(Callable<V>, long, TimeUnit)
    ScheduledExecutorService#scheduleAtFixedRate
    ScheduledExecutorService#scheduleWithFixedDelay
    
    

    ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

    创建并执行一个在给定初始延迟后首次启用的定期操作,后续操作具有给定的周期;也就是将在 initialDelay 后开始执行,然后在 initialDelay+period 后执行,接着在 initialDelay + 2 * period 后执行,依此类推。如果任务的任何一个执行遇到异常,则后续执行都会被取消。否则,只能通过执行程序的取消或终止方法来终止该任务。如果此任务的任何一个执行要花费比其周期更长的时间,则将推迟后续执行,但不会同时执行。

    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)

    创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。如果任务的任一执行遇到异常,就会取消后续执行。否则,只能通过执行程序的取消或终止方法来终止该任务。

    • Demo
    
    public class SchduledExecutorMain {
    
        private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ScheduledFuture<String> scheduled = scheduler.schedule(new RollTheBell(), 5, TimeUnit.SECONDS);
            System.err.println(scheduled.get());
    
            Job job = new Job();
            ScheduledFuture<?> jobScheduled = scheduler.scheduleAtFixedRate(job, 10, 10, TimeUnit.SECONDS);
            System.err.println(jobScheduled.get());
    //        ExecutorServiceMain.shutDown(scheduler);
        }
    
    
        static class RollTheBell implements Callable<String> {
    
            @Override
            public String call() throws Exception {
                System.out.println("我是 5秒之后执行的");
                return "I love java";
            }
        }
    
        static class Job implements Runnable {
    
            public void run() {
                System.out.println("十秒之后每十秒执行一次");
            }
    
        }
    }
    
    
    

    AbstractExecutorService

    提供 ExecutorService 执行方法的默认实现。此类使用 newTaskFor 返回的 RunnableFuture 实现 submit、invokeAny 和 invokeAll 方法,默认情况下,RunnableFuture 是此包中提供的 FutureTask 类。例如,submit(Runnable) 的实现创建了一个关联 RunnableFuture 类,该类将被执行并返回。子类可以重写 newTaskFor 方法,以返回 FutureTask 之外的 RunnableFuture 实现。

    • 提供的方法列表
    
    AbstractExecutorService#newTaskFor(Runnable, T)
    AbstractExecutorService#newTaskFor(Callable<T>)
    AbstractExecutorService#submit(Runnable)
    AbstractExecutorService#submit(Runnable, T)
    AbstractExecutorService#submit(Callable<T>)
    AbstractExecutorService#doInvokeAny
    AbstractExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>)
    AbstractExecutorService#invokeAny(java.util.Collection<? extends Callable<T>>, long, TimeUnit)
    AbstractExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>)
    AbstractExecutorService#invokeAll(java.util.Collection<? extends Callable<T>>, long, TimeUnit)
    
    
    

    ThreadPoolExecutor

    ThreadPoolExecutor继承了AbstractExecutorService,总共提供了四个构造函数

    
    ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
    ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory)
    ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, RejectedExecutionHandler)
    ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue<Runnable>, ThreadFactory, RejectedExecutionHandler)
    
    

    构造函数主要提供以下几个参数:

    • corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。

    • BlockingQueue

      • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
      • LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
      • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
      • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
    • maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
    • ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。
    • RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
      • AbortPolicy:直接抛出异常。
      • CallerRunsPolicy:只用调用者所在线程来运行任务。
      • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
      • DiscardPolicy:不处理,丢弃掉。
        当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
    • keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
    • TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

    线程池提交任务

    • 一种是无返回值
    
    pool.execute(new Runnable() {
                @Override
                public void run() {
                    // do something
                }
            });
    
    
    • 一种是有返回值

    我们也可以使用submit 方法来提交任务,它会返回一个future,那么我们可以通过这个future来判断任务是否执行成功,通过future的get方法来获取返回值,get方法会阻塞住直到任务完成,而使用get(long timeout, TimeUnit unit)方法则会阻塞一段时间后立即返回,这时有可能任务没有执行完。

    Future<Object> future = executor.submit(harReturnValuetask);
    try {
         Object s = future.get();
    } catch (InterruptedException e) {
        // 处理中断异常
    } catch (ExecutionException e) {
        // 处理无法执行任务异常
    } finally {
        // 关闭线程池
        executor.shutdown();
    }
    
    

    线程池的关闭

    public static void shutDown(ExecutorService pool){
            pool.shutdown();
            try {
                if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
                    pool.shutdownNow();
                }
                if(!pool.awaitTermination(6,TimeUnit.SECONDS)){
                    System.err.println("something is wrong");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                pool.shutdownNow();
                Thread.interrupted();
    
            }
    
        }
    

    线程池的工作原理分析

    [图片上传失败...(image-4dd8a2-1529156779808)]

    从上图我们可以看出,当提交一个新任务到线程池时,线程池的处理流程如下:

    • 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
    • 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
    • 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。

    合理的配置线程池

    要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:

    • 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
    • 任务的优先级:高,中和低。
    • 任务的执行时间:长,中和短。
    • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

    任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能小的线程,如配置Ncpu+1个线程的线程池。IO密集型任务则由于线程并不是一直在执行任务,则配置尽可能多的线程,如2Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()* 方法获得当前设备的CPU个数。

    优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

    执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

    依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

    建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。

    线程池的监控

    通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

    • taskCount:线程池需要执行的任务数量。
    • completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
    • largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
    • getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不加
    • getActiveCount:获取活动的线程数。

    通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:

    protected void beforeExecute(Thread t, Runnable r) { }
    
    
    

    参考:

    • Java并发编程实战
    • JDK1.6源码
    • infoq

    相关文章

      网友评论

        本文标题:Java Consurrency 《Thread Pool》

        本文链接:https://www.haomeiwen.com/subject/ovtieftx.html