美文网首页java多线程
Java 多线程(七):Executor 线程池框架

Java 多线程(七):Executor 线程池框架

作者: 聪明的奇瑞 | 来源:发表于2018-03-12 00:53 被阅读70次
  • 线程的频繁创建在高并发及大数据量时是非常消耗资源的
  • 因此 Java 提供了线程池,Java5 在 java.util.concurrent 中添加了 Exectuor 异步执行框架
  • 通过线程池减少线程创建和销毁的次数,重复利用,根据系统情况调整线程池数量,防止创建过多线程
  • java.util.concurrent 中包含了几个比较重要的类:
类名 描述
Executor 线程池接口
ExecutorService 在基础 Executor 线程池接口上生命了生命周期管理方法、任务执行状况跟踪方法
ScheduledExecutorService 一个定时调度任务的接口
ScheduledThreadPoolExecutor ScheduledExecutorService 的实现,实现了可定时调度任务的线程池
ThreadPoolExecutor 线程池,可以通过调用 Executors 以下静态工厂方法来创建线程池并返回一个 ExecutorService 对象

ThreadPoolExecutor 构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) //后两个参数为可选参数
  • corePoolSize:核心线程数,如果运行的线程少于 corePoolSize 则创建新线程来执行新任务,即使线程池中的其他线程是空闲的
  • maximumPoolSize:最大线程数,corePoolSize和maximumPoolSize设置的边界自动调整池大小:
    • 当前线程数 >= corePoolSize(且任务队列已满时,线程会创建新线程来处理任务)
    • 当前线程数 = maxPoolSize(且任务队列已满时,线程池会拒绝处理任务而抛出异常)
  • keepAliveTime:如果线程数多于 corePoolSize 则这些多余的线程的空闲时间超过 keepAliveTime 时将被终止
  • unit:keepAliveTime 参数的时间单位
  • workQueue:保存任务的阻塞队列,与线程池的大小有关:
    • 当运行的线程数少于 corePoolSize 时,新任务直接创建新线程来执行任务而无需再进队列
    • 当运行的线程数等于或多于 corePoolSize 新任务添加时则选加入队列,不直接创建线程
    • 当队列满时,新任务就会创建新线程
  • threadFactory:使用 ThreadFactory 创建新线程,默认使用 defaultThreadFactory 创建线程
  • handle:定义处理被拒绝任务的策略,默认使用 ThreadPoolExecutor.AbortPolicy 任务被拒绝时将抛出 RejectExecutorException

Executors

  • Executors 提供了一系列静态工厂方法用于创建各种线程池,根据具体应用场景而选择不同的线程池

newFixedThreadPool

  • 创建可重用且固定线程数的线程池,如果线程池中的所有线程都处于活动状态,此时再提交任务就在队列中等待,直到有可用线程
  • 如果线程池中的某个线程由于异常而结束时,线程池就会再添加一个新线程

方法定义

//使用一个基于FIFO排序的阻塞队列,在所有corePoolSize线程都忙时新任务将在队列中等待
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

使用例子

public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        IntStream.range(0, 6).forEach(i -> executorService.execute(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
                String threadName = Thread.currentThread().getName();
                System.out.println("finished: " + threadName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
    }
}

//输出结果为:
//finished: pool-1-thread-1
//finished: pool-1-thread-2
//finished: pool-1-thread-3
//finished: pool-1-thread-4
//finished: pool-1-thread-5
//finished: pool-1-thread-1

newSingleThreadExecutor

  • 创建一个单线程的 Executor,它只会用唯一的线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
  • 如果该线程因为异常而结束就新建一条线程来继续执行后续的任务

方法定义

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    //corePoolSize和maximumPoolSize都等于,表示固定线程池大小为1
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}

使用例子

public class Main {
    private static int num = 100;

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        IntStream.range(0, 100).forEach(i -> executorService.execute(() -> {
            String runnableName = "Runnable"+i+":";
            System.out.println(runnableName + num--);
        }));
        try {
            executorService.shutdown();
            executorService.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (!executorService.isTerminated()) {
                executorService.shutdownNow();
            }
        }
        System.out.println("执行结束");
    }
}

/** 输出结果为:
    Runnable0:100
    Runnable1:99
    Runnable2:98
    .....
    Runnable98:2
    Runnable99:1
    执行结束
**/

newScheduledThreadPool

  • 创建一个可延迟执行或定期执行的线程池

方法定义

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

使用例子

public class Main {
    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
        IntStream.range(0, 2).forEach(i -> executorService.scheduleAtFixedRate(() -> {
                String threadName = Thread.currentThread().getName();
                System.out.println("finished: " + threadName);
        },1000, 2000, TimeUnit.MILLISECONDS));
    }
}

//输出结果为:
//finished: pool-1-thread-1
//finished: pool-1-thread-2
//finished: pool-1-thread-1
//finished: pool-1-thread-2
//finished: pool-1-thread-2
//finished: pool-1-thread-1

newCachedThreadPool

  • 创建可缓存的线程池,如果线程池中的线程在 60 秒未被使用就将被移除,在执行新的任务时,当线程池中有之前创建的可用线程就重用可用线程,否则就新建一个线程
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public class Main {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        IntStream.range(0, 6).forEach(i -> executorService.execute(() -> {
                String threadName = Thread.currentThread().getName();
                System.out.println("finished: " + threadName);
        }));
    }
}

//输出结果为:
//finished: pool-1-thread-1
//finished: pool-1-thread-2
//finished: pool-1-thread-2
//finished: pool-1-thread-2
//finished: pool-1-thread-2
//finished: pool-1-thread-2
//线程池会缓存线程,尽可能的利用线程资源

ExecutorService 常用方法

  • shutdown:方法等待提交的任务执行完成并不再接受新任务,在完成全部提交的任务后关闭
  • shutdownNow:方法将强制终止所有运行中的任务并不再允许提交新任务
  • awaitTermination:阻塞,直到所有任务在关闭请求之后完成执行,或发生超时,或当前线程中断
  • isTerminated:当所有任务关闭后返回 true,除非 shutdown、shutdownNow 被调用,否则永远不为 true
  • submit:提交可执行的任务(Runnable、Thread、Callable)并执行,返回一个 Future 对象

相关文章

网友评论

  • changhr2013:大佬,多线程系列不再深入讲一下么?
    聪明的奇瑞:@changhr2013 嗯嗯,有空的话我会再补充一下的

本文标题:Java 多线程(七):Executor 线程池框架

本文链接:https://www.haomeiwen.com/subject/jijbfftx.html