ExecutorService

作者: 王金松 | 来源:发表于2019-05-12 00:39 被阅读0次

    ExecutorService的submit和execute

    ExecuteService代表的是Executors创建的线程池
    submit提交的是Callable方法,返回Future,说明submit是有返回值的
    execute执行的是Runnable方法,没有返回值
    所以submit和execute的区别是提交的方法和是否有返回值

    ExecutorService的shutdown,shutdownNow,awaitTermination

    flume中的关闭源码

      public void stop() {
        LOGGER.info("Configuration provider stopping");
    
        executorService.shutdown();
        try {
          if (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
            LOGGER.debug("File watcher has not terminated. Forcing shutdown of executor.");
            executorService.shutdownNow();
            while (!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
              LOGGER.debug("Waiting for file watcher to terminate");
            }
          }
        } catch (InterruptedException e) {
          LOGGER.debug("Interrupted while waiting for file watcher to terminate");
          Thread.currentThread().interrupt();
        }
        lifecycleState = LifecycleState.STOP;
        LOGGER.debug("Configuration provider stopped");
      }
    

    shutdown方法:平滑的关闭ExecutorService,当此方法被调用时,ExecutorService停止接收新的任务并且等待已经提交的任务(包含提交正在执行和提交未执行)执行完成。当所有提交任务执行完毕,线程池即被关闭。
    awaitTermination方法:接收人timeout和TimeUnit两个参数,用于设定超时时间及单位。当等待超过设定时间时,会监测ExecutorService是否已经关闭,若关闭则返回true,否则返回false。一般情况下会和shutdown方法组合使用。

    • 场景
      应用场景为线程池的有效执行时间为20S,20S之后不管子任务有没有执行完毕,都要关闭线程池。代码如下
    ExecutorService es = Executors.newFixedThreadPool(10);        
    es.execute(new Thread());//执行子线程任务     
       try {    
        es.shutdown();  
            if(!es.awaitTermination(20,TimeUnit.SECONDS)){//20S 
           System.out.println(" 到达指定时间,还有线程没执行完,不再等待,关闭线程池!");      
       es.shutdownNow();    
        }
        } catch (Throwable e) {     // TODO Auto-generated catch block  
        es.shutdownNow();   
        e.printStackTrace();
        }
    

    awaitTermination方法调用会被阻塞,直到所有任务执行完毕并且shutdown请求被调用,或者参数中定义的timeout时间到达或者当前线程被打断,这几种情况任意一个发生了就会导致该方法的执行。
    当我们调用pool.awaitTermination时,首先该方法会被阻塞,这时会执行子线程中的任务,子线程执行完毕后该方法仍然会被阻塞,因为shutdown()方法还未被调用,而代码中将shutdown的请求放在了awaitTermination之后,这样就导致了只有awaitTermination方法执行完毕后才会执行shutdown请求,这样就造成了死锁。
    shutdown的请求一定要放在awaitTermination之前

    ExecuteService执行任务的异常处理

    https://www.cnblogs.com/langtianya/p/4520373.html

    下面这段代码执行的结果是什么?
    executorService.submit(() -> {
        System.out.println(1 / 0);
    });
    我被它坑过无数回了:它什么也不会输出。没有任何的java.lang.ArithmeticException: / by zero的征兆,啥也没有。线程池会把这个异常吞掉,就像什么也没发生过一样。如果是你自己创建的java.lang.Thread还好,这样 UncaughtExceptionHandler 还能起作用。不过如果是线程池的话你就得小心了。如果你提交的是Runnable对象的话(就像上面那个一样,没有返回值),你得将整个方法体用try- catch包起来,至少打印一下异常。如果你提交的是Callable<Integer>的话,得确保你在用get()方法取值的时候重新抛 出异常:
    final Future<Integer> division = executorService.submit(() -> 1 / 0);
    //below will throw ExecutionException caused by ArithmeticException
    division.get();
    

    监控队列长度,确保队列有界

    不当的线程池大小会使得处理速度变慢,稳定性下降,并且导致内存泄露。如果配置的线程过少,则队列会持续变大,消耗过多内存。而过多的线程又会 由于频繁的上下文切换导致整个系统的速度变缓——殊途而同归。队列的长度至关重要,它必须得是有界的,这样如果线程池不堪重负了它可以暂时拒绝掉新的请 求:
    final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
    executorService = new ThreadPoolExecutor(n, n,
    0L, TimeUnit.MILLISECONDS,
    queue);
    上面的代码等价于Executors.newFixedThreadPool(n),然而不同的是默认的实现是一个无界的 LinkedBlockingQueue。这里我们用的是一个固定100大小的ArrayBlockingQueue。也就是说如果已经有100个任务在 队列中了(还有N个在执行中),新的任务就会被拒绝掉,并抛出RejectedExecutionException异常。由于这里的队列是在外部声明 的,我们还可以时不时地调用下它的size()方法来将队列大小记录在到日志/JMX/或者你所使用的监控系统中。

    Executors.newCacheThreadPool线程和new ThreadPoolExecutor的区别和用法

    Executors.去创建线程

    五种线程池的适应场景

    • newCachedThreadPool:用来创建一个可以无限扩大的线程池,适用于服务器负载较轻,执行很多短期异步任务。
    • newFixedThreadPool:创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于可以预测线程数量的业务中,或者服务器负载较重,对当前线程数量进行限制。
    • newSingleThreadExecutor:创建一个单线程的线程池,适用于需要保证顺序执行各个任务,并且在任意时间点,不会有多个线程是活动的场景。
    • newScheduledThreadPool:可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景。
    • newWorkStealingPool:创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行,适用于大耗时的操作,可以并行来执行
    为什么Executors创建线程不安全

    Java中的BlockingQueue主要有两种实现,分别是ArrayBlockingQueue 和 LinkedBlockingQueue。
    ArrayBlockingQueue是一个用数组实现的有界阻塞队列,必须设置容量。
    LinkedBlockingQueue是一个用链表实现的有界阻塞队列,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。
    这里的问题就出在:不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE。也就是说,如果我们不设置LinkedBlockingQueue的容量的话,其默认容量将会是Integer.MAX_VALUE。
    而newFixedThreadPool中创建LinkedBlockingQueue时,并未指定容量。此时,LinkedBlockingQueue就是一个无边界队列,对于一个无边界队列来说,是可以不断的向队列中加入任务的,这种情况下就有可能因为任务过多而导致内存溢出问题。
    上面提到的问题主要体现在newFixedThreadPool和newSingleThreadExecutor两个工厂方法上,并不是说newCachedThreadPool和newScheduledThreadPool这两个方法就安全了,这两种方式创建的最大线程数可能是Integer.MAX_VALUE,而创建这么多线程,必然就有可能导致OOM。

    自定义创建线程池
    private static ExecutorService executor = new ThreadPoolExecutor(10, 10,
            60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue(10));
    
    使用guava的ThreadFactoryBuilder
    public class ExecutorsDemo {
        private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
            .setNameFormat("demo-pool-%d").build();
        private static ExecutorService pool = new ThreadPoolExecutor(5, 200,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        public static void main(String[] args) {
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                pool.execute(new SubThread());
            }
        }
    }
    
    positionWriter = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
    

    other

    。。。

    相关文章

      网友评论

        本文标题:ExecutorService

        本文链接:https://www.haomeiwen.com/subject/pplqaqtx.html