线程池
1. 什么是线程池
线程的池化,一个线程的容器、集合,包含多个线程
2. 为什么要用线程池
线程
对于操作系统来说是珍贵资源,创建和销毁线程也都会消耗资源,为了
(1)复用线程、节约资源
(2)提高响应速度
(3)提高线程的可管理性
所以采用线程池。
3. 实现一个线程池
public class MyThreadPool2 {
// 线程池中默认线程的个数为5
private static int WORK_NUM = 5;
// 队列默认任务个数为100
private static int TASK_COUNT = 100;
// 工作线程组
private WorkThread[] workThreads;
// 任务队列(阻塞队列),作为一个缓冲
private final BlockingQueue<Runnable> taskQueue;
private final int worker_num;//用户在构造这个池,希望的启动的线程数
// 创建具有默认线程个数的线程池
public MyThreadPool2() {
this(WORK_NUM, TASK_COUNT);
}
// 创建线程池,worker_num为线程池中工作线程的个数
public MyThreadPool2(int worker_num, int taskCount) {
if (worker_num <= 0) worker_num = WORK_NUM;
if (taskCount <= 0) taskCount = TASK_COUNT;
this.worker_num = worker_num;
taskQueue = new ArrayBlockingQueue<>(taskCount);
workThreads = new WorkThread[worker_num];
for(int i=0;i<worker_num;i++) {
workThreads[i] = new WorkThread();
workThreads[i].start();
}
// int count = Runtime.getRuntime().availableProcessors();
// Runtime.getRuntime().availableProcessors()*2
}
// 执行任务,其实只是把任务加入任务队列,什么时候执行有线程池管理器决定
public void execute(Runnable task) {
try {
taskQueue.put(task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 销毁线程池,该方法保证在所有任务都完成的情况下才销毁所有线程,否则等待任务完成才销毁
public void destroy() {
// 工作线程停止工作,且置为null
System.out.println("ready close pool.....");
for(int i=0;i<worker_num;i++) {
workThreads[i].stopWorker();
workThreads[i] = null;//help gc
}
taskQueue.clear();// 清空任务队列
}
// 覆盖toString方法,返回线程池信息:工作线程个数和已完成任务个数
@Override
public String toString() {
return "WorkThread number:" + worker_num
+ " wait task number:" + taskQueue.size();
}
/**
* 内部类,工作线程
*/
private class WorkThread extends Thread{
@Override
public void run(){
Runnable r = null;
try {
while (!isInterrupted()) {
r = taskQueue.take();
if(r!=null) {
System.out.println(getId()+" ready exec :"+r);
r.run();
}
r = null;//help gc;
}
} catch (Exception e) {
// TODO: handle exception
}
}
public void stopWorker() {
interrupt();
}
}
}
Android中的一种实现
(Android中有Executor接口,可以通过new ThreadPoolExecutor的方式新建线程池)
/**
* TODO 连接池 (连接对象)
* 网络连接拦截的时候,需要连接池:“减少服务器的压力,增强自身性能” ,才有了复用Socket的概念
*/
public class ConnectonPool {
private final static String TAG = ConnectonPool.class.getSimpleName();
// 仓库 池子 (双端队列)
private static Deque<HttpConnection> httpConnectionDeque = null;
private boolean cleanRunnableFlag; // 标记
// 定义 (以后很好的扩展) 最大允许的闲置时间 1分钟
private long keepAlive;
public ConnectonPool() {
this(1, TimeUnit.MINUTES);
httpConnectionDeque = new ArrayDeque<>();
}
public ConnectonPool(long keepAlive, TimeUnit timeUnit) {
keepAlive = timeUnit.toMillis(keepAlive);
}
/**
* 任务 -- 专门去检查 连接池里面的连接对象,清理连接池里面的连接对象
*/
private final Runnable cleanRunnable = new Runnable() {
@Override
public void run() {
while (true) {
// 下一次检查的时间
long nextCheckCleamTime = clean(System.currentTimeMillis());
if (-1 == nextCheckCleamTime) {
cleanRunnableFlag = false;
return; // while(true) 全部结束了
}
if (nextCheckCleamTime > 0) {
// 等待一段时间后,再去检查,是否需要清理
synchronized (ConnectonPool.this) {
try {
ConnectonPool.this.wait(nextCheckCleamTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
};
private long clean(long currentTimeMillis) {
// 定义最终最大的闲置时间 result
long idleRecordSave = -1;
synchronized (this) {
// 遍历 池子 容器
Iterator<HttpConnection> iterator = httpConnectionDeque.iterator();
while (iterator.hasNext()) {
HttpConnection httpConnection = iterator.next();
// TODO 我们添加一个连接对象,操作了(最大闲置时间)就会移除这个连接对象
// 计算出来的闲置时间
long idleTime = currentTimeMillis - httpConnection.hastUseTime;
if (idleTime > keepAlive) {
// 移除对象
iterator.remove();
// 关闭Socket
httpConnection.closeSocket();
continue; // 继续
}
// 得到最终的 最大闲置时间
if (idleRecordSave < idleTime) {
idleRecordSave = idleTime;
}
} // while end
// 当我们循环之后,代表idleRecordSave值最终计算出来了(最终闲置时间)
if (idleRecordSave >= 0) {
// keepAlive=60s 最终计算的时间idleRecordSave=30s = 30s
return (keepAlive - idleRecordSave);
}
}
// 没有计算好,连接池里面没有连接对象,直接返回-1,不做任何事情,马上结束所有任务
return idleRecordSave;
}
/**
* 有缓存的线程池
*/
private Executor threadPoolExecutor =
/**
* 参数一:0 核心线程数是 0
* 参数二:MAX_VALUE 线程池中最大的值
* 参数三:60 参数四:时分秒
* 参数五:队列 SynchronousQueue 内部就会用这个队列 存储内部管理
* 参数六:工厂 线程任务设置 ConnectionPoolRun 名字
*
* 执行的任务大于(核心线程数) 满足条件 ---> 启用(开启闲置时间60s)
* 如果60s没有过,还在闲置时间中 (复用之前的线程)
*/
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "ConnectionPoolRun");
thread.setDaemon(true); // 设置为守护线程
return thread;
}
});
/**
* TODO 添加 (连接对象) ---> 连接池里面去
*/
public synchronized void putConnection(HttpConnection httpConnection) {
// 一旦put的时候,就要求检查,连接池里面,是否要去清理 todo(清理机制)
if (!cleanRunnableFlag) {
cleanRunnableFlag = true;
// 启动检查 清理的机制
threadPoolExecutor.execute(cleanRunnable);
}
httpConnectionDeque.add(httpConnection);
int size = httpConnectionDeque.size();
Log.d(TAG, "putConnection: size:" + size);
}
/**
* TODO get (连接对象)
*/
public HttpConnection getConnection(String host, int port) {
Iterator<HttpConnection> iterator = httpConnectionDeque.iterator();
while (iterator.hasNext()) {
HttpConnection httpConnection = iterator.next();
if (httpConnection.isConnectionAction(host, port)) {
// 移除(如果get 我们就把容器的 连接对象 移除掉)
iterator.remove();
// 代表我们找到了,可以复用的
return httpConnection;
}
}
return null;
}
}
/**
* TODO 连接对象,其实就是Socket的封装
*/
public class HttpConnection {
private final static String TAG = HttpConnection.class.getSimpleName();
Socket socket; // 套接字:重点:域名,端口号
long hastUseTime; // 连接对象的最后使用时间
/**
* @param host 域名
* @param port 端口号
*/
public HttpConnection(final String host, final int port) {
try {
socket = new Socket(host, port);
} catch (IOException e) {
e.printStackTrace();
}
}
// 连接对象,自身最清楚,上一次的Socket 和 当前Socket到底是否重复
// 决定是否要复用的条件
public boolean isConnectionAction(String host, int port) {
if (socket == null) {
Log.d(TAG, "isConnectionAction: socket is null");
return false;
}
Log.d(TAG, "isConnectionAction: this hostName:" + socket.getInetAddress().getHostName());
if (socket.getPort() == port && socket.getInetAddress().getHostName().equals(host)) {
return true;
}
return false;
}
// Socket耗时的,一定要释放
public void closeSocket() {
if (socket != null) {
try {
socket.close(); // 关闭的时候会有可能意外
} catch (IOException e) {
e.printStackTrace();
Log.d(TAG, "closeSocket exception:" + e.getMessage());
}
}
}
}
/**
* 连接拦截器 要去使用 复用池
*/
public class UseConnectionPool {
private final static String TAG = UseConnectionPool.class.getSimpleName();
public void useConnectoinPool(ConnectonPool connectonPool, String host, int port) {
HttpConnection httpConnection = connectonPool.getConnection(host, port);
if (httpConnection == null) {
httpConnection = new HttpConnection(host, port);
Log.d(TAG, "useConnectoinPool: 连接池里面没有连接对象,需要实例化一个连接对象....");
} else {
Log.d(TAG, "useConnectoinPool: 复用一个连接对象");
}
// 模拟请求
// .... 请求服务器
// ......
// 使用完成之后,一定要 更新时间,并且加入到 复用池
httpConnection.hastUseTime = System.currentTimeMillis(); // 更新时间
connectonPool.putConnection(httpConnection);
Log.d(TAG, "useConnectoinPool: 给服务器发送请求.....");
}
}
4. JDK中的线程池和工作机制
4.1 线程池的创建,及各个参数的含义
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
-
int corePoolSize 线程池中的核心线程数
-
int maximumPoolSize 线程池总任务数
-
long keepAliveTime 线程空闲时间,保留corePoolSize数量存活
-
TimeUnit unit 时间单位
-
BlockingQueue<Runnable> workQueue
队列:先进先出
存入阻塞:队列满、存入元素的动作会阻塞、等待队列有位置;
取出阻塞:队列空、取元素的动作会阻塞、等待队列有元素;
add
、remove
是一对:add满队列、remove空队列都会报错抛异常
offer
、poll
是一对:offer满队列会返回false、poll空队列都返回null
put
、take
是一对:会像上文所述一样阻塞 -
ThreadFactory threadFactory //起名用
-
RejectedExecutionHandler handler
-- DiscardOldestPolicy 丢弃阻塞队列最前面、最先加入的任务
-- AbortPolicy 默认的策略:报错抛出异常策略,
-- CallerRunsPolicy 谁提交任务 线程由谁来执行的策略,
-- DiscardPolicy 丢弃任务策略
首先由
corePoolSize
来限制创建线程数,
超出corePoolSize
数目后,加入workQueue
阻塞队列进行等待
如果再超出workQueue
的约定大小,就制继续启动线程,由maximumPoolSize
来控继续启动的线程的数目
如果继续添加,超出maximumPoolSize
数量,由handler拒绝策略
(饱和策略)来发挥作用,来决定剩下的任务怎么来做。
ArrayBlockingQueue
与LinkedBlockingQueue
的区别
- ArrayBlockingQueue需要声明大小
- LinkedBlockingQueue可以不指定大小,最大容量是
Integer.MAX_VALUE
- ArrayBlockingQueue的
put
take
用的是同一把锁- LinkedBlockingQueue的
put
take
用的是两把锁,因为是头部放,尾部取
4.2 提交任务
- Future<> submit(Runnable task) 会返回任务
- execute(Runnable command) 不会返回任务
4.3 关闭线程池
- shutdownNow()
5. 合理的配置线程池
-
CPU密集型
当前任务完全依赖于CPU,大量计算型任务
线程大小不要超过机器上面CPU(逻辑CPU)同时可以运行的线程数(Runtime.getRuntime().avaliableProcessors()),开多了没有意义 -
IO密集型
与IO操作有关,包括磁盘IO、网络IO
多一些线程数, 2 * 机器上的CPU个数(逻辑CPU) -
混合型
耗费时间差不多时,尽量进行拆分。如果相差特别大就算了。
悲观锁乐观锁
悲观锁:
总有刁民想害朕,总觉得有人要改数据,我一定要抢到锁才做业务工作,不抢到锁不干活。synchronized
和lock
都是悲观锁机制。
乐观锁:
1. get数据 oldValue
2. 写回数据 newValue
CAS(compare and swap)
(1)比较 数据==oldValue
(2)写回 oldValue = newValue
CAS会导致“ABA问题”。
线程1准备用CAS将变量的值由A替换为C,在此之前,线程2将变量的值由A替换为B,又由B替换为A,然后线程1执行CAS时发现变量的值仍然为A,所以CAS成功。但实际上这时的现场已经和最初不同了,尽管CAS成功,但可能存在潜藏的问题。
现有一个用单向链表实现的堆栈,栈顶为A,这时线程T1已经知道A.next为B,然后希望用CAS将栈顶替换为B;
在T1执行上面这条指令之前,线程T2介入,将A、B出栈,再pushD、C、A;
此时轮到线程T1执行CAS操作,检测发现栈顶仍为A,所以CAS成功,栈顶变为B,但实际上B.next为null,所以此时的情况变为:堆栈中只有B一个元素,C和D组成的链表不再存在于堆栈中,C、D丢掉了。
Java并发包为了解决这个问题,提供了一个带有标记的原子引用类“AtomicStampedReference”,它可以通过控制变量值的版本来保证CAS的正确性。每次在执行数据的修改操作时,都会带上一个版本号,一旦版本号和数据的版本号一致就可以执行修改操作并对版本号执行+1操作,否则就执行失败。
volatile
volatile
修饰易变的变量时,告诉虚拟机 这个变量很容易变化,虚拟机你要经常的从主内存中读取,修改之后要及时的写回主内存。
jdk提供的最轻量的同步机制。
public class VolatileCase {
private volatile static boolean ready;
private static int number;
private static class PrintThread extends Thread{
@Override
public void run() {
System.out.println("PrintThread is running.......");
while(!ready);
System.out.println("number = "+number);
}
}
public static void main(String[] args) {
new PrintThread().start();
SleepTools.second(1);
number = 51;
ready = true;
SleepTools.second(5);
System.out.println("main is ended!");
}
}
ready不加volatile时,打印结果为
PrintThread is running.....
main is ended!
PrintThread这个线程,没有感知到ready的变化
ready加volatile后,打印结果为
PrintThread is running.....
number = 51
main is ended!
打印了number值,可见volatile保证了数据的可见性。
但是!!volatile不会像synchronized一样保证操作的原子性,也就无法保证数据的正确。
public class NotSafe {
private volatile long count =0;
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
//count进行累加
public void incCount(){
count++;
}
//线程
private static class Count extends Thread{
private NotSafe simplOper;
public Count(NotSafe simplOper) {
this.simplOper = simplOper;
}
@Override
public void run() {
for(int i=0;i<10000;i++){
simplOper.incCount();
}
}
}
public static void main(String[] args) throws InterruptedException {
NotSafe simplOper = new NotSafe();
//启动两个线程
Count count1 = new Count(simplOper);
Count count2 = new Count(simplOper);
count1.start();
count2.start();
Thread.sleep(50);
System.out.println(simplOper.count);
}
}
不加volatile,结果是无法确知的
加了volatile后,结果依然是无法确知的,因为volatile不能保证count++这个赋值操作(数值修改操作)不受其他线程的影响。
- 所有线程只访问自己的工作内存,线程间不能跨工作内存访问。(为了利用CPU的缓存,CPU的寄存器等等,提高访问速度)
- 线程会把主内存中的数据读取到工作内存中,保存一个副本,操作完之后再写回主内存。
- 当线程发现工作内存中有该变量(count),可能就不从主内存中读取这个变量了,直接修改工作内存中的数据,导致数据错误。
- 加锁(synchronized)之后,同一时间只有一个线程在操作,每个线程看到的数据总是别的线程操作过的、同步的、最新的数据。
- volatile不同,它会保证count在工作内存中修改之前,一定会去主内存中读取,但是它无法保证count此时只有一个线程在操作、即无法保证操作原子性、即无法保证数据正确。
为什么还要提供volatile呢?
在业务能够保证单独一个线程操作数据的情况下,适用volatile。
网友评论