1.线程池的作用
降低资源消耗的,复用线程,减少对象创建及销毁的资源;
提高线程的管理,统一分配,调优和监控;
提高响应速度,任务到达可以复用原先的线程,不需要等到线程创建才能执行;
功能的扩展,提供定时执行,单线程及并发数的控制;
2.初入jdk线程池核心类
image.pngMyExecutor
/**
* @author summit
* @since 2020/12/5 10:44
*
* @see java.util.concurrent.Executor
* @see ExecutorService
*
* @see java.util.concurrent.AbstractExecutorService
*/
public interface MyExecutor {
/**
* 接收执行任务
*
* @param command 任务
*/
void execute(Runnable command);
}
MyExecutorService
public interface MyExecutorService extends MyExecutor {
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
void shutdown();
List<Runnable> shutdownNow();
}
MyAbstractExecutorService
public abstract class MyAbstractExecutorService implements MyExecutorService {
@Override
public <T> Future<T> submit(Callable<T> task) {
FutureTask<T> futureTask = new FutureTask<>(task);
execute(futureTask);
return futureTask;
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
FutureTask<T> futureTask = new FutureTask<>(task, null);
execute(futureTask);
return futureTask;
}
@Override
public void shutdown() {
}
@Override
public List<Runnable> shutdownNow() {
return null;
}
}
MytThreadPoolExecutor
/**
* @author summit
* @see java.util.concurrent.ThreadPoolExecutor
* @since 2020/12/5 10:45
*/
public class MytThreadPoolExecutor extends MyAbstractExecutorService {
private volatile int corePoolSize;
private volatile int maxNumPoolSize;
private final AtomicInteger ctl = new AtomicInteger(0);
private LinkedBlockingQueue<Runnable> workQueue;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
public MytThreadPoolExecutor(int corePoolSize, int maxNumPoolSize,
LinkedBlockingQueue<Runnable> blockingQueue) {
this.corePoolSize = corePoolSize;
this.maxNumPoolSize = maxNumPoolSize;
this.workQueue = blockingQueue;
}
public MytThreadPoolExecutor(int corePoolSize, int maxNumPoolSize,
LinkedBlockingQueue<Runnable> workQueue, long keepAliveTime,
boolean allowCoreThreadTimeOut) {
this.corePoolSize = corePoolSize;
this.maxNumPoolSize = maxNumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = keepAliveTime;
if (keepAliveTime > 0) {
allowCoreThreadTimeOut = true;
}
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
}
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException();
}
int c = ctl.get();
if (c < corePoolSize) {
addWorker(task, true);
} else if (workQueue.offer(task)) {
//放入等待队列
addWorker(null, false);
} else {
// 拒绝策略
reject(task);
}
}
static class RejectExecutedHandler {
public void reject(Runnable runnable) {
throw new RejectedExecutionException("任务处理不了:" + runnable);
}
}
private void reject(Runnable task) {
new RejectExecutedHandler().reject(task);
}
private void addWorker(Runnable task, Boolean coreFlag) {
if (coreFlag) {
ctl.incrementAndGet();
}
Worker worker = new Worker(task);
worker.getThread().start();
}
@EqualsAndHashCode(callSuper = true)
@Data
class Worker extends ReentrantLock implements Runnable {
private Runnable firstTask;
private Thread thread;
public Worker(Runnable firstTask) {
this.firstTask = firstTask;
thread = new Thread(this);
}
@Override
public void run() {
runWorker(this);
}
private void runWorker(Worker w) {
try {
w.lock();
Runnable task = w.firstTask;
if (task != null || ((task = getTask()) != null)) {
task.run();
}
} finally {
processWorkerExit(w);
w.unlock();
}
}
private void processWorkerExit(Worker w) {
addWorker(null, false);
}
private Runnable getTask() {
try {
if (workQueue.isEmpty()) {
return null;
}
return allowCoreThreadTimeOut ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
: workQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
}
测试
public class MainTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
myThreadPool();
}
public static void myThreadPool() throws ExecutionException, InterruptedException {
MytThreadPoolExecutor executor = new MytThreadPoolExecutor(0,1,
new LinkedBlockingQueue<>(100));
// for (int i = 0; i < 10 ; i++) {
// executor.execute(new Runnable() {
// @Override
// public void run() {
// System.out.println("myThreadPool start=======");
// }
// });
// }
for (int i = 0; i < 10 ; i++) {
Future<Object> f = executor.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.out.println("callable start========");
return "callable";
}
});
System.out.println("main : ="+f.get());
}
}
public static void jdkThreadPool() {
// ThreadPoolExecutor
ExecutorService executor = Executors.newCachedThreadPool();
executor.execute(new Runnable() {
@Override
public void run() {
System.out.println("start work by thread pool");
}
});
}
}
输出结果
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
callable start
main : =callable
网友评论