写之前:
jdk自带的ThreadPoolExecutor,线程池种类比较多,功能比较丰富。比较完善。
自己写的,肯定就是简单,不完善。
而且,自己写的,没有maxSize最大线程数。处理逻辑和threadPoolExecutor不一样:executor一个任务,直接往堵塞队列里放,然后让coreSize个线程去轮询处理。core线程一段时间都没有任务,走死掉。
threadPoolExecutor是在核心线程都在工作,处理不过来的时候,才会把队伍往堵塞队列放,然后堵塞队列放不下,才会再开启maxSize的线程处理,maxSize的线程都满了还再提交任务,则使用指定的拒绝策略。而且,coreSize的线程是不会死的。
public class ThreadPool {
private BlockingQueue<Runnable> dutyBlockQueue;
private Integer coreSize;
private AtomicInteger workCount = new AtomicInteger();
private Long idleSecond;
private Reject reject;
public ThreadPool(BlockingQueue<Runnable> dutyBlockQueue, Integer coreSize, Long idleSecond, Reject reject){
this.dutyBlockQueue = dutyBlockQueue;
this.coreSize = coreSize;
this.idleSecond = idleSecond;
this.reject = reject;
}
public void execute(Runnable runnable) {
boolean inQueue = dutyBlockQueue.offer(runnable);
// 堵塞队列放不下了,则拒绝
if(!inQueue){
// 使用指定的拒绝策略
reject.reject();
return;
}
// 自旋
while (true){
int num = workCount.get();
if(num < coreSize){
// 在这里的时候,workCount很可能已经改变了,所以通过cas原子替换workCount,替换失败,则自旋,再走一次
boolean setSuccess = workCount.compareAndSet(num, num + 1);
if(setSuccess){
Work work = new Work(dutyBlockQueue, idleSecond);
work.start();
break;
}
System.out.println(num);
}else {
break;
}
}
}
// 真正工作的线程
class Work extends Thread{
private BlockingQueue<Runnable> dutyBlockQueue;
private Long idleSecnod;
public Work(BlockingQueue<Runnable> dutyBlockQueue, Long idleSecnod){
this.dutyBlockQueue = dutyBlockQueue;
this.idleSecnod = idleSecnod;
}
@Override
public void run() {
try {
while (true){
// 堵塞idleSecond还没有任务,则死掉这个线程
Runnable duty = dutyBlockQueue.poll(idleSecnod, TimeUnit.SECONDS);
if(duty == null){
break;
}
duty.run();
}
System.out.println("dea");
workCount.decrementAndGet();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
// 拒绝策略抽象类
static abstract class Reject{
public abstract void reject();
}
// 直接抛弃策略
static class Abandon extends Reject{
@Override
public void reject() {
}
}
// 抛错策略
static class ThrowException extends Reject{
@Override
public void reject() {
throw new RuntimeException("线程池满了");
}
}
}
测试
public class Test {
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(new LinkedBlockingQueue<>(100),
3, (long) 1, new ThreadPool.ThrowException());
AtomicInteger atomicInteger = new AtomicInteger();
for(int i = 0; i < 100; i++){
new Thread(){
@Override
public void run() {
for(int i = 0; i < 10; i++) {
threadPool.execute(new Thread(() -> {
int a = atomicInteger.getAndIncrement();
/* try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
System.out.println(Thread.currentThread().getName() + " " + a);
}));
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
}
}
输出:
1、这里的线程,线程名是默认的,而jdk的ThreadPoolExecutor的线程是threadFactory生产的,线程名是自定义连续的。
2、100个线程都execute一个任务,然后100个线程都休眠了10秒。在这个过程中,coreSize的线程3秒之后,就死了,之后重新开启新任务处理,如此循环。
Thread-111 1
Thread-115 2
Thread-114 0
Thread-115 4
Thread-111 3
...
dea
dea
dea
Thread-211 100
Thread-211 101
Thread-213 102
如此循环
网友评论