1.线程池的类型
-
FixedThreadPool
初始化有10个固定线程的线程池,并向其中加入100个task:private val fixedThreadPoolExecutor = Executors.newFixedThreadPool(10) fun main(args: Array<String>) { for (i in 0..100) { fixedThreadPoolExecutor.execute(Task()) } } class Task : Runnable { override fun run() { println(System.currentTimeMillis()) } }
线程池有固定的线程数,如果在加入task时,10个线程已经跑满了,那么就会把任务放到等待队列
LinkedBlockingQueue()
中。 -
CachedThreadPool
CachedThreadPool
和FixedThreadPool
不同点在于,CachedThreadPool
的线程数量是不固定的,会随着任务的增减动态的改变。 -
ScheduledThreadPool
ScheduledThreadPool
的特点在于可以完成延时任务或者完成定时任务。
其中关注三个主要方法:- 10秒后执行task:
scheduledThreadPool.schedule(Task(), 10000, TimeUnit.MILLISECONDS)
- 延迟10s,每10秒执行一次task:
scheduledThreadPool.scheduleAtFixedRate(Task(), 10000, 10000, TimeUnit.MILLISECONDS)
- 延迟10s,上次task完成10秒后再次执行task:
scheduledThreadPool.scheduleWithFixedDelay(Task(), 10000, 1000p, TimeUnit.MILLISECONDS)
- 10秒后执行task:
-
SingleThreadPoolExecutor
很好理解,这个线程池只有一个线程,可以保证所有添加的任务都按顺序执行。
如果使用的是FixedThreadPool
,就会涉及到到底使用几个线程好的问题。创建线程本身会占用一定的资源,Java中的一个线程就对应着OS中的一个线程,所以也无法无限制的创建线程个数。线程间的切换也会消耗资源,通常可以从这两个角度考虑:
- 线程池执行的任务是否是CPU密集型任务,例如:编解码、加解密或者需要其他需要被紧急执行的任务,这是可以把线程数设置为可用的CPU核心个数
val availableProcessors = Runtime.getRuntime().availableProcessors()
,这样可以保证每个被执行的任务都能一直占用CPU,直到任务完成而不会被线程调度暂停任务(理想情况)。 - 线程池执行的任务是否是I/O密集型等需要等待的任务,例如:请求网络需要等待响应任务或者不是很紧急需要处理的任务,那么可以将线程数设置为2或4倍的CPU核心线程数,这样可以保证尽量多的任务被执行,并且利用任务等待响应的时间切换给其他任务执行。
以上两条策略对于移动端(Android开发)更加适用,如果是服务端开发可以遵循这样的思路,但是在具体线程数量选择上有待商榷。
如果再向下跟踪下源码,发现以上提到的线程池类型都是通过
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
构造的,区别在于提供的参数不一致,这里面各个参数的详细解释可以参考上面提供的四个类型的线程池详细看下。这里需要提醒的以下的是RejectedExecutionHandler handler
这个类,这里会将线程池无法处理的task(因为超出等待队列容量等原因)抛出来,默认提供的实现会抛出异常导致Crash,那么最好在这里重新提供一个实现类,自行接管处理。
2.线程池生命周期
相关方法:
-
scheduledThreadPool.shutdown()
:调用这个方法后,线程池会等待当前正在执行的任务和等待队列中的task完成后关闭线程池。这个方法调用后就不能再向线程池提交task,否则会收到RejectionExecutionException
异常。 -
scheduledThreadPool.isShutdown()
:用于查询scheduledThreadPool.shutdown()
改方法是否被调用过 -
scheduledThreadPool.awaitTermination(10,TimeUnit.MILLISECONDS)
:调用该方法会阻塞当前线程直到任务全部完成或者后面设置的时间截止,然后关闭改线程池 -
isTerminated()
:用于查询线程池是否已经关闭,这里和scheduledThreadPool.isShutdown()
区别在于,只有当线程池当前没有任何正在执行的任务并且等待队列中也没有任何task时返回true
-
scheduledThreadPool.shutdownNow()
:这个方法和scheduledThreadPool.shutdown()
区别在于scheduledThreadPool.shutdownNow()
只会将正在执行的任务执行完毕,并且返回等待队列中的task
3.Callable和Future
向线程池提交任务有两种方式:
- 实现
Runnable
- 实现
Callable
class Task : Runnable {
override fun run() {
println(System.currentTimeMillis())
}
}
class TaskFuture : Callable<Int> {
@Throws(Exception::class)
override fun call(): Int {
return Random.nextInt()
}
}
这两种实现方式表面上的区别在于Runnable
的实现方法没有返回值,Callable
的方法有返回值,并且Callable
必须通过fixedThreadPoolExecutor.submit()
提交任务。
val submit: Future<Int> = fixedThreadPoolExecutor.submit(TaskFuture())
val get:Int = submit.get()
submit()
后会返回一个Future
类,通过Future.get()
方法即可得到执行结果。对于Future
类需要注意的是,调用Future.get()
方法,会阻塞当前线程直到这个task完成,当然如果这个task在调用Future.get()
之前已经完成了,那么会直接返回计算完成的结果。
前面讲到调用Future.get()
方法,会阻塞当前线程直到这个task完成,那么如果是在主线程上调用,仍然会阻塞主线程,这对本该异步执行的方法就没有意义了。看下面这个 查找员工--->计算税率--->发送邮件 例子:
for (id in ids) {
//get employee info from DB
val future = fixedThreadPoolExecutor.submit(EmployeeFetcher(id))
val employee = future.get()//blocking
//get employee tax rate from DB
val futureTaxRate = fixedThreadPoolExecutor.submit(TaxRateFetcher(employee))
val taxRate = futureTaxRate.get()//blocking
//send email to employee
fixedThreadPoolExecutor.submit(SendEmail(taxRate))
}
这时就要用到CompletableFuture
,上代码
for (id in ids) {
CompletableFuture.supplyAsync { getEmployeeInfo(id) }
.thenApplyAsync { employee -> getTaxRate(employee) }
.thenAcceptAsync { taxRate -> sendEmail(taxRate) }
}
这样就不会阻塞当前线程。其实很多库例如RxJava
对于这样的情况都有很好的处理,这里仅做一个概念介绍。
网友评论