美文网首页
Java基础篇之ThreadPoolExecutor

Java基础篇之ThreadPoolExecutor

作者: 街角下的蚂蚁 | 来源:发表于2020-07-27 09:56 被阅读0次
  • 导语

使用线程池的好处是能够减少在创建和销毁线程上所花费的时间以及系统资源的开销,提升系统整体性能。那么线程池是怎么实现这一目标的呢?

线程池的创建

public ThreadPoolExecutor(
  int corePoolSize,
  int maximumPoolSize,
  long keepAliveTime,
  TimeUnit unit,
  BlockingQueue<Runnable> workQueue,    
  ThreadFactory threadFactory,
  RejectedExecutionHandler handler) {
    ...      
}
线程池的构造方法好几个,但最终都是调用的该构造方 法,下面我们分析下每个构造参数的具体含义:
  • corePoolSize 最小核心线程数,线程池保持的最少线程数
  • maximumPoolSize 最大核心线程数,线程池能创建的线程数上限(实际并非如此,需要根据具体的workQueue队列类型作区分)
  • keepAliveTime 线程的回收策略,允许线程最大的空闲时间,超过时间则线程被回收
  • unit 与keepAliveTime联合使用,代表超时时间的单位
  • workQueue 线程池的缓存任务队列,不同的队列,线程池的实现形式略有差异,具体可以查阅相关资料
  • threadFactory 线程工厂,就一个创建线程方法,可以设置线程名称,线程优先级等
  • handler 线程池拒绝策略,具体可以查询相关资料

线程池线程复用分析

1、将任务放入线程执行 ThreadPoolExecutor.java

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  int c = ctl.get();
  // 判断当前线程数如果小于核心线程数,则新建线程
  if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
      return;
    c = ctl.get();
  }
  if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
      reject(command);
    // 如果当前线程为0,那么新建一个线程,放入线程池中
    else if (workerCountOf(recheck) == 0)
      addWorker(null, false);
  } else if (!addWorker(command, false))
    reject(command);
  }

2、其中重要的方法为addWorker(),看下它的实现

private boolean addWorker(Runnable firstTask, boolean core) {
        ...
        Worker w = null;
        try {
            // 新建一个Worker(实现了Runnable接口),将当前任务放入Worker中,启动Worker后,就会执行该任务
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // 将刚刚创建的Worker缓存起来
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 启动刚刚创建的worker线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

3、在第2步的时候新建了一个Worker,并将Worker线程启动了。现在看看Worker中是怎么执行任务的,Worker.run()

public void run() {
  runWorker(this);
}

Worker.runWorker(this),实现线程复用的具体实现就在这里

final void runWorker(Worker w) {
        ...
        try {
            // 关键点,先判断当前的task是否为空,如果为空,则调用getTask(),从队列获取任务继续执行,如果队列一直存在任务,则一直执行,直到任务执行完成,循环退出,线程销毁,实现了一个线程处理多任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 调用具体执行业务逻辑的run()
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

4、核心线程数空闲的时候,是怎么保持可用状态的呢?关键在于getTask()

// 是否允许核心线程数超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
...
try {
  // 如果核心线程数配置了超时失效,则调用队列的poll方法,如果核心线程数不失效,则调用队列的阻塞方法,实现线程阻塞永不回收。当队列中新加入任务时,立马获取任务执行
  Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
  if (r != null)
  return r;
  timedOut = true;
} catch (InterruptedException retry) {
  timedOut = false;
}

以上就是线程池实现线程复用的原理。

扫描关注.jpg

相关文章

网友评论

      本文标题:Java基础篇之ThreadPoolExecutor

      本文链接:https://www.haomeiwen.com/subject/mtnalktx.html