以前我们知道创建线程的两种方式:
- 继承Thread类;
- 实现Runnable接口;
本文再来讲讲另外两种创建线程的方式:
- 实现Callable接口;
- 使用线程池
一、Callable接口
1、有什么特点?
Callable接口带返回值。
2、为什么要用Callable接口?
举个例子:
我要计算从1加到10,再加上从11乘到15的结果,需要怎么做?如果是用一个线做,那就从头算到尾,先算1加到10的结
果,再加上后面乘法运算的结果。显然乘法是比较费时间的,那么就可以创建一个新线程来计算11乘到15的结果。但是
其他两种创建线程的方式都是没办法拿到线程里面的返回值的,所以Callable接口出现了。
3、怎么用?
- 线创建资源类实现Callable接口;
- 重写call方法;
思考:Thread类的构造只能接受Runnable接口,并不能接口Callable接口,怎么办?
解决:找中间人。如果有一个中间人同时实现了Runnable和Callable,那不就行了嘛。这就是适配器模式。这个中间人就是FutureTask实现类。
接下来看看编码实现:
class Resource implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 11 * 12 * 13 * 14 * 15;
}
}
main方法:
public static void main(String[] args) throws Exception{
Resource callable = new Resource();
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask, "AA").start();
Integer result1 = 55; // main线程计算出来的从 1 加到 10 的结果
Integer result2 = futureTask.get();
System.out.println("result:\t" + (result1 + result2));
}
计算出的结果为:
运行结果
这个结果是正确的。那么问题来了:
- 如果还没计算完 11 乘到 15 的值,我就通过get方法去取,会发生什么情况?看下面的代码:
class Resource implements Callable<Integer> {
@Override
public Integer call() throws Exception {
//System.out.println(Thread.currentThread().getName());
TimeUnit.SECONDS.sleep(2); // 眯一会儿
return 11 * 12 * 13 * 14 * 15;
}
}
public static void main(String[] args) throws Exception{
Resource callable = new Resource();
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask, "AA").start();
Integer result2 = futureTask.get(); // 还没计算完就取值
System.out.println(Thread.currentThread().getName());
System.out.println("hello");
System.out.println("hello");
Integer result1 = 55; // main线程计算出来的从 1 加到 10 的结果
System.out.println("result:\t" + (result1 + result2));
}
大家注意看运行结果中的两个 hello 是什么时候输出的:
运行结果
可以看到,如果还没计算完就取值,那么main线程就会被阻塞,直到计算完为止。这显然违背了我们设计的初衷。
- 我们使用Callable的目的就是:一个线程做这件事,另一个线程做另外一件事,在这两个线程没有需要用到对方的计算结果之前,互不干扰。其实这也是 Fork Join 的思想。
也就是说,输出两个 hello 并不需要用到 call 方法 的返回值,所以即使还没算完,也应该可以正常输出,而不是被阻塞。所以,get方法的调用要放在最后并且等计算完了再get。那么如何保证计算完了呢?看如下代码:
public static void main(String[] args) throws Exception{
Resource callable = new Resource();
FutureTask<Integer> futureTask = new FutureTask<>(callable);
new Thread(futureTask, "AA").start();
//new Thread(futureTask, "BB").start();
Integer result1 = 55; // main线程计算出来的从 1 加到 10 的结果
System.out.println(Thread.currentThread().getName());
System.out.println("hello");
System.out.println("hello");
while (!futureTask.isDone()){
}
Integer result2 = futureTask.get(); // get前先自旋
System.out.println("result:\t" + (result1 + result2));
}
把get方法的调用放在最后面,并且用while进行自旋操作,如果没计算完,就会一直在while循环中。看看这次的运行结果:
运行结果
可以看到,这次main线程并没被阻塞,运行后立即完成了自己该做的事。
注意上面的call方法里有一行注释掉的输出语句,以及main方法里有一个注释掉的线程BB。如果把注释放开,其实也还是只有AA线程会进去,BB线程根本就调不到call方法。也就说,多个线程共用一个 futureTask,只会进去一次。
二、线程池
1、为什么要用线程池?
线程池的工作就是控制运行的线程的数量,处理过程中将任务放入队列,线程创建后就从任务队列中取出来执行任务。好处就是:线程复用,降低了资源消耗,提高了响应速度、控制最大并发数、方便管理线程。
2、如何使用线程池?
- 架构:Java中的线程池是通过Executor框架实现的,创建线程池使用的是Executors工具类,底层使用的是ThreadPoolExecutor。
- 常见的三种线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
ExecutorService executor1 = Executors.newSingleThreadExecutor();
ExecutorService executor2 = Executors.newCachedThreadPool();
第一个是线程池中线程数固定的,第二个就是线程池中只有一个线程,第三个就是带缓存的,线程数量可变。这三个底层用的都是ThreadPoolExecutor。
- 具体使用:
public static void main(String[] args) throws Exception{
ExecutorService executor = Executors.newFixedThreadPool(3);
try {
for (int i=0; i<10; i++){
executor.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
}catch (Exception e){
}finally {
executor.shutdown();
}
}
运行结果:
运行结果
可以看到,确实创建了三个线程。
3、线程池的7大参数:
点进Executors创建线程池的源码去看看:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
发现只有5个参数,那为什么说是7个参数呢?再点ThreadPoolExecutor进去看看:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
可以发现,最后两个参数用的默认值,不需要我们传,所以我们刚才只看到5个。下面来说说这7大参数都是干嘛的。
- corePoolSize:线程池中的常驻核心线程数(银行今日的当值窗口数)。
- maximumPoolSize:线程池中能够容纳的最大线程数(这个银行网点总共的窗口数)。
- keepAliveTime:多余空闲线程存活的时间。
- unit:存活时间的单位。
- workQueue:任务队列(银行的等待区)。
- threadFactory:创建线程池中线程的工厂。
- handler:拒绝策略。
那么这些参数到底什么意思呢?举个生活当中的例子让你秒懂:
今天星期天,你去银行办理业务。由于星期天,所以只开放了两个窗口,所以corePoolSize就是2。如果刚好来了两个人,那么能满足需求。如果不止两个人,那么其他人就得在等待区等着,这个等待区就是workQueue。如果等待区也坐满了人,那么当值人员就会打电话给经理,让经理叫人加班多开窗口,假如这个银行总共有5个窗口,那么这个5就是maximumPoolSize。如果开了5个窗口还是忙不过来,等待区还是爆满,那么大堂经理就会对办理业务的人说:不好意思,我们这里忙不过来了,请您去别的网点吧。这就是拒绝策略。当办业务的人慢慢的少了,来加班的那几个窗口如果超过了keepAliveTime时间都还没有人来办理业务,那么他们就会下班。也就是说,两个窗口忙得过来就不会劳烦别人加班。
4、线程池的拒绝策略:
上面说到了,如果全部窗口开放了,等待区也满了,还有人来的话,大堂经理就会使用拒绝策略。线程池有四种拒绝策略:
- AbortPolicy(默认):抛异常。
- CallerRunsPolicy:将任务回退到调用者。
- DiscardOldestPolicy:抛弃等待最久的任务。
- DiscardPolicy:丢弃任务。
5、上面说到三个最常见的线程池,生产中使用哪个?
答案是,哪个都不用。也就是说,在实际开发中,不要使用Executors创建线程池。为什么?
因为FixedThreadPool和 SingleThreadExecutor 底层用的阻塞队列是 LinkedBlockingQueue,这个队列是有界,但是最大值是 int 的最大值,21亿多,相当于无界。也就是说可能会在这里堆积大量的任务,造成OOM。CachedThreadPool本身线程数就可变,允许的最大线程数也是 int 的最大值,所以可能会创建大量的线程,最终造成OOM。
既然都不用, 那我们如何创建线程池?答案是我们使用 ThreadPoolExecutor,手动配置那7个参数。如下:
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2, // corePoolSize
5, // maximumPoolSize
1, // keepAliveTime
TimeUnit.SECONDS, // unit
new LinkedBlockingQueue<>(3), // workQueue
Executors.defaultThreadFactory(), // threadFactory
new ThreadPoolExecutor.AbortPolicy()); // handler
try {
// 10 个请求
for (int i=1; i<=10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 办理业务");
});
}
} catch (Exception e) {
}finally {
threadPool.shutdown();
}
}
既然我们说人家Executors创建的线程池不行,那么我们创建的怎么就行呢?那些参数怎么来的?corePoolSize设置为多少、maximumPoolSize又设置为多少才合适?请看下面的线程池参数配置。
6、线程池参数配置:
说参数配置前,先来了解两个概念:
- IO密集型:所谓IO,就是对磁盘进行读写操作。IO密集型就是CPU的运行速度很快,而磁盘读写能力较弱。所以在读写数据的时候,CPU游刃有余,CPU占用率不会很高。
- CPU密集型:CPU密集型就是CPU处理能力不太行,而磁盘读写能力很强。所以在读写的时候,可能出现CPU占用率100%的情况。
CPU密集型要尽可能的减少线程数量,一般公式:
最大线程数 = CPU核数 + 1
IO密集型则应尽可能多的配置线程,一般公式:
最大线程数 = CPU核数 * 2
或者
最大线程数 = CPU核数 / (1 - 0.9)
获取CPU核心数的方式:
Runtime.getRuntime().availableProcessors()
三、死锁问题
1、什么是死锁?
官方解释:两个或两个以上的线程在执行过程中因争夺资源而产生相互等待的现象。
一句话讲明白:吃着碗里的,想着锅里的。再说明白一点,线程1持有锁A,却还想着去拿锁B,线程2持有锁B,却想着拿锁A。
2、写一个死锁:
class DeadLock implements Runnable{
private String lockA;
private String lockB;
public DeadLock(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized(lockA) {
System.out.println(Thread.currentThread().getName() + "持有锁" + lockA + ",尝试获取锁" + lockB);
synchronized (lockB) {
System.out.println(Thread.currentThread().getName() + "持有锁" + lockB + ",尝试获取锁" + lockB);
}
}
}
}
测试:
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new java.lang.Thread(new DeadLock(lockA, lockB), "线程1").start();
new java.lang.Thread(new DeadLock(lockB, lockA), "线程2").start();
}
看运行结果:
运行结果
可以看到,运行根本停不下来。
3、死锁定位分析:
出现上面这种情况,我们怎么知道是死锁造成的呢?也许是死循环呢!给个让人信服的理由!
我们知道Linux中有这样一条命令:
ps -ef | grep xxx
这条命令可以查看进程号,Java也提供了类似的命令,那就是:
jps -l
用这条命令查看到Java进程号后,找到可能出现异常的进程,再输入:
jstack 进程号
这样就可以查看到详细信息了。以上面的代码为例,先在cmd中进入项目路径,输入上面的命令。
执行结果
这样就说明这是死锁了。
网友评论