美文网首页java多线程
Java 多线程(七):Executor 线程池框架

Java 多线程(七):Executor 线程池框架

作者: 聪明的奇瑞 | 来源:发表于2018-03-12 00:53 被阅读70次
    • 线程的频繁创建在高并发及大数据量时是非常消耗资源的
    • 因此 Java 提供了线程池,Java5 在 java.util.concurrent 中添加了 Exectuor 异步执行框架
    • 通过线程池减少线程创建和销毁的次数,重复利用,根据系统情况调整线程池数量,防止创建过多线程
    • java.util.concurrent 中包含了几个比较重要的类:
    类名 描述
    Executor 线程池接口
    ExecutorService 在基础 Executor 线程池接口上生命了生命周期管理方法、任务执行状况跟踪方法
    ScheduledExecutorService 一个定时调度任务的接口
    ScheduledThreadPoolExecutor ScheduledExecutorService 的实现,实现了可定时调度任务的线程池
    ThreadPoolExecutor 线程池,可以通过调用 Executors 以下静态工厂方法来创建线程池并返回一个 ExecutorService 对象

    ThreadPoolExecutor 构造函数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) //后两个参数为可选参数
    
    • corePoolSize:核心线程数,如果运行的线程少于 corePoolSize 则创建新线程来执行新任务,即使线程池中的其他线程是空闲的
    • maximumPoolSize:最大线程数,corePoolSize和maximumPoolSize设置的边界自动调整池大小:
      • 当前线程数 >= corePoolSize(且任务队列已满时,线程会创建新线程来处理任务)
      • 当前线程数 = maxPoolSize(且任务队列已满时,线程池会拒绝处理任务而抛出异常)
    • keepAliveTime:如果线程数多于 corePoolSize 则这些多余的线程的空闲时间超过 keepAliveTime 时将被终止
    • unit:keepAliveTime 参数的时间单位
    • workQueue:保存任务的阻塞队列,与线程池的大小有关:
      • 当运行的线程数少于 corePoolSize 时,新任务直接创建新线程来执行任务而无需再进队列
      • 当运行的线程数等于或多于 corePoolSize 新任务添加时则选加入队列,不直接创建线程
      • 当队列满时,新任务就会创建新线程
    • threadFactory:使用 ThreadFactory 创建新线程,默认使用 defaultThreadFactory 创建线程
    • handle:定义处理被拒绝任务的策略,默认使用 ThreadPoolExecutor.AbortPolicy 任务被拒绝时将抛出 RejectExecutorException

    Executors

    • Executors 提供了一系列静态工厂方法用于创建各种线程池,根据具体应用场景而选择不同的线程池

    newFixedThreadPool

    • 创建可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程
    • 如果线程池中的某个线程由于异常而结束时,线程池就会再添加一个新线程

    方法定义

    //使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    

    使用例子

    public class Main {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            IntStream.range(0, 6).forEach(i -> executorService.execute(() -> {
                try {
                    TimeUnit.SECONDS.sleep(1);
                    String threadName = Thread.currentThread().getName();
                    System.out.println("finished: " + threadName);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }));
        }
    }
    
    //输出结果为:
    //finished: pool-1-thread-1
    //finished: pool-1-thread-2
    //finished: pool-1-thread-3
    //finished: pool-1-thread-4
    //finished: pool-1-thread-5
    //finished: pool-1-thread-1
    

    newSingleThreadExecutor

    • 创建一个单线程的 Executor,它只会用唯一的线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
    • 如果该线程因为异常而结束就新建一条线程来继续执行后续的任务

    方法定义

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        //corePoolSize和maximumPoolSize都等于,表示固定线程池大小为1
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
    

    使用例子

    public class Main {
        private static int num = 100;
    
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            IntStream.range(0, 100).forEach(i -> executorService.execute(() -> {
                String runnableName = "Runnable"+i+":";
                System.out.println(runnableName + num--);
            }));
            try {
                executorService.shutdown();
                executorService.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (!executorService.isTerminated()) {
                    executorService.shutdownNow();
                }
            }
            System.out.println("执行结束");
        }
    }
    
    /** 输出结果为:
        Runnable0:100
        Runnable1:99
        Runnable2:98
        .....
        Runnable98:2
        Runnable99:1
        执行结束
    **/
    

    newScheduledThreadPool

    • 创建一个可延迟执行或定期执行的线程池

    方法定义

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    

    使用例子

    public class Main {
        public static void main(String[] args) {
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
            IntStream.range(0, 2).forEach(i -> executorService.scheduleAtFixedRate(() -> {
                    String threadName = Thread.currentThread().getName();
                    System.out.println("finished: " + threadName);
            },1000, 2000, TimeUnit.MILLISECONDS));
        }
    }
    
    //输出结果为:
    //finished: pool-1-thread-1
    //finished: pool-1-thread-2
    //finished: pool-1-thread-1
    //finished: pool-1-thread-2
    //finished: pool-1-thread-2
    //finished: pool-1-thread-1
    

    newCachedThreadPool

    • 创建可缓存的线程池,如果线程池中的线程在 60 秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重用可用线程,否则就新建一个线程
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    
    public class Main {
        public static void main(String[] args) {
            ExecutorService executorService = Executors.newCachedThreadPool();
            IntStream.range(0, 6).forEach(i -> executorService.execute(() -> {
                    String threadName = Thread.currentThread().getName();
                    System.out.println("finished: " + threadName);
            }));
        }
    }
    
    //输出结果为:
    //finished: pool-1-thread-1
    //finished: pool-1-thread-2
    //finished: pool-1-thread-2
    //finished: pool-1-thread-2
    //finished: pool-1-thread-2
    //finished: pool-1-thread-2
    //线程池会缓存线程,尽可能的利用线程资源
    

    ExecutorService 常用方法

    • shutdown:方法等待提交的任务执行完成并不再接受新任务,在完成全部提交的任务后关闭
    • shutdownNow:方法将强制终止所有运行中的任务并不再允许提交新任务
    • awaitTermination:阻塞,直到所有任务在关闭请求之后完成执行,或发生超时,或当前线程中断
    • isTerminated:当所有任务关闭后返回 true,除非 shutdown、shutdownNow 被调用,否则永远不为 true
    • submit:提交可执行的任务(Runnable、Thread、Callable)并执行,返回一个 Future 对象

    相关文章

      网友评论

      • changhr2013:大佬,多线程系列不再深入讲一下么?
        聪明的奇瑞:@changhr2013 嗯嗯,有空的话我会再补充一下的

      本文标题:Java 多线程(七):Executor 线程池框架

      本文链接:https://www.haomeiwen.com/subject/jijbfftx.html