在java
中我们常用到各种池,比如线程池、数据库连接池等,各种池其目的之一就是为了提高资源的利用率。很多时候初学者都是直接使用java
提供的api
,这样很方便。为了更好地使用提供的api
,还是需要更深入的了解它的原理。下面代码清单是一个简单的自己实现的线程池
public interface ThreadPool {
void execute(Runnable runnable);
void shutdown();
void addWorkers(int num);
void removeWorker(int num);
int getJobSize();
}
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicLong;
public class MyPoolThread implements ThreadPool {
private static final int MAX_WORKER_NUMBERS = 30;
private static final int MIN_WORKER_NUMBERS = 1;
private static final int DEFAULT_WORKER_NUMBERS = 10;
private final LinkedList<Worker> workers = new LinkedList<>();
private final LinkedList<Runnable> jobs = new LinkedList<>();
private AtomicLong nextId = new AtomicLong(0);
private int workerNum;
public MyPoolThread(int num) {
int workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS :
num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num;
initializeWorkers(workerNum);
}
public MyPoolThread() {
this(DEFAULT_WORKER_NUMBERS);
}
@Override
public void execute(Runnable runnable) {
synchronized (jobs) {
jobs.add(runnable);
jobs.notify();
}
}
@Override
public void shutdown() {
synchronized (workers) {
removeWorker(workerNum);
}
}
@Override
public void addWorkers(int num) {
synchronized (workers) {
if (num > 0 && num + workers.size() <= MAX_WORKER_NUMBERS) {
initializeWorkers(num);
} else if (num <= 0) {
throw new IllegalArgumentException("num = " + num + "不能小于 1");
} else {
throw new IllegalArgumentException("工作者数量已到达最大值");
}
}
}
private void initializeWorkers(int num) {
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
Thread thread = new Thread(worker, "ThreadPool-Worker-" + nextId.getAndAdd(1));
thread.start();
}
workerNum += num;
}
@Override
public void removeWorker(int num) {
synchronized (workers) {
checkNum(num, 1, workerNum);
for (int i = 0; i < num; i++) {
Worker worker = workers.removeLast();
worker.cancel();
worker.thread.interrupt();
worker = null;
}
workerNum -= num;
}
}
private void checkNum(int num, int min, int max) {
if (num > max || num < min) {
throw new IllegalArgumentException("num = " +
num + "应该大于等于" + min + "并且小于等于" + max);
}
}
@Override
public int getJobSize() {
return jobs.size();
}
class Worker implements Runnable {
private boolean running = true;
private Thread thread;
boolean cancel() {
if (!running) {
throw new RuntimeException("worker is cancel");
}
running = false;
return true;
}
@Override
public void run() {
thread = Thread.currentThread();
while (running) {
Runnable job;
synchronized (jobs) {
if (jobs.size() == 0) {
try {
jobs.wait();
} catch (InterruptedException e) {
setEx(thread, e);
}
}
job = jobs.remove();
}
try {
executeJob(job);
} catch (Throwable e) {
setEx(thread, e);
}
}
}
private void executeJob(Runnable job) {
job.run();
}
}
private void setEx(Thread thread, Throwable e) {
// 异常处理
}
}
下面是主方法
public static void main(String[] args) {
MyPoolThread myPoolThread = new MyPoolThread();
for (int i = 0; i < 50; i++) {
int j = i;
myPoolThread.execute(() -> {
System.out.println(j+":我是消息");
});
}
myPoolThread.shutdown();
}
可以看到这里线程池还是有许多的问题了,不过作为了解其基本原理,已经够了。这个线程池在初始化的时候就初始化了一定数目的线程。并且启动了它们,通过synchronized
锁的notify/wait
机制实现一旦有任务进来,就会有工作线程去执行。
网友评论