线程池中线程都可复用,大大提高了效率。线程的创建和销毁是很耗费资源的
- 线程池的ExecutorService 服务 有execute和submit方法 。submit除了可以传入Runable。还可以传入Callable。故可以有返回值
- Callable 类似Runnable 但是Callable可以有返回值。实现Callable的线程结束后可接收
- Executors为Executor,ExecutorService,ScheduledExecutorService,ThreadFactory和Callable类提供了一些工具方法,类似于集合中的Collections类的功能。
- 固定数量的线程池 ExecutorService service = Executors.newFixedThreadPool(4):
ExecutorService service = Executors.newFixedThreadPool(4);
MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
MyTask t2 = new MyTask(80001, 130000);
MyTask t3 = new MyTask(130001, 170000);
MyTask t4 = new MyTask(170001, 200000);
Future<List<Integer>> f1 = service.submit(t1);
Future<List<Integer>> f2 = service.submit(t2);
Future<List<Integer>> f3 = service.submit(t3);
Future<List<Integer>> f4 = service.submit(t4);
- 不固定数量线程池 ExecutorService service = Executors.newCachedThreadPool(); 线程结束后默认60秒之后在线程池中销毁
- 单线程的线程池。ExecutorService service = Executors.newSingleThreadExecutor(); 可保证任务执行顺序
- 定时任务线程池。ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
/**
* scheduleAtFixedRate方法四个参数
* Runnable command,
* long initialDelay,任务开始时间
* long period,每隔多少时间执行
* TimeUnit unit,时间单位
*/
service.scheduleAtFixedRate(()->{
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}, 0, 500, TimeUnit.MILLISECONDS);
- 可执行其他线程任务WorkStealingPool :ExecutorService service = Executors.newWorkStealingPool(); 该线程池中每个线程都维护自己的任务队列。当自己的任务队列执行完成时,会帮助其他线程执行其中的任务。主动找活干。PS:它底层是ForkJoinPool的实现。用守护线程daemon的方式来执行。JDK1.8之后新加
- ForkJoinPool 分任务进行计算再合并结果。计算任务需要基础RecursiveTask: class AddTask extends RecursiveTask<Long> 并重写compute()方法 方法内进行递归调用。 RecursiveTask(递归任务) JDK1.7之后新加。 代码示例:
class AddTask extends RecursiveTask<Long> {
int start, end;
AddTask(int s, int e) {
start = s;
end = e;
}
@Override
protected Long compute() {
if(end-start <= MAX_NUM) {
long sum = 0L;
for(int i=start; i<end; i++) sum += nums[i];
return sum;
}
int middle = start + (end-start)/2;
AddTask subTask1 = new AddTask(start, middle);
AddTask subTask2 = new AddTask(middle, end);//递归
subTask1.fork(); //启动新线程
subTask2.fork();
return subTask1.join() + subTask2.join(); //返回计算结果
}
}
- 所有线程池类内部实现都是ThreadPoolExecutor 除了ForkJoinPool。ThreadPoolExecutor 线程池的通用类 jdk源码中的ThreadPoolExecutor:
public ThreadPoolExecutor(int corePoolSize, //线程数量
int maximumPoolSize,//最多线程数
long keepAliveTime,//线程空闲销毁的时间
TimeUnit unit,//线程空闲销毁的时间单位
BlockingQueue<Runnable> workQueue) {//任务容器。阻塞式容器
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
- 从各类pool的源码可以看见。使用的都是ThreadPoolExecutor
/**
* FixedThreadPool 固定数量
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
/**
* CachedThreadPool 不固定数量,空闲60秒后销毁
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
/**
* SingleThreadExecutor 单个线程,保证了顺序执行。
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- parallel Stream API : nums.parallelStream().forEach(...);多线程执行任务.效率很高。底层为ForkJoinPool的实现 。JDK1.8之后的特性 示例.质数的计算
public class ParallelStreamAPI {
public static void main(String[] args) {
List<Integer> nums = new ArrayList<>();
Random r = new Random();
for(int i=0; i<10000; i++) nums.add(1000000 + r.nextInt(1000000));
start = System.currentTimeMillis();
nums.parallelStream().forEach(ClassName::isPrime);
end = System.currentTimeMillis();
System.out.println(end - start);
}
static boolean isPrime(int num) {
for(int i=2; i<=num/2; i++) {
if(num % i == 0) return false;
}
return true;
}
}
网友评论