package com.example.concurrenttest.pool;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Demo driver for the hand-written {@code ThreadPool}.
 *
 * <p>Builds a pool with two workers, a 1-second idle timeout and a wait queue
 * of five tasks, then submits ten tasks that each sleep for two seconds and
 * print their own index.
 */
public class TestPool {
    public static void main(String[] args) {
        // Policy applied when the wait queue is full. Possible choices:
        //   1. block until there is room       -> return queue.put(task);
        //   2. wait for room with a timeout    -> the variant used below
        //   3. discard the task                -> log it and return false
        //   4. fail loudly                     -> throw a RuntimeException
        //   5. run the task on the caller      -> task.run(); return true;
        RejectStrategy<Runnable> whenQueueFull =
                (queue, task) -> queue.put(task, 1, TimeUnit.SECONDS);

        // Pool with an idle timeout: a worker that gets no task within the
        // given time terminates.
        ThreadPool pool = new ThreadPool(2, 1, TimeUnit.SECONDS, 5, whenQueueFull);

        int submitted = 0;
        while (submitted < 10) {
            // Lambdas may only capture effectively-final locals, so take a copy.
            final int taskNumber = submitted++;
            Runnable job = () -> {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(taskNumber);
            };
            pool.execute(job);
        }
    }
}
/**
 * Rejection strategy: decides what happens to an element that could not be
 * added because the queue was full.
 *
 * <p>In this file it is invoked by {@code BlockingQueue.tryPut} when the queue
 * is full, and the value it returns becomes the result of {@code tryPut}.
 *
 * @param <T> type of the elements held by the queue
 */
@FunctionalInterface
interface RejectStrategy<T> {
/**
 * Handles an element that did not fit into {@code queue}.
 *
 * @param queue the queue that was found full; an implementation may call its
 *              {@code put} methods to retry
 * @param task  the element that could not be added
 * @return the outcome reported to the caller of {@code tryPut};
 *         {@code ThreadPool.execute} treats {@code false} as "task rejected"
 */
boolean reject(BlockingQueue<T> queue, T task);
}
/**
 * Minimal thread pool: up to {@code coreSize} worker threads that pull tasks
 * from a bounded {@link BlockingQueue}.
 *
 * <p>The set of workers is guarded by its own monitor ({@code synchronized (workers)}).
 * Lock order is always "workers monitor, then queue lock".
 */
class ThreadPool {
    /** Tasks waiting for a free worker. */
    private final BlockingQueue<Runnable> taskQueue;
    /** Live workers; every access is made while holding the monitor of this set. */
    private final Set<Worker> workers = new HashSet<>();
    private final int coreSize;
    private final long times;
    private final TimeUnit timeUnit;
    private final RejectStrategy<Runnable> rejectStrategy;

    /**
     * Creates an empty pool; worker threads are started lazily by {@link #execute}.
     *
     * @param coreSize       maximum number of worker threads
     * @param times          idle timeout: a worker that receives no task for this long
     *                       terminates; {@code 0} means workers never terminate
     * @param timeUnit       unit of {@code times}
     * @param queueSize      maximum number of tasks waiting in the queue
     * @param rejectStrategy policy applied when the queue is full
     */
    public ThreadPool(int coreSize, long times, TimeUnit timeUnit, int queueSize,
                      RejectStrategy<Runnable> rejectStrategy) {
        this.coreSize = coreSize;
        this.times = times;
        this.timeUnit = timeUnit;
        this.rejectStrategy = rejectStrategy;
        taskQueue = new BlockingQueue<>(queueSize);
    }

    /**
     * Submits a task.
     *
     * <p>If fewer than {@code coreSize} workers exist, a new worker is started with
     * {@code task} as its first task. Otherwise the task is offered to the queue;
     * when the queue is full the rejection strategy decides. A {@code false} result
     * or a {@code RuntimeException} from the strategy is reported on standard output
     * and the task is dropped.
     *
     * <p>The workers monitor is held for the whole call, including any time the
     * rejection strategy spends blocking. Workers rely on this when they retire
     * (see {@code Worker.runTimeoutTask}).
     *
     * @param task the task to run
     */
    public void execute(Runnable task) {
        synchronized (workers) {
            if (workers.size() < coreSize) {
                Worker worker = new Worker(task);
                System.out.println("create worker:" + worker + "," + task);
                workers.add(worker);
                worker.start();
            } else {
                System.out.println("try put to wait queue:" + task);
                try {
                    boolean result = taskQueue.tryPut(task, rejectStrategy);
                    if (!result) {
                        System.out.println("任务"+task+"被拒绝");
                    }
                } catch (RuntimeException e) {
                    System.out.println("任务"+task+"被拒绝");
                }
            }
        }
    }

    /**
     * Worker thread: runs its initial task, then keeps pulling tasks from the queue.
     */
    class Worker extends Thread {
        /** Task currently being executed (or about to be). */
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            if (times == 0) {
                runTask();
            } else {
                runTimeoutTask();
            }
        }

        /** Never-ending mode ({@code times == 0}): block on the queue between tasks. */
        private void runTask() {
            while (true) {
                runCurrentTask();
                task = taskQueue.take();
            }
        }

        /**
         * Idle-timeout mode: retire once no task arrives within the timeout.
         *
         * <p>Before retiring, the queue is polled once more while holding the workers
         * monitor. {@code execute} enqueues under the same monitor, so a task that was
         * queued between the timeout expiring and this worker leaving the set is picked
         * up here instead of being stranded with no worker left to run it.
         */
        private void runTimeoutTask() {
            while (true) {
                runCurrentTask();
                Optional<Runnable> nextTask = taskQueue.take(times, timeUnit);
                if (!nextTask.isPresent()) {
                    synchronized (workers) {
                        // A zero timeout makes this a non-blocking poll.
                        nextTask = taskQueue.take(0, timeUnit);
                        if (!nextTask.isPresent()) {
                            System.out.println("remove worker:" + this);
                            workers.remove(this);
                            return;
                        }
                    }
                }
                task = nextTask.get();
            }
        }

        /**
         * Runs {@link #task}. A {@code RuntimeException} thrown by the task is reported
         * and swallowed so that the worker survives; otherwise the thread would die
         * while still being counted in {@code workers}, permanently shrinking the pool.
         */
        private void runCurrentTask() {
            System.out.println("begin task:" + task);
            try {
                task.run();
            } catch (RuntimeException e) {
                System.out.println("task failed:" + task + "," + e);
            }
        }
    }
}
/**
 * Bounded FIFO task queue guarded by a single {@link ReentrantLock} with two
 * conditions: one for producers waiting for space and one for consumers waiting
 * for elements.
 *
 * <p>Elements must not be {@code null} (the backing {@link ArrayDeque} rejects them).
 *
 * @param <T> element type
 */
class BlockingQueue<T> {
    // 1. Backing storage: elements are appended at the tail and removed from the head.
    private final Deque<T> queue = new ArrayDeque<>();
    // 2. Lock guarding every access to the storage.
    private final ReentrantLock lock = new ReentrantLock();
    // 3. Producers wait here while the queue is full; signalled after each removal.
    private final Condition fullWaitSet = lock.newCondition();
    // 4. Consumers wait here while the queue is empty; signalled after each insertion.
    private final Condition emptyWaitSet = lock.newCondition();
    // 5. Maximum number of elements held at once.
    private final int capacity;

    /**
     * @param capacity maximum number of elements the queue may hold
     */
    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    /**
     * Removes and returns the head, waiting as long as necessary for an element.
     *
     * <p>An interrupt does not end the wait; the thread's interrupt status is
     * preserved and visible to the caller after this method returns.
     *
     * @return the element that was at the head of the queue
     */
    public T take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                emptyWaitSet.awaitUninterruptibly();
            }
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the head, waiting at most the given time for an element.
     * A timeout of zero or less polls without waiting.
     *
     * <p>An interrupt does not end the wait early; the remaining time is recomputed
     * and the interrupt status is restored before returning.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the head element, or an empty {@code Optional} if the time elapsed first
     */
    public Optional<T> take(long timeout, TimeUnit unit) {
        boolean interrupted = false;
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            final long deadline = System.nanoTime() + nanos;
            while (queue.isEmpty()) {
                if (nanos <= 0) {
                    return Optional.empty();
                }
                try {
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep honouring the original deadline.
                    interrupted = true;
                    nanos = deadline - System.nanoTime();
                }
            }
            return Optional.of(dequeue());
        } finally {
            lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Appends an element, waiting as long as necessary for free space.
     *
     * <p>An interrupt does not end the wait; the thread's interrupt status is
     * preserved and visible to the caller after this method returns.
     *
     * @param element the element to add
     * @return always {@code true}
     */
    public boolean put(T element) {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                fullWaitSet.awaitUninterruptibly();
            }
            enqueue(element);
            return true;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Appends an element, waiting at most the given time for free space.
     *
     * <p>Waits on the producers' condition ({@code fullWaitSet}), so a removal by a
     * consumer wakes this call immediately. An interrupt does not end the wait early;
     * the remaining time is recomputed and the interrupt status is restored before
     * returning.
     *
     * @param t       the element to add
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the element was added, {@code false} if the time elapsed
     *         while the queue was still full
     */
    public boolean put(T t, long timeout, TimeUnit unit) {
        boolean interrupted = false;
        lock.lock();
        try {
            long nanos = unit.toNanos(timeout);
            final long deadline = System.nanoTime() + nanos;
            while (queue.size() == capacity) {
                if (nanos <= 0) {
                    return false;
                }
                try {
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep honouring the original deadline.
                    interrupted = true;
                    nanos = deadline - System.nanoTime();
                }
            }
            enqueue(t);
            return true;
        } finally {
            lock.unlock();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Adds the element immediately if there is room; otherwise hands it to the
     * rejection strategy and returns whatever the strategy returns.
     *
     * <p>The strategy is called after the queue lock has been released, so a slow
     * strategy (for example one that runs the task itself) does not block consumers.
     * The queue may therefore have changed by the time the strategy runs; the
     * {@code put} methods it may call re-check the state themselves.
     *
     * @param task           the element to add
     * @param rejectStrategy policy consulted when the queue is full
     * @return {@code true} if the element was added directly, otherwise the
     *         strategy's result
     */
    public boolean tryPut(T task, RejectStrategy<T> rejectStrategy) {
        lock.lock();
        try {
            if (queue.size() != capacity) {
                enqueue(task);
                return true;
            }
        } finally {
            lock.unlock();
        }
        return rejectStrategy.reject(this, task);
    }

    /** @return the number of elements currently queued */
    public int size() {
        lock.lock();
        try {
            return queue.size();
        } finally {
            lock.unlock();
        }
    }

    /** @return {@code true} if no element is queued */
    public boolean isEmpty() {
        lock.lock();
        try {
            return queue.isEmpty();
        } finally {
            lock.unlock();
        }
    }

    /** @return {@code true} if the number of queued elements equals the capacity */
    public boolean isFull() {
        lock.lock();
        try {
            return queue.size() == capacity;
        } finally {
            lock.unlock();
        }
    }

    /** Appends at the tail and wakes one waiting consumer. Caller must hold {@code lock}. */
    private void enqueue(T element) {
        queue.addLast(element);
        emptyWaitSet.signal();
    }

    /** Removes the head and wakes one waiting producer. Caller must hold {@code lock}. */
    private T dequeue() {
        T head = queue.removeFirst();
        fullWaitSet.signal();
        return head;
    }
}
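// 网友评论 ("reader comments": web-page footer captured together with the listing; not part of the program)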