CompletionService介绍
CompletionService通过上面的截图,我们大概可以猜到CompletionService跟线程池的队列有关。java.util.concurrent.CompletionService
是对 ExecutorService
的一个功能增强封装,优化了获取异步操作结果的接口。CompletionService接口功能是以异步的方式将执行任务和处理任务分别执行,避免阻塞。
在上一节介绍execute和submit的区别的时候有提到submit有返回值,可以返回Future。当我们执行Future的get方法获取结果时,可能拿到的Future并不是第一个执行完成的Callable的Future,就会进行阻塞,可能会导致严重的性能损耗问题。而CompletionService正是为了解决这个问题,它是Java8的新增接口,它的实现类是ExecutorCompletionService。CompletionService会根据线程池中Task的执行结果按执行完成的先后顺序排序,任务先完成的可优先获取到。下面通过代码来分别对别下:
直接get获取:
public class CompletionServiceTest {
private static final int PROCESS_SIZE = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = PROCESS_SIZE;
private static final int KEEP_ALIVE_TIME = 5;
private static final int WORK_QUEUE_SIZE = PROCESS_SIZE;
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE,
PROCESS_SIZE * 2,
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(WORK_QUEUE_SIZE));
List<Future<String>> futureList = new ArrayList<>();
// 两个任务,分别休眠4s,1s
futureList.add(threadPoolExecutor.submit(new MyCallable(4_000)));
futureList.add(threadPoolExecutor.submit(new MyCallable(1_000)));
for (Future future : futureList) {
try {
// get方法是阻塞的,如果线程池中某一个任务执行较长,会阻塞其他已经完成的任务获取结果.
System.out.println("get方法阻塞:" + future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
threadPoolExecutor.shutdown();
}
/**
* call():休眠sleepTime,任务开始以及结束打印下内容.
*
*/
static class MyCallable implements Callable<String> {
private int sleepTime;
public MyCallable(int sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + " start,sleepTime:" + sleepTime);
Thread.sleep(sleepTime);
return "end.当前线程名字:" + Thread.currentThread().getName() + ",休眠了:" + sleepTime;
}
}
}
**************************************************************
pool-1-thread-2 start,sleepTime:1000
pool-1-thread-1 start,sleepTime:4000
get方法阻塞:end.当前线程名字:pool-1-thread-1,休眠了:4000
get方法阻塞:end.当前线程名字:pool-1-thread-2,休眠了:1000
使用CompletionService:
public class CompletionServiceTest {
private static final int PROCESS_SIZE = Runtime.getRuntime().availableProcessors();
private static final int CORE_POOL_SIZE = PROCESS_SIZE;
private static final int KEEP_ALIVE_TIME = 5;
private static final int WORK_QUEUE_SIZE = PROCESS_SIZE;
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE,
PROCESS_SIZE * 2,
KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(WORK_QUEUE_SIZE));
CompletionService completionService = new ExecutorCompletionService(threadPoolExecutor);
//提交Callable任务
completionService.submit(new MyCallable(4_000));
completionService.submit(new MyCallable(1_000));
//获取future结果,不会阻塞
Future<String> pollFuture = completionService.poll();
//call方法中会休眠1s,这里因为没有执行完成的Callable,所以返回null
System.out.println("没有执行完毕:" + pollFuture);
//获取future结果,最多等待1秒,不会阻塞
/* Future<Integer> pollTimeOutFuture = completionService.poll(1, TimeUnit.SECONDS);*/
//通过take获取Future结果,此方法会阻塞
for (int i = 0; i < 2; i++) {
System.out.println("--------" + completionService.take().get());
}
threadPoolExecutor.shutdown();
}
/**
* call():休眠sleepTime,任务结束打印结果.
*/
static class MyCallable implements Callable<String> {
private int sleepTime;
public MyCallable(int sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + " start,sleepTime:" + sleepTime);
Thread.sleep(sleepTime);
return "end.当前线程名字:" + Thread.currentThread().getName() + ",休眠了:" + sleepTime;
}
}
}
*********************************************************
没有执行完毕:null
pool-1-thread-1 start,sleepTime:4000
pool-1-thread-2 start,sleepTime:1000
--------end.当前线程名字:pool-1-thread-2,休眠了:1000
--------end.当前线程名字:pool-1-thread-1,休眠了:4000
上面代码可以清晰看出使用CompletionService优点,CompletionService没有采取依次遍历 Future 的方式,而是在中间加上了一个结果队列,任务完成后马上将结果放入队列,那么从队列中取到的就是最早完成的结果。如果队列为空,那么 take()
方法会阻塞直到队列中出现结果为止。此外 CompletionService 还提供一个 poll()
方法,返回值与 take() 方法一样,不同之处在于它不会阻塞,如果队列为空则立刻返回 null。
网友评论