1.固定数量线程池newFixedThreadPool
创建一个可重用的固定数量的无界队列的线程池。在任何时候,最多有nThreads个活跃线程处理任务。如果在所有线程都处于活动状态时提交了其他任务,则它们将在队列中等待,直到某个线程可用为止。如果在线程池关闭之前,执行任务的过程中由于执行失败导致任何线程终止,则在执行后续任务时将使用新线程代替。线程池中的线程将一直存活直到调用shutdown方法。
源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
使用:
private static void useFixedThreadPool() throws InterruptedException {
final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(10);
// Returns the approximate number of threads that are actively
System.out.println(executor.getActiveCount());
IntStream.range(0, 100).boxed().forEach(i -> {
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 1============");
});
});
System.out.println(executor.getActiveCount());
TimeUnit.SECONDS.sleep(3);
System.out.println(executor.getActiveCount());
}
运行结果:
运行结果.png2.缓存线程池useCachedThreadPool
创建一个线程池,该线程池根据需要创建新线程,如果有可获得的缓存线程,将重用已经构造的线程。该线程池通常能提高执行许多短期异步任务的程序的性能。对execute的调用将重用先前已经构造的线程(如果存在)。如果没有可用的现有线程,则将创建一个新线程并将其添加到池中。六十秒内未使用的线程将终止并从缓存中删除。因此,保持空闲时间足够长的池不会消耗任何资源。
源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
使用:
private static void useCachedThreadPool() throws InterruptedException {
final ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
System.out.println(executor.getActiveCount());
executor.execute(() -> {
executor.execute(() -> {
System.out.println("============");
});
});
System.out.println(executor.getActiveCount());
IntStream.range(0, 100).boxed().forEach(i -> {
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("============");
});
});
System.out.println(executor.getActiveCount());
TimeUnit.SECONDS.sleep(3);
System.out.println(executor.getActiveCount());
}
运行结果:
运行结果.png3.只有一个线程的线程池useSingleThreadPool
创建一个只有单个线程的线程池。等价于new FinalizableDelegatedExecutorService(Executors.newFixedThreadPool(1));返回的不是ThreadPoolExecutor的实例,仅仅暴露ExecutorService的方法。
源码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
使用:
private static void useSingleThreadPool() throws InterruptedException {
final ExecutorService executor = Executors.newSingleThreadExecutor();
IntStream.range(0, 100).boxed().forEach(i -> {
executor.execute(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "============");
});
});
TimeUnit.SECONDS.sleep(3);
}
运行结果:
运行结果.png4.延迟线程池ScheduledThreadPoolExecutor
源码:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
使用1:
private static void testSchedule() throws InterruptedException, ExecutionException {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
ScheduledFuture<?> future = executor.schedule(() -> {
System.out.println("=======");
}, 2, TimeUnit.SECONDS);
//System.out.println(future.cancel(true));
System.out.println(future.get());
}
使用2:
private static void TestScheduledAtFixedRate() {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
AtomicLong interval = new AtomicLong(0L);
// 如果period小于任务执行的时长,则period失效,当上一个任务执行完毕之后,再执行下一个任务
ScheduledFuture<?> scheduledFuture = executor.scheduleAtFixedRate(() -> {
long currentTime = System.currentTimeMillis();
if (interval.get() == 0) {
System.out.printf("The first time trigger task at %d\n", currentTime);
} else {
System.out.printf("The actually spend [%d]\n", currentTime - interval.get());
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
interval.set(currentTime);
}, 3, 2, TimeUnit.SECONDS);
}
运行结果.png
使用3:
private static void testScheduleWithFixedDelay() throws InterruptedException {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(2);
System.out.println(executor.getExecuteExistingDelayedTasksAfterShutdownPolicy());
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
AtomicLong interval = new AtomicLong(0L);
// 上一次任务结束之后的时间+delay=下一次任务开始的时间
executor.scheduleWithFixedDelay(() -> {
long currentTime = System.currentTimeMillis();
if (interval.get() == 0) {
System.out.printf("The first time trigger task at %d\n", currentTime);
} else {
System.out.printf("The actually spend [%d]\n", currentTime - interval.get());
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
interval.set(currentTime);
}, 3, 2, TimeUnit.SECONDS);
TimeUnit.SECONDS.sleep(15);
executor.shutdown();
}
运行结果.png
5.并行线程池newWorkStealingPool
源码:
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
使用:
public static void main(String[] args) throws InterruptedException {
Optional.of(Runtime.getRuntime().availableProcessors()).ifPresent(System.out::println);
final ExecutorService executorService = Executors.newWorkStealingPool();
final List<Callable<String>> callableList = IntStream.range(0, 20).boxed().map(i -> (Callable<String>) () -> {
System.out.println(Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(3);
return "Task" + i;
}).collect(Collectors.toList());
executorService.invokeAll(callableList).stream().map(future -> {
try {
return future.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}).forEach(System.out::println);
}
网友评论