线程池的其实就是为了利用减少线程创建和销毁,因为创建线程需要分配1M的左右的空间,销毁线程都是需要消耗系统资源的,而且不控制线程的数量,一直创建,对于CPU不停的调度,切换上线文也是很耗时的,所以就有了线程池。
线程池会创建指定数量的线程,这些线程会不停的运行提交给线程池的任务,重复利用这些线程的资源。Java中的线程池实现类是ThreadPoolExecutor类。举个简单的例子。
static AtomicInteger threadId = new AtomicInteger(1);
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor threadPool = (ThreadPoolExecutor)Executors.newFixedThreadPool(5, new ThreadFactory() {
//自定义线程池创建工厂,为了实现线程编号的查看
public Thread newThread(Runnable r) {
return new Thread(r, "pool-thread-id-" + threadId.getAndIncrement());
}
});
for (int i = 0; i < 10; i++) {
threadPool.submit(new Runnable() {
public void run() {
//打印出执行这个任务的线程名称
System.out.println("执行线程名称" + Thread.currentThread().getName());
}
});
}
//关闭线程池
threadPool.shutdown();
}
利用建立线程的工具类Executors,创建固定大小为5的线程池对象,后续会讲解这个方法。newFixedThreadPool中,还传入了一个ThreadFactory对象,这个对象是为了创建线程池内的工作线程的工厂类,一般不传入,会使用默认Executors中的静态内部类DefaultThreadFactory,这里为了演示,特意实现了一下。
我们向线程池中提交了10个任务,每个任务是打印执行当前代码的线程名称,执行的结果如下:
执行结果
可以看出,线程池内的线程被重复使用了。这就是线程池为了重复利用线程资源,完成相应的任务。
上面使用代码介绍了线程池的使用,如果直接看jdk里面的线程池实现,由于实现源码还是比较复杂的,可以先来简单实现一个简单的线程池,掌握一下基本原理。
在实现线程池之前,可能有几个问题会想到。
1.如果保证工作线程一直运行
2.如果提交的任务超过了线程池设置的大小,存储到哪里了
public class MyThreadPool {
//任务添加到阻塞队列中
private BlockedQueue<Runnable> blockedQueue;
//核心线程大小
private int corePoolSize;
//当前核心线程个数
private AtomicInteger currentCorePoolSize = new AtomicInteger(0);
private volatile boolean poolRunning;
private HashSet<Worker> workers = new HashSet<Worker>();
private ReentrantLock lock = new ReentrantLock();
public MyThreadPool(int corePoolSize) {
this.corePoolSize = corePoolSize;
blockedQueue = new BlockedQueue<Runnable>();
poolRunning = true;
}
//提交任务的方法
public void submit(Runnable task) throws InterruptedException {
if (!poolRunning) {
throw new IllegalStateException("线程池非运行状态");
}
//1. 如果当前工作线程个数没有达到核心线程,增加工作线程
if (currentCorePoolSize.get() < corePoolSize) {
if(addWorker(task)){
currentCorePoolSize.incrementAndGet();
}
} else if (blockedQueue.add(task)) {//2. 将任务放到阻塞队列
System.out.println("任务添加到队列");
}
}
private Runnable getTask() throws InterruptedException {
return blockedQueue.remove();
}
private boolean addWorker(Runnable task) {
//创建工作线程,并运行提交的任务。
Worker worker = new Worker(task);
lock.lock();
try {
//添加到worker集合中
workers.add(worker);
//运行当前worker线程
worker.thread.start();
return true;
} finally {
lock.unlock();
}
}
public void shutdown() {
for (Iterator<Worker> workerIterator = workers.iterator(); workerIterator.hasNext(); ) {
Worker worker = workerIterator.next();
//关闭运行的工作线程
worker.setRunFlag(false);
if(!worker.thread.isInterrupted()){
//将运行的线程进行中断处理
worker.thread.interrupt();
}
}
poolRunning = false;
}
/**
* 真正执行任务的线程
*/
private class Worker implements Runnable {
//需要执行的任务
private Runnable task;
//线程运行标志
private boolean runFlag = true;
private Thread thread;
public void setRunFlag(boolean runFlag) {
this.runFlag = runFlag;
}
public Worker(Runnable task) {
this.task = task;
thread = new Thread(this);
}
public void run() {
try {
while (runFlag && (task != null || (task = getTask()) != null)) {
task.run();
//执行完成,让gc回收
task = null;
}
} catch (InterruptedException e) {
System.out.println("线程被中断");
}
}
}
}
现在来回答上面的几个问题
- 如何保证工作线程一直执行
利用了一个阻塞队列完成,任务不断的从阻塞队列中取出,然后执行,如果阻塞队列没有任务则当前线程进入等待队列。这要是阻塞队列的原理,其实就是消费者和生产者的模型。
2.任务存储在哪里
由上面可以知道,存储在阻塞队列中。
上面使用的阻塞队列是自己简单实现的,为了方便理解,代码如下:
public class BlockedQueue<T> {
public Object[] objects;
//当前数组元素个数
private int count;
//添加元素的下标位置
private int addIndex;
//删除元素的下标位置
private int removeIndex;
private Lock lock = new ReentrantLock();
//队列未满信号
private Condition notFull = lock.newCondition();
//队列不为空信号
private Condition notEmpty = lock.newCondition();
private final static int DEFAULT_SIZE = 1000;
public BlockedQueue() {
//初始化数组,大小为默认大小
objects = new Object[DEFAULT_SIZE];
}
public BlockedQueue(int size) {
//初始化数组
objects = new Object[size];
}
public boolean add(T t) throws InterruptedException {
lock.lock();
try {
while (count == objects.length) {
//队列满了,阻塞线程,等待notFull信号
System.out.println("begin wait for notFull signal");
notFull.await();
}
//说明队列有空位了
objects[addIndex++] = t;
//等于数组的大小
if (addIndex == objects.length) {
//循环添加队列元素
addIndex = 0;
}
count++;
//发送notEmpty信号
System.out.println("send notEmpty signal");
notEmpty.signal();
return true;
} finally {
lock.unlock();
}
}
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
//空的队列等待数据加入
System.out.println("begin wait notEmpty signal");
notEmpty.await();
}
T t = (T) objects[removeIndex];
objects[removeIndex++] = null;
if (removeIndex == objects.length) {
removeIndex = 0;
}
--count;
System.out.println("send notFull signal");
notFull.signal();
return t;
} finally {
lock.unlock();
}
}
}
可以看到,这个简单的阻塞队列利用了ReentrantLock和Condition信号量完成。
参考《java并发编程艺术》
网友评论