使用线程池原因
Android开发,用传统new Thread创建线程,造成一些问题:
- 任务众多时,每个任务创建一个线程,任务结束后销毁,造成线程频繁创建和销毁
- 多个线程频繁创建占用大量资源,资源竞争容易出错,缺乏统一管理,造成界面卡顿。
- 多线程频繁销毁,会频繁调用GC机制,会使性能降低,非常耗时
线程池好处:
1、多线程统一管理,避免资源竞争中出现问题
2、对线程复用,线程执行完后不会立即销毁,等待另外任务,不会频繁创建、销毁线程和调用GC
3、提供了一套完整的ExecutorService线程池创建api,创建功能不一的线程池,使用方便。
几种常见的线程池
ThreadPoolExecutor创建基本线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
corePoolSize:线程池核心线程数量
注意:线程池存在核心线程和非核心线程;核心线程一旦创建会一直执行任务或等待任务到来;非核心线程只会任务队列塞满任务时去执行多出的任务,非核心线程一段时间后会被回收,这个时间作为参数可调配。
maximumPoolSize:最大线程数量。
keepAliveTime:非核心线程要等待下一个任务到来的时间;任务很多、执行时间很短可调大有助提高线程利用率。
unit:时间单位
workQueue:任务队列
threadFactory:线程工厂。设置线程名字等。
fun test1() {
// 1、核心线程数量;最大线程数;线程空闲时等待下一个任务到来时间;时间单位;任务队列长度
// 2、任务队列为100,不会开启额外的 5-3=2 个非核心线程。如果队列设置25,则前三个任务核心线程执行,剩下的30-3=27进入队列会满
// 此时会开启2个非核心线程来执行剩下的2个任务
// 3、疑问:为何每个for循环里都有一个sleep(200) ,为何每2s打印三个任务;因为开始时候,只声明runnable对象,并重写run方法,并没有运行,
// 而后execute(runnable)才会sleep,又因为核心线程为3,所以会开启三个核心线程,各执行run方法。
// 4、如果队列给25,for给31个,则会报任务拒绝异常
val threadPoolExecutor = ThreadPoolExecutor(
3, 5, 1,
TimeUnit.SECONDS, LinkedBlockingQueue<Runnable>(25)
)
// 30个runnable
for (i in 0..29) {
Log.d(TAG, "index:$i")
val runnable = Runnable {
run {
try {
Thread.sleep(2000)
Log.d(TAG, "index:$i ")
Log.d(TAG, "当前线程:${Thread.currentThread().name}")
} catch (e: Exception) {
Log.e(TAG, "e:$e")
}
}
}
// 1、execute 一个线程之后,如果未达到核心线程数,立马启用一个核心线程执行。
// 2、execute 一个线程之后,如果已达到核心线程数,且workQueue未满,则将新线程放入workQueue中等待执行
// 3、execute 一个线程之后,如果已达到超过非核心线程数,则拒绝执行该任务,采取饱和策略,抛出RejectedExecutionException异常
try {
// 如果任务溢出,任务拒绝异常
threadPoolExecutor.execute(runnable)
}catch (e:Exception){
Log.e(TAG, "开启线程异常-e:$e")
}
}
}
FixedThreadPool(可重用固定线程数)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
特点:参数核心线程数。无非核心线程数,阻塞队列无界
val fixedThreadPool = Executors.newFixedThreadPool(5)
for (i in 0..29){
val runnable = Runnable {
run {
try {
Thread.sleep(2000)
Log.d(TAG, "index:$i")
Log.d(TAG, "当前线程:${Thread.currentThread().name}")
}catch (e:Exception){
Log.d(TAG, "e:$e")
}
}
}
fixedThreadPool.execute(runnable)
}
每2s打印5次任务
CachedThreadPool(按需创建)
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
特点:没有核心线程。只有非核心线程,并且每个非核心线程空闲等待为60s采用SynchronousQueue队列。
fun test1() {
val cachedThreadPool = Executors.newCachedThreadPool();
for (i in 0..29) {
val runnable = Runnable {
run {
try {
Thread.sleep(2000)
Log.d(TAG, "index:$i")
Log.d(TAG, "name:${Thread.currentThread().name}")
} catch (e: Exception) {
Log.d(TAG, "e:$e")
}
}
}
cachedThreadPool.execute(runnable)
}
}
过2s后直接打印30个任务
分析:
- 因为没有核心线程,全为非核心线程,SynchronousQueue是不存储元素的,每次插入操作伴随一个移除操作,一个移除操作伴随一个插入操作。
- 当一个任务执行时,先用SynchronousQueue的offer提交任务,如果线程池中又空闲线程,则调用SynchronousQueue的poll方法来移除任务并交给线程处理;如果没有线程空闲,则开启新非核心线程处理任务
- 由于maximumPoolSize是无界的,所以如果线程处理任务速度小于提交任务速度,则会不断创建新线程,注意过度创建,应调整双方速度,不然创建过多影响性能。
- CachedThreadPool使用于大量需要立即执行的耗时任务的情况。
SingleThreadPool(单个核心的fixed)
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
fun test1() {
val singlePoolExecutor = Executors.newSingleThreadExecutor();
for (i in 0..29){
val runnable = Runnable {
run {
try {
Thread.sleep(2000)
Log.d(TAG, "index:$i")
Log.d(TAG, "name:${Thread.currentThread().name}")
}catch (e:Exception){
Log.d(TAG, "e:$e")
}
}
}
singlePoolExecutor.execute(runnable)
}
}
每2s打印一个任务;只有一个核心线程,无非核心线程,当被占用时,其的任务需要进入队列等待。
ScheduledThreadPool(定时延时执行)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
fun test1() {
val scheduleExecutorService = Executors.newScheduledThreadPool(3)
val runnable = Runnable {
run {
Log.d(TAG, "延迟执行-name:${Thread.currentThread().name}")
}
}
scheduleExecutorService.schedule(runnable,5,TimeUnit.SECONDS)
}
自定义PriorityThreadPool(队列中优先级比较的线程池)
自定义Runnable, 继承Comparable接口
public abstract class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> {
private int priority;
public PriorityRunnable(int priority) {
if (priority < 0) {
throw new IllegalArgumentException();
}
this.priority = priority;
}
public int getPriority() {
return priority;
}
@Override
public int compareTo(PriorityRunnable o) {
int me = this.priority;
int anOtherPri = o.getPriority();
return me == anOtherPri ? 0 : me < anOtherPri ? 1 : -1;
}
@Override
public void run() {
doSomeThing();
}
protected abstract void doSomeThing();
}
利用抽象类Comparable接口重写compareTo方法来比较优先级
fun test1() {
val priorityThreadPool = ThreadPoolExecutor(
3, 3, 0, TimeUnit.SECONDS,
PriorityBlockingQueue<Runnable>()
)
for (i in 0..29) {
priorityThreadPool.execute(object : PriorityRunnable(i) {
override fun doSomeThing() {
Log.d(TAG, "优先级-pri:$i 的任务被执行 name:${Thread.currentThread().name}")
try {
Thread.sleep(2000)
} catch (e: Exception) {
Log.d(TAG, "e:$e")
}
}
})
}
}
前三个任务被创建的三个核心线程执行,之后27个任务进入队列,并调用compareTo方法进行排序,从大到小顺序打印。
Java中的阻塞队列
生产者--消费者。生产者向队列放元素,消费者取,队列中无元素,消费者取则阻塞,如果队列元素满,则生产者阻塞。
常见阻塞队列7种:
ArrayBlockingQueue:数组结果组成有界阻塞队列
LinkedBlockingQueue:链表结果组成有界阻塞队列
PriorityBlockingQueue:优先级排序无界队列
DelayQueue:优先级队列实现无界队列
SynchronousQueue:不存储元素阻塞队列
LinkedTransferQueue:链表结构无界阻塞队列
LinkedBlockingDeque:链表结构双向阻塞队列
线程池使用场景
-
newCachedThreadPool:
底层:核心线程为0,最大线程为Integer.MAX_VALUE,等待60s,SynchronousQueue同步队列。
当有新任务来,则插入队列中,寻找可用线程来执行,若有则执行,若无则创建来执行;若线程空闲时间超过指定大小,则线程会被销毁。
适合:执行很多短期异步的小程序或者负载较轻的服务器 -
newFixedThreadPool:
适合:执行长期任务,性能好
*newScheduledThreadPool
周期执行任务场景
线程池常用方法
shutDown()// 关闭线程池,不影响已经提交的任务
shotDownNow()// 关闭线程池,并尝试去终止正在执行的线程
allowCoreThreadTimeOut(boolean)// 允许核心线程闲置超时时被回收
submit // 一般用execute提交任务。submit有返回值
beforeExecute()// 任务执行前执行的方法
afterExecute()// 任务执行结束后执行的方法
terminated() // 线程池关闭后执行的方法
网友评论