最近耽误的事情比较多,比如找工作,比如又想研究go语言,go语言都是协程的,然而我们连线程都没搞定.线程池也是我老早就想搞的,一直没有时间,这东西之前去同程面试的时候就被问到了,今天去面试的时候又被考到了~,人生总是这么惨,不想它出现偏要出现..
JDK对于线程提供了5种创建线程池的方法..
Executors.newFixedThreadPool(10);
Executors.newCachedThreadPool();
Executors.newSingleThreadExecutor();
Executors.newWorkStealingPool();
Executors.newScheduledThreadPool(10);
值得一提的事,Executors.newWorkStealingPool()是jdk1.8才有的,官方是这么解释的:Creates a work-stealing thread pool using all available processors as its target parallelism level.大致意思就是创建 一个work-stealing的线程池使用所有可用的处理器作为其目标并行性级别,嗯,,有点抽象,还没研究过,这里也就pass掉了不解释.
先不讲每个线程池创建方式的区别,直接看底层.首先 Executors.newCachedThreadPool() ↓ ↓ ↓ ↓ ↓
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
先不管里面的参数什么意思,再看Executors.newFixedThreadPool(10) ↓ ↓ ↓ ↓ ↓
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
发现一点小规律了,都是new ThreadPoolExecutor(),接着再看Executors.newScheduledThreadPool()↓ ↓ ↓ ↓ ↓
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
继续看里面的ScheduledThreadPoolExecutor↓↓↓↓↓
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
super就是用父类的,也就是new ThreadPoolExecutor(),只是queue类型不一样,其他长的都差不多,最后看Executors.newSingleThreadExecutor()↓ ↓
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
不考虑 外面那层FinalizableDelegatedExecutorService,里面的还是new ThreadPoolExecutor,都是这玩意.
看其行,知其本意~这才是架构师做的!所以这里直接不将jdk提供的方法,直接对着new Thread里去讲.
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
corePoolSize表示核心线程,意思就是你创建线程池的时候就存在多少个线程;
maximumPoolSize:max嘛就是最大,最大线程数量;
keepAliveTime:keep就是空闲,keepAliveTime就是空闲线程存活的时间;
unit:就表示存活时间的单位了;
workQueue:队列,这玩意就需要你有队列基础了,我会讲一点,剩下靠你自己去了解了~
然后再回过来看Executors.newFixedThreadPool(10) ↓ ↓ ↓ ↓ ↓
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
nThreads为10,所以就是创建线程池时核心线程为10,(就是new 这个线程池的时候存在了10个线程),最大值也是10,时间为0,单位不管它,最后用的是LinkedBlockingQueue,这里补充一点,就是什么是LinkedBlockingQueue?概念上说:首先它是一个无界队列,然后他是基于链表阻塞的队列,其内部实现了读写分离锁,所以能高效的实现并发.ok,这个newFixedThreadPool使用的是无界的.
然后newCachedThreadPool用的是SynchronousQueue,这东西特别好理解,中间没有缓存区,生产端直接通向消费端.
查了好多百度,度娘对于自定义线程池是这么解释的:
在使用有界队列时,若有新的线程需要执行,如果线程池实际线程小于corePoolsize,则优先创建线程,若大于corePoolSize,则会将任务加入队列,若队列已满,则在总线程不大于maximumPoolSize的前提下,创建新的线程,若线程数大于maximumPoolSize,则执行拒绝策略.
无界的队列时:LinkedBlockingQueue.除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况.当有新任务到来,系统的线程数小于corepoolSize时,则新建线程执行任务.当达到corePoolSize后,线程不会增加,反过来说,就是用无界队列跟 maximumPoolSize半毛钱关系都没有.若后续还有新的任务,则进入队列中等待,直至宕机.
概念性的东西就上面的了,首先你得多读几遍,直至能理解了,或许还是不懂,没事,我们直接上demo,无界队列就没什么好试的了,直接上有界队列↓↓↓
public class MyTask implements Runnable {
private int taskId;
private String taskName;
public MyTask(int taskId, String taskName) {
this.taskId = taskId;
this.taskName = taskName;
}
public int getTaskId() {
return taskId;
}
public void setTaskId(int taskId) {
this.taskId = taskId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public void run() {
try {
System.out.println("当前线程Id-->" + taskId + ",任务名称-->" + taskName);
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
然后是main方法
public class ThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));
MyTask task1 = new MyTask(1, "任务1");
MyTask task2 = new MyTask(2, "任务2");
MyTask task3 = new MyTask(3, "任务3");
MyTask task4 = new MyTask(4, "任务4");
MyTask task5 = new MyTask(5, "任务5");
MyTask task6 = new MyTask(6, "任务6");
pool.execute(task1);
}
}
我们直接执行看打印结果↓↓↓
当前线程Id-->1,任务名称-->任务1
这个是肯定的,然后我们继续pool.execute(task2),让线程池执行任务2,打印结果↓↓↓
当前线程Id-->1,任务名称-->任务1
当前线程Id-->2,任务名称-->任务2
情况有点不一样了,任务2是在任务1结束之后才出来,也就是在任务1执行要5秒,任务2在任务1出现的5秒后才打印.继续pool.execute(task3),打印结果如下↓↓↓
当前线程Id-->1,任务名称-->任务1
当前线程Id-->2,任务名称-->任务2
当前线程Id-->3,任务名称-->任务3
先执行任务1,然后5秒后打印任务2,再5秒后打印任务3,,,也就是说,当前线程池只有一个线程去执行任务.也就是corePoolsize这个值,究竟是不是呢?自己去试了~(答案是肯定的,如果把上面的值换成2,那么打印结果就是任务1和任务2同时出现,5秒后再打印任务3)
我们接着看~,直接pool.execute(task4);pool.execute(task5),这回看打印结果↓↓↓↓↓
当前线程Id-->5,任务名称-->任务5
当前线程Id-->1,任务名称-->任务1
当前线程Id-->2,任务名称-->任务2
当前线程Id-->3,任务名称-->任务3
当前线程Id-->4,任务名称-->任务4
我们会发现任务5和任务1是同时打印的,然后5秒一间隔执行其他任务,再回过来看上面的概念-->若队列已满,则在总线程不大于maximumPoolSize的前提下,创建新的线程.概念是这样的,我们再来解释,我们的核心线程只有1个,然后现在我们有5个任务要执行,因为有一个线程可以执行任务,所以还剩下4个任务,我们上面的ArrayBlockingQueue有界队列只能放3个任务,所以最后只剩下一个任务,这一个任务怎么办?判断maximumPoolSize,就是如果小于这个值,那么我新建线程去执行,优先级是很高的.(如果还不懂,自己敲~看我说的,多读几遍)
最后我们再去pool.execute(task6);
打印结果↓↓↓↓↓↓
当前线程Id-->1,任务名称-->任务1
当前线程Id-->5,任务名称-->任务5
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.threadpool.MyTask@4e25154f rejected from java.util.concurrent.ThreadPoolExecutor@70dea4e[Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at com.threadpool.ThreadPool.main(ThreadPool.java:24)
当前线程Id-->2,任务名称-->任务2
当前线程Id-->3,任务名称-->任务3
当前线程Id-->4,任务名称-->任务4
任务6没有执行!而是被拒绝了,但是其他任务还是可以正常执行,new ThreadPoolExecutor()就可以看到里面有些构造方法还带有RejectedExecutionHandler,这就是拒绝策略,默认是AbortPolicy(直接抛出异常组织系统正常工作),还有jdk其他提供的:
CallerRunsPolicy: 只要线程池未关闭,该策略直接在调用者的线程中,运行当前被丢弃的线程.
DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务(这个要用脑子去想队列的先进先出,最先进队列的会被丢弃掉,比如我们上面的代码要是用了这个策略,那么任务2就会被丢弃掉).
DiscardPolicy:丢弃最老的一个请求,不给予任何处理.
如果需要自定义拒绝策略可以实现RejectedExecutionHandler接口
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
通常我们在生产环境中,当然我没在生产中搞过线程池,不过我猜,如果要用这个拒绝策略,当然也得看需求,如果是访问的,那直接拒绝没事,那如果是要处理数据的这种拒绝,我们就要记录日志,然后跑个job感觉是最好的方案,也有的可以用urlClient(apache的) 调api去处理,,,,方法多种~
拒绝这个就不演示了,其实还有好多没讲,有些还是得靠自己去研究,比如这个时间我就没去说,这个时间结合DelayedWorkQueue是可以实现定时job的等等还有好多的玩法~
如果我有哪些讲的不对,请及时联系我!!
有啥不懂的请加qq727865942,微信号 cto_zej,觉得是干货请打赏~~
网友评论