-
java线程池的实现 ThreadPoolExecutor
-
java线程池几个参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize
当线程数没达到corePoolSize
时就创建新的线程,直到创建到corePoolSize
就会用队列,队列满了就会创建新的线程直到maximumPoolSize
,详见下面 线程池任务提交的判断逻辑 - keepAliveTime
当线程池的线程数超过corePoolSize
时,如果没有新的任务提交会等待keepAliveTime
后才销毁 - ThreadFactory
创建新线程,默认使用Executors.defaultThreadFactory()
创建,使用默认的ThreadFactory来创建线程时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设置了线程的名称。 - BlockingQueue
Queue名称 | 是否有界 | 是否阻塞 | 是否有优先级 | executors | 其他说明 |
---|---|---|---|---|---|
ArrayBlockingQueue | 有界 | 阻塞 | 无优先级 | FIFO,底层是个Object数组 | |
LinkedBlockingQueue | 无界 | 阻塞 | 无 | newFixedThreadPool, newSingleThreadExecutor |
FIFO,比ArrayBlockingQueue吞吐量高 |
SynchronousQueue | 不存储元素,等上一个元素移除才能插入 | 阻塞 | 无 | newCachedThreadPool | 吞吐量比LinkedBlockingQueue通常要高 |
PriorityBlockingQueue | 无界 | 阻塞 | 有 | 底层是数组,但是会自动增 |
- RejectedExecutionHandler 见下方
-
溢出的时候的抛弃策略 RejectedExecutionHandler
- AbortPolicy 直接抛异常
- CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务
- DiscardPolicy 什么也不干,空方法.
- DiscardOldestPolicy 当任务拒绝添加时,会抛弃任务队列中最旧的任务也就是最先加入队列的,再把这个新任务添加进去。
-
线程池任务提交的判断逻辑
当有新任务在execute()方法提交时,会执行以下判断:
- 如果运行的线程少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的;
- 如果线程池中的线程数量大于等于 corePoolSize 且小于 maximumPoolSize,则只有当workQueue满时才创建新的线程去处理任务;
- 如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池的大小是固定的,这时如果有新任务提交,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
- 如果运行的线程数量大于等于maximumPoolSize,这时如果workQueue已经满了,则通过handler所指定的策略来处理任务;
所以,任务提交时,判断的顺序为 corePoolSize --> workQueue --> maximumPoolSize
- 用法区别
synchronized:在需要同步的对象中加入此控制,synchronized可以加在方法上,也可以加在特定代码块中,括号中表示需要锁的对象。
lock:需要显示指定起始位置和终止位置。一般使用ReentrantLock类做为锁,多个线程中必须要使用一个ReentrantLock类做为对象才能保证锁的生效。且在加锁和解锁处需要通过lock()和unlock()显示指出。所以一般会在finally块中写unlock()以防死锁。 - 性能区别
synchronized是托管给JVM执行,lock是java写的;在JDK1.5中是性能低下的,因为这个是个重量级操作,有可能加锁时间比执行的时间还长;但是到了JDK1.6以后就有很大不同了,因为synchronized的语义很清晰,可以进行很多优化,比如:适应性自旋(如果上一次失败了,下一次就会少自旋几次),锁消除,轻量级锁,偏向锁等;synchronized原始采用的是CPU悲观锁机制,即线程获得的是独占锁,都赞锁意味着其他线程只能依靠线程阻塞来等待锁释放,而CPU在转换线程阻塞时会引起上下文切换,当有很多线程竞争时,上下文切换回导致效率很低.
而Lock是用乐观锁的方式,采用的是CAS操作,性能相对较高. - 用途区别
复杂用途时用ReentrantLock,比如:
1.某个线程在等待一个锁的控制权的这段时间需要中断,lock.lockInterruptibly()能有效的响应中断
2.需要分开处理一些wait-notify,ReentrantLock里面的Condition应用,能够控制notify哪个线程
3.具有公平锁功能,每个到来的线程都将排队等候
4.尝试非阻塞式的获取锁,tryLock
-
java跳表的并发问题
-
什么是跳表
image.pngSkipList
SkipList是基于链表的数据结构,而且是有层次的链表结构,是有序的,默认按照key值升序
如图:
普通的链表搜索时时间复杂度是O(N),跳表的按层次进行,一层一层的搜索.时间复杂度为O(logn)
当需要添加一个元素的时候,会先按照上面的方法很容易找到应该加入的位置,然后在用一种随机的算法来确定层次,再重新调整不同层次的链表,如图:
image.png
java中提供了跳表的两种实现,ConcurrentSkipListMap 和 ConcurrentSkipListSet -
ConcurrentSkipListMap
内部有3个比较重要的结构:Node,Index,HeadIndex
;Node表示最底层的单链表有序节点、Index表示为基于Node的索引层,HeadIndex用来维护索引层次;ConcurrentSkipListMap是通过HeadIndex维护索引层次,通过Index从最上层开始往下层查找,一步一步缩小查询范围,最后到达最底层Node时,就只需要比较很小一部分数据了. -
容器
线程安全容器 | 说明 | 线程不安全容器 | 说明 |
---|---|---|---|
CopyOnWriteArrayList | volatile修饰数组,修改时同步并复制; 只适合小数据量的,大数据量复制效率超低 |
ArrayList | |
CopyOnWriteArraySet | 内部包含了一个CopyOnWriteArrayList | HashSet | |
ConcurrentSkipListSet | 用ConcurrentSkipListMap实现 | TreeSet | |
ConcurrentHashMap | HashMap | ||
ConcurrentSkipListMap | TreeMap | 红黑树实现排序 | |
ConcurrentLinkedQueue | 无界队列,支持FIF | ||
ConcurrentLinkedDeque | 无界双端队列,支持FIFO和FILO。 | ||
ArrayBlockingQueue | 数组实现的 线程安全的 有限 阻塞队列 | ||
LinkedBlockingQueue | 单链表实现的、线程安全的、无限 阻塞队列 | ||
LinkedBlockingDeque | 双向链表实现的、线程安全的、 双端 无限 阻塞队列 |
-
怎么设计线程池的大小
- 影响因素: 负载特性以及底层硬件;任务阻塞的频率.
- 任务的性质:IO密集型,CPU密集型,混合型.
- 任务的优先级: 高中低
- 任务的平均执行时间:长,中,短
- 任务的依赖性:是否依赖其他系统资源,如数据库连接等。
线程数 |
---|
CPU密集型线程数 = CPU 数+1,设置尽可能小的线程数,和CPU数一样就可以了,减少线程上下文的切换 |
IO密集型线程数 = 2*CPU数 +1 ,尽可能大的线程数,不要让CPU闲下来 |
混合型= 等待资源的时间越长,线程数应该设置的越大,不要让 CPU闲下来 |
- 公式
最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目 =(线程等待时间/线程CPU时间 + 1)* CPU数目
场景 | 设置方法 |
---|---|
高并发,任务执行时间短 | 设置为CPU数+1,减少上下文切换 |
低并发,任务执行时间长 | 1.CPU密集型:线程数 = CPU数+1 2.IO密集型:适当加大线程数以免CPU闲下来 |
高并发,任务执行时间长 | 解决这种类型任务的关键不在于线程池而在于整体架构的设计.主要方向为减少任务执行时间,比如 1.是否可以利用缓存;2.增加服务器;3.能否用消息中间件进行解耦 |
-
应用KISS原则(Keep it simple,stupid),可以将ThreadPoolExecutor的核心线程数和最大线程数设置成一样,在选择队列方面,如果适合使用无界任务列表就用
LinkedBlockingQueue
,如果适合有界的就用ArrayBlockingQueue
-
AbstractQueuedSynchronizer剖析
队列同步器AbstractQueuedSynchronizer
是用来构建锁和其他同步组件的框架,使用了一个int成员变量private volatile int state
表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作
同步器的三个操作状态的方法分别是:getState(),setState(int newState),compareAndSetState(int expect,int update)
1. 同步队列
一个FIFO的双向队列
2. 独占式同步状态获取与释放
3. 共享式同步状态获取与释放
4. 超时获取同步状态
-
ReentrantLock剖析
-
死锁
-
例子
package com.byedbl.lock;
public class DeadLockDemo {
private String A = "a";
private String B = "b";
public static void main(String[] args) {
new DeadLockDemo().deadlock();
}
private void deadlock() {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (A) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (B) {
System.out.println("t1");
}
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
synchronized (B) {
synchronized (A) {
System.out.println("t2");
}
}
}
});
t1.start();
t2.start();
}
}
运行发现一直在跑,没打印任何东西出来,查看dump文件有如下内容:
image.png
- 怎么避免
1.避免一个线程同时获取多个锁
2.避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占一个资源
3.尝试使用定时锁,lock.tryLock(timeout)来替代使用内部锁
4.对于数据库锁,加锁和解锁必须在数据库连接内,否则会出现解锁失败的情况.
多线程的问题
1.上下文切换
线程让出CPU时间片会产生上下文切换;
解决方案:
1)无锁编程,数据hash(id)取模分段处理.
2)用CAS,Atomic包就是CAS算法.
3)合理的创建线程,不要创建过多的线程.
Synchronize 关键字原理:
有三种使用方式:
- 同步方法,锁的是当前的实例对象
- 同步静态方法,锁的是class类对象
- 同步代码块,锁的是{}代码中的对象
实现原理就是 JVM通过在进入退出对象监视器Monitor来实现对方法,同步块的同步的.没有获取到Monitor的就进入等待队列.
锁优化:
synchronize为重量级锁
轻量锁:
认为大多数锁在整个同步周期都不存在竞争,所以使用CAS比使用互斥开销更小,但是如果竞争激烈,轻量锁就不仅有互斥开销,而且有CAS开销,效率更差.
偏向锁:
进一步降低了获取锁的代价,偏向锁可以提高带有同步却没有竞争的程序性能,但如果程序中大多数锁都存在竞争时,那偏向锁就起不到太大作用.
适应性自旋:
自旋耗CPU,所以加入了适应性自旋,下次就会减少自旋.
volatile 可以保证可见性和顺序性.
JVM会重排指令,所以双重检查锁机制会加个volatile关键字.
锁:
重入锁 ReentrantLock :每次获取锁时会判断当前线程是否为获取锁的线程,如果是就状态+1,释放时是将状态-1,只有将同步状态为0时才会释放锁.
AQS(AbstractQueuedSynchronizer)
ReentrantLock分公平锁和非公平锁,公平锁要关心队列情况效率更慢
读写锁:
ReentrantReadWriteLock,同时维护一堆锁,读锁和写锁,当写线程时则其他锁都将阻塞,读时就不会.这样可大大增加吞吐量和并发量.
分布式锁
1)基于数据库主键
2)基于数据库for update
3)基于Redis的 setNX 和 setEX(timeout)
4)基于ZK
-
什么时候用线程池
-
手写线程池
- 创建线程池接口 ThreadPool
package com.byedbl.threadpool;
import java.util.List;
public interface ThreadPool {
//执行一个Runnable类型的任务
void execute(Runnable task);
void execute(Runnable[] tasks);
void execute(List<Runnable> tasks);
//返回已经执行任务的个数 executor执行者 task任务
int getExecuteTaskNumber();
//返回任务队列的长度,即还没处理的任务个数 wait等待
int getWaitTaskNumber();
//返回工作线程的个数
int getWorkThreadNumber();
//关闭线程池
void destroy();
}
- 实现线程池接口
package com.byedbl.threadpool;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class ThreadPoolManager implements ThreadPool {
//线程池中默认的线程个数为5
private static int workerNum=5;
//工作线程组
WorkThread[] workThreads;
//执行任务的数量 volatile
private static AtomicInteger executeTaskNumber= new AtomicInteger(0);
//任务队列,作为一个缓冲,List线程不安全(阻塞队列)
private BlockingQueue<Runnable> taskQueue=new LinkedBlockingDeque<Runnable>();
private static ThreadPoolManager threadPool;
//可以用原子方式更新的 long 值(原子类)
private AtomicLong threadNum=new AtomicLong();
private ThreadPoolManager(){
this(workerNum);//创建默认线程个数的线程池
}
private ThreadPoolManager(int worker_Num){
ThreadPoolManager.workerNum=worker_Num;
workThreads=new WorkThread[worker_Num];
for(int i=0;i<worker_Num;i++){
workThreads[i]=new WorkThread();//初始化(调用父类Thread类的无参构造方法,分配新的Thread对象)
System.out.println("线程池中的工作线程数量:"+(i+1)+" 当前线程的名称是:"+workThreads[i].getName());
workThreads[i].start();//一个native方法,它将启动一个新线程,并执行run()方法
}
}
public static ThreadPool getThreadPool(){//获得默认线程个数的线程池
return getThreadPool(ThreadPoolManager.workerNum);
}
//单例模式
public static ThreadPool getThreadPool(int worker_Num){
if(worker_Num<=0){
worker_Num=ThreadPoolManager.workerNum;
}
if(threadPool==null){
threadPool=new ThreadPoolManager(worker_Num);
}
return threadPool;
}
// 执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
public void execute(Runnable task) {
synchronized (taskQueue) {
taskQueue.add(task);
taskQueue.notifyAll();
}
}
// 批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
public void execute(Runnable[] task) {
synchronized (taskQueue) {
for (Runnable t : task)
taskQueue.add(t);
taskQueue.notifyAll();
}
}
// 批量执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器觉定
public void execute(List<Runnable> task) {
synchronized (taskQueue) {
for (Runnable t : task)
taskQueue.add(t); //把任务加入队列
taskQueue.notifyAll(); //当调用execute()方法的时候,执行notiry(),只会唤醒线程池中的一个线程,注意与notoryAll()的区别
}
}
@Override
public int getExecuteTaskNumber() {
return executeTaskNumber.get();
}
@Override
public int getWaitTaskNumber() {
return taskQueue.size();
}
@Override
public int getWorkThreadNumber() {
return workerNum;
}
@Override
public String toString() {
return "当前线程数量:" +workerNum+ " 已完成任务:"
+getExecuteTaskNumber()+ " 等待任务数:" + getWaitTaskNumber();
}
@Override
public void destroy() {
//不断检查任务队列中存在任务
while (!taskQueue.isEmpty()) {// 如果还有任务没执行完成,就先睡会吧
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 工作线程停止工作,且置为null
for (int i = 0; i < workerNum; i++) {
workThreads[i].stopWorker();
workThreads[i] = null;
}
threadPool=null;
taskQueue.clear();// 清空任务队列
}
/*内部类 即一个线程池对象*/
private class WorkThread extends Thread{
//该工作线程是否有效,用来接收该工作线程
private volatile boolean isRunnable=true;
/*
* 关键所在,如果任务队列不空,则取出任务执行,若任务队列为空,则等待
*/
@Override
public void run() {
//接收队列当中的任务对象 任务对象Runnable类型
Runnable r=null;
while(isRunnable){
//队列同步机制
synchronized(taskQueue){
while(isRunnable && taskQueue.isEmpty()){//队列为空
try {
taskQueue.wait(20);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(!taskQueue.isEmpty()){
try {
r=taskQueue.take();// 获取并移除第一个元素
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if(r!=null){
r.run();//执行任务
executeTaskNumber.incrementAndGet();
}
r=null;
}
}
}
public void stopWorker(){
isRunnable=false;
}
}
}
- 客户端实现
package com.byedbl.threadpool;
public class Test {
public static void main(String[] args) {
// 创建3个线程的线程池
ThreadPool t = ThreadPoolManager.getThreadPool();
t.execute(new Runnable[] { new Task(), new Task(), new Task() });
t.execute(new Runnable[] { new Task(), new Task(), new Task() });
System.out.println(t);
t.destroy();// 所有线程都执行完成才destory
System.out.println(t);
}
// 任务类
static class Task implements Runnable {
private static volatile int i = 1;
@Override
public void run() {// 执行任务
System.out.println("任务 " + (i++) + " 完成");
}
}
}
网友评论