This article walks through how org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor and java.util.concurrent.ThreadPoolExecutor execute tasks.
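When I used to write multithreaded code, I always did it like this:
executor = Executors.newCachedThreadPool();
But once, with a large volume of data, work was being put in much faster than it was being taken out. Memory usage exploded, the whole thing fell over, and I had to rewrite the code. Lesson learned.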
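For context on what can go wrong here: in the JDK sources I am familiar with (8 through 21), Executors.newCachedThreadPool() is simply a ThreadPoolExecutor with corePoolSize 0, maximumPoolSize Integer.MAX_VALUE, a 60-second keepAlive and a SynchronousQueue. A SynchronousQueue stores no tasks, so every task that finds no idle thread gets a brand-new thread, with no upper bound; when tasks arrive faster than they finish, the thread count (and the memory behind each thread) plausibly keeps growing. The sketch below just reads those settings back; the class name CachedPoolInspection is made up for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolInspection {

    // Returns the pool built by Executors.newCachedThreadPool(), cast so its settings can be read.
    // In current JDKs the returned object is a plain ThreadPoolExecutor, not a wrapper.
    static ThreadPoolExecutor cachedPool() {
        ExecutorService es = Executors.newCachedThreadPool();
        return (ThreadPoolExecutor) es;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = cachedPool();
        System.out.println("core      = " + pool.getCorePoolSize());                   // 0
        System.out.println("max       = " + pool.getMaximumPoolSize());                // 2147483647
        System.out.println("keepAlive = " + pool.getKeepAliveTime(TimeUnit.SECONDS));  // 60
        // A SynchronousQueue has no capacity: a task is either handed to a waiting thread
        // immediately, or a new thread is created for it.
        System.out.println("direct handoff = " + (pool.getQueue() instanceof SynchronousQueue)); // true
        pool.shutdown();
    }
}
```

In other words, "unbounded" here is about threads rather than queued tasks, which is exactly what the bounded configuration below avoids.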
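Later I noticed that my colleagues all used the approach below, so I dug into the source code. It turned out to be a really good approach, so here are my notes on that source.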
The executor is usually declared in the Spring configuration file:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="20" />
    <property name="keepAliveSeconds" value="200" />
    <property name="maxPoolSize" value="45" />
    <property name="queueCapacity" value="100" />
</bean>
and then injected as a property into other beans:
<property name="executor" ref="taskExecutor" />
Inside ThreadPoolTaskExecutor, all four properties are passed straight through to java.util.concurrent.ThreadPoolExecutor, so thread creation, task submission, task execution and thread teardown are essentially all handled by ThreadPoolExecutor:
protected ExecutorService initializeExecutor(
        ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

    BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
            queue, threadFactory, rejectedExecutionHandler);
    if (this.allowCoreThreadTimeOut) {
        executor.allowCoreThreadTimeOut(true);
    }

    this.threadPoolExecutor = executor;
    return executor;
}
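To make the mapping concrete, here is roughly what the taskExecutor bean above amounts to in plain java.util.concurrent terms. This is a sketch under assumptions: createQueue mirrors my understanding of Spring's ThreadPoolTaskExecutor.createQueue (a LinkedBlockingQueue when queueCapacity > 0, otherwise a SynchronousQueue), and Executors.defaultThreadFactory() stands in for the name-prefixing thread factory that Spring actually passes in. XmlBeanEquivalent is an illustrative name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class XmlBeanEquivalent {

    // Assumed to mirror Spring's createQueue: bounded LinkedBlockingQueue when capacity > 0,
    // otherwise a SynchronousQueue that hands each task directly to a thread.
    static BlockingQueue<Runnable> createQueue(int queueCapacity) {
        return queueCapacity > 0
                ? new LinkedBlockingQueue<>(queueCapacity)
                : new SynchronousQueue<>();
    }

    // The values from the <bean id="taskExecutor"> definition, passed straight through.
    static ThreadPoolExecutor build() {
        int corePoolSize = 20;
        int maxPoolSize = 45;
        int keepAliveSeconds = 200;
        int queueCapacity = 100;
        return new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
                createQueue(queueCapacity),
                Executors.defaultThreadFactory(),       // stand-in for Spring's own thread factory
                new ThreadPoolExecutor.AbortPolicy());  // the default rejection policy
    }
}
```

One consequence worth noticing: with queueCapacity=100 the pool stays at 20 threads until 100 tasks are waiting in the queue; only then does it grow toward 45.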
Let's take a moment to see how initializeExecutor gets called. It happens in ThreadPoolTaskExecutor's superclass, org.springframework.scheduling.concurrent.ExecutorConfigurationSupport:
/**
 * Calls <code>initialize()</code> after the container applied all property values.
 * @see #initialize()
 */
public void afterPropertiesSet() {
    initialize();
}

/**
 * Set up the ExecutorService.
 */
public void initialize() {
    if (logger.isInfoEnabled()) {
        logger.info("Initializing ExecutorService " + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    }
    if (!this.threadNamePrefixSet && this.beanName != null) {
        setThreadNamePrefix(this.beanName + "-");
    }
    this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}
So afterPropertiesSet is called first; it calls initialize, which in turn calls initializeExecutor.
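afterPropertiesSet itself is called because ExecutorConfigurationSupport implements the org.springframework.beans.factory.InitializingBean interface, which serves much the same purpose as an init-method: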
/**
 * Interface to be implemented by beans that need to react once all their
 * properties have been set by a BeanFactory: for example, to perform custom
 * initialization, or merely to check that all mandatory properties have been set.
 *
 * <p>An alternative to implementing InitializingBean is specifying a custom
 * init-method, for example in an XML bean definition.
 */
public interface InitializingBean {
    void afterPropertiesSet() throws Exception;
}
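The call order (the container sets every property, then calls afterPropertiesSet, which calls initialize, which calls initializeExecutor) can be mimicked without Spring. Everything in this sketch is hypothetical: SimpleInitializingBean, MiniTaskExecutor and MiniContainer are made-up stand-ins for InitializingBean, ThreadPoolTaskExecutor and the bean factory, the defaults are assumed to mirror ThreadPoolTaskExecutor's, and most of Spring's real behaviour is left out.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for org.springframework.beans.factory.InitializingBean.
interface SimpleInitializingBean {
    void afterPropertiesSet();
}

// Hypothetical, heavily simplified ThreadPoolTaskExecutor + ExecutorConfigurationSupport.
class MiniTaskExecutor implements SimpleInitializingBean {
    private final String beanName;
    // Defaults assumed to mirror ThreadPoolTaskExecutor's; overwritten by the "container".
    private int corePoolSize = 1;
    private int maxPoolSize = Integer.MAX_VALUE;
    private int keepAliveSeconds = 60;
    private int queueCapacity = Integer.MAX_VALUE;
    private String threadNamePrefix;      // derived from beanName if never set explicitly
    private ThreadPoolExecutor executor;  // stays null until afterPropertiesSet() runs

    MiniTaskExecutor(String beanName) { this.beanName = beanName; }

    void setCorePoolSize(int v) { this.corePoolSize = v; }
    void setMaxPoolSize(int v) { this.maxPoolSize = v; }
    void setKeepAliveSeconds(int v) { this.keepAliveSeconds = v; }
    void setQueueCapacity(int v) { this.queueCapacity = v; }

    // Called once all properties are set: the same role as InitializingBean.afterPropertiesSet().
    @Override
    public void afterPropertiesSet() { initialize(); }

    void initialize() {
        if (threadNamePrefix == null && beanName != null) {
            threadNamePrefix = beanName + "-";
        }
        executor = initializeExecutor();
    }

    // The properties are handed straight to ThreadPoolExecutor.
    private ThreadPoolExecutor initializeExecutor() {
        AtomicInteger seq = new AtomicInteger();
        String prefix = threadNamePrefix;
        ThreadFactory factory = r -> new Thread(r, prefix + seq.incrementAndGet());
        return new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueCapacity), factory);
    }

    ThreadPoolExecutor getThreadPoolExecutor() { return executor; }
}

// Hypothetical container: apply every property first, then fire the init callback.
class MiniContainer {
    static MiniTaskExecutor createTaskExecutor() {
        MiniTaskExecutor bean = new MiniTaskExecutor("taskExecutor");
        bean.setCorePoolSize(20);
        bean.setMaxPoolSize(45);
        bean.setKeepAliveSeconds(200);
        bean.setQueueCapacity(100);
        bean.afterPropertiesSet(); // a real BeanFactory does this for any InitializingBean
        return bean;
    }
}
```

Because the executor is only built in the init callback, it always sees the final property values, whatever order the setters were called in.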
With ThreadPoolTaskExecutor's initialization covered, let's move on to the main event: how ThreadPoolExecutor uses these four parameters.
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // ...
}
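The Javadoc above explains this well, so I'll only add a few notes. For now, ignore the effect of allowCoreThreadTimeOut (it decides whether idle core threads block indefinitely waiting for tasks or eventually time out).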
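corePoolSize -- the number of threads the pool keeps even when they are idle. Threads are created on demand: when a task is submitted while fewer than corePoolSize threads are running, a new thread is created for it.
maximumPoolSize -- the maximum number of threads the pool may hold.
keepAliveTime -- how long a thread beyond corePoolSize may stay idle before it is terminated.
workQueue -- the queue that holds submitted tasks until a thread runs them.
threadFactory -- creates new threads, for example:
this.thread = getThreadFactory().newThread(this);
RejectedExecutionHandler -- decides what to do with a new task when the thread count has reached the maximum and the queue is also full.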
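The last two parameters are easiest to understand with custom instances. The sketch below assumes a pool of one thread with a one-slot queue; namedFactory is a hypothetical helper that numbers its threads, and the pool uses the JDK's ThreadPoolExecutor.CallerRunsPolicy instead of the default AbortPolicy, so a rejected task runs on the submitting thread. That is one common way to slow down a producer that is faster than the pool, the situation described at the start of this article.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FactoryAndHandlerDemo {

    // Hypothetical helper: a ThreadFactory that gives every pool thread a readable, numbered name.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return r -> new Thread(r, prefix + seq.incrementAndGet());
    }

    // Runs three tasks on a 1-thread pool with a 1-slot queue; returns task id -> thread name.
    static Map<String, String> run() {
        Map<String, String> ranOn = new ConcurrentHashMap<>();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                namedFactory("demo-worker-"),
                // Threads and queue both full: run the task on the caller instead of throwing.
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            pool.execute(() -> {                       // occupies the only thread until released
                ranOn.put("task1", Thread.currentThread().getName());
                try { release.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            pool.execute(() -> ranOn.put("task2", Thread.currentThread().getName())); // waits in the queue
            pool.execute(() -> ranOn.put("task3", Thread.currentThread().getName())); // rejected: caller runs it
        } finally {
            release.countDown();
            pool.shutdown();                           // queued task2 still runs after shutdown()
        }
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn;
    }
}
```

task1 and task2 run on demo-worker-1, while task3 runs on whichever thread called run().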
Besides these parameters, there is one more especially important field: workers, the set that holds the pool's worker threads:
/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>();
The class stored in it, java.util.concurrent.ThreadPoolExecutor.Worker, is the real heart of the design.
Two pieces of code in Worker deserve attention.
/**
 * Creates with given first task and thread from ThreadFactory.
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
So when a Worker is created, its thread comes from the ThreadFactory, and the Worker itself (a Runnable) is passed in as that thread's task.
/** Delegates main run loop to outer runWorker */
public void run() {
    runWorker(this);
}
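The Worker's thread is started from outside the Worker (in addWorker, covered below); once started, it executes run(), which delegates to runWorker.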
Now let's look at runWorker:
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
The heart of it is this loop:
while (task != null || (task = getTask()) != null)
Inside getTask, tasks are fetched with
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
so the worker keeps taking tasks from workQueue and running them.
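This is where the timeout comes in. Whether taking a task from workQueue gives up after keepAliveTime and returns null, or blocks indefinitely, depends on timed, which is computed as follows (wc is the current worker count):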
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
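This is where the allowCoreThreadTimeOut parameter mentioned earlier comes in: when it is set to true, workers no longer block indefinitely; they wait at most keepAliveTime and then get null.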
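Likewise, whenever the number of workers exceeds corePoolSize, workers poll with a timeout instead of blocking forever. This applies to whichever workers happen to be idle, not specifically the most recently created ones. A worker that gets null from getTask leaves the loop and exits, which is how the pool shrinks back toward corePoolSize.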
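The effect of timed can be observed from the outside with a real ThreadPoolExecutor. In this sketch (KeepAliveDemo and waitForPoolSize are illustrative names; the 100 ms keepAlive and 2 s deadlines are arbitrary), a SynchronousQueue forces the pool to grow to three threads. After the tasks finish, the two threads above the core time out, and only after allowCoreThreadTimeOut(true) does the last one go as well.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {

    // Polls getPoolSize() until it equals `expected` or the deadline passes; returns the last value seen.
    static int waitForPoolSize(ThreadPoolExecutor pool, int expected, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        int size = pool.getPoolSize();
        while (size != expected && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            size = pool.getPoolSize();
        }
        return size;
    }

    // Returns {size while busy, size after idling, size after allowCoreThreadTimeOut(true)}.
    static int[] run() {
        // core = 1, max = 3; idle threads above the core wait at most 100 ms for new work.
        // A SynchronousQueue holds nothing, so tasks 2 and 3 force extra (non-core) threads.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 3, 100, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try { release.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        };
        for (int i = 0; i < 3; i++) {
            pool.execute(blocker);
        }
        int busy = pool.getPoolSize();                     // 3 threads, all blocked

        release.countDown();
        // While wc > corePoolSize, timed is true: idle workers poll(keepAliveTime) and retire.
        // Once wc == corePoolSize, timed is false and the last worker blocks in take().
        int idle = waitForPoolSize(pool, 1, 2000);

        // Now timed is true for every worker, so the core thread times out too.
        pool.allowCoreThreadTimeOut(true);
        int afterCoreTimeout = waitForPoolSize(pool, 0, 2000);

        pool.shutdown();
        return new int[] {busy, idle, afterCoreTimeout};
    }
}
```

The expected result is {3, 1, 0}: three threads while busy, one kept as the core, none once core threads may time out.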
So that is how a Worker runs tasks through runWorker. But who starts the Worker, and how do tasks get into workQueue in the first place? Read on.
Let's look at ThreadPoolExecutor's execute method:
/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
The Javadoc and code above pretty much say it all, but here is a brief summary.
Each time a task is submitted, there are three cases:
1. If fewer than corePoolSize threads are running, a new thread (that is, a new Worker) is created with the submitted task as its first task.
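2. Otherwise (corePoolSize or more threads running), the task is offered to workQueue. If it is accepted, a running worker will pick it up in runWorker. After queueing, execute rechecks the pool state: if the pool has been shut down in the meantime, it removes the task again and rejects it; if the worker count has dropped to zero, it starts a new worker with no first task so that the queued task is not stranded. It does not try to top the pool back up to corePoolSize.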
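3. If the queue is full, a new non-core thread is created to run the task, as long as the thread count is still below maximumPoolSize. In the extreme case where the thread count has already reached maximumPoolSize and workQueue is full as well, the task is passed to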
/**
 * Invokes the rejected execution handler for the given command.
 * Package-protected for use by ScheduledThreadPoolExecutor.
 */
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}
which delegates to handler. With ThreadPoolTaskExecutor, that handler comes from this field in ExecutorConfigurationSupport:
private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
/**
 * Always throws RejectedExecutionException.
 *
 * @param r the runnable task requested to be executed
 * @param e the executor attempting to execute this task
 * @throws RejectedExecutionException always.
 */
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}
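Putting it together: once the thread count has reached maximumPoolSize and workQueue is full, any further task makes execute throw a RejectedExecutionException to the caller, and that task is never run.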
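The three cases and the default AbortPolicy can be reproduced with a deliberately tiny pool: core 1, max 2, a queue of capacity 1 (numbers picked only to keep the example small). Every task blocks on a latch so the pool state stays fixed while it is inspected. ExecuteStepsDemo and its Result holder are illustrative names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecuteStepsDemo {

    // Observed pool/queue sizes, whether task 4 was rejected, and how many tasks actually ran.
    static final class Result {
        int poolSize;
        int queued;
        boolean fourthRejected;
        int completed;
    }

    static Result run() {
        // core = 1, max = 2, queue capacity = 1; no handler given, so AbortPolicy is used.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger completed = new AtomicInteger();
        Runnable blocker = () -> {
            try {
                release.await();
                completed.incrementAndGet();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Result r = new Result();
        pool.execute(blocker);       // case 1: 0 < corePoolSize -> new core thread runs it
        pool.execute(blocker);       // case 2: core thread busy -> task goes into workQueue
        pool.execute(blocker);       // case 3: queue full -> new non-core thread (2 <= maximumPoolSize)
        try {
            pool.execute(blocker);   // threads == maximumPoolSize and queue full -> reject(command)
        } catch (RejectedExecutionException expected) {
            r.fourthRejected = true; // AbortPolicy throws to the caller; this task never runs
        }
        r.poolSize = pool.getPoolSize();
        r.queued = pool.getQueue().size();

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        r.completed = completed.get();
        return r;
    }
}
```

Expected: two threads, one queued task, the fourth task rejected, and exactly three tasks completed.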
So far we have seen tasks handed to a worker directly or placed in workQueue. The worker threads themselves are started in addWorker:
/**
 * Checks if a new worker can be added with respect to current
 * pool state and the given bound (either core or maximum). If so,
 * the worker count is adjusted accordingly, and, if possible, a
 * new worker is created and started, running firstTask as its
 * first task. This method returns false if the pool is stopped or
 * eligible to shut down. It also returns false if the thread
 * factory fails to create a thread when asked.  If the thread
 * creation fails, either due to the thread factory returning
 * null, or due to an exception (typically OutOfMemoryError in
 * Thread#start), we roll back cleanly.
 *
 * @param firstTask the task the new thread should run first (or
 * null if none). Workers are created with an initial first task
 * (in method execute()) to bypass queuing when there are fewer
 * than corePoolSize threads (in which case we always start one),
 * or when the queue is full (in which case we must bypass queue).
 * Initially idle threads are usually created via
 * prestartCoreThread or to replace other dying workers.
 *
 * @param core if true use corePoolSize as bound, else
 * maximumPoolSize. (A boolean indicator is used here rather than a
 * value to ensure reads of fresh values after checking other pool
 * state).
 * @return true if successful
 */
private boolean addWorker(Runnable firstTask, boolean core) {
    // ...
}
This method contains the following fragment:
if (workerAdded) {
    t.start();
    workerStarted = true;
}
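This starts the worker thread. The thread runs the Worker's run method, which calls runWorker, which executes the Worker's first task and then tasks taken from workQueue.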
That completes the flow of java.util.concurrent.ThreadPoolExecutor. A quick recap:
Tasks are submitted through execute, threads are started by addWorker, and each started thread runs runWorker to execute tasks.
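To tie the pieces together, here is a heavily simplified, hypothetical pool that follows the same execute -> addWorker -> Worker.run -> runWorker -> getTask shape. It is a teaching sketch, not the JDK implementation: TinyPool is a made-up name, a synchronized block on the pool replaces ctl and mainLock, there are no run states, no shutdown and no interrupt handling, and worker threads are daemon threads so the sketch never keeps the JVM alive.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

// Hypothetical teaching sketch; `this` is the lock in place of ctl/mainLock.
class TinyPool {
    private final int corePoolSize;
    private final int maximumPoolSize;
    private final long keepAliveNanos;
    private final BlockingQueue<Runnable> workQueue;
    private final Set<Worker> workers = new HashSet<>(); // guarded by `this`

    TinyPool(int core, int max, long keepAlive, TimeUnit unit, int queueCapacity) {
        this.corePoolSize = core;
        this.maximumPoolSize = max;
        this.keepAliveNanos = unit.toNanos(keepAlive);
        this.workQueue = new LinkedBlockingQueue<>(queueCapacity);
    }

    // Like ThreadPoolExecutor.Worker: a Runnable whose own thread is created with itself as target.
    private final class Worker implements Runnable {
        Runnable firstTask;
        final Thread thread;
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = new Thread(this, "tiny-worker");
            this.thread.setDaemon(true); // daemon, because this sketch has no shutdown()
        }
        @Override public void run() { runWorker(this); }
    }

    public void execute(Runnable command) {
        if (command == null) throw new NullPointerException();
        if (addWorker(command, true)) return;                  // 1. fewer than core threads
        if (workQueue.offer(command)) {                         // 2. queue the task
            synchronized (this) {
                if (workers.isEmpty()) addWorker(null, false);  //    recheck: someone must drain the queue
            }
            return;
        }
        if (!addWorker(command, false)) {                       // 3. queue full: grow up to max
            throw new RejectedExecutionException("Task " + command + " rejected"); // AbortPolicy-style
        }
    }

    private boolean addWorker(Runnable firstTask, boolean core) {
        Worker w;
        synchronized (this) {
            if (workers.size() >= (core ? corePoolSize : maximumPoolSize)) return false;
            w = new Worker(firstTask);
            workers.add(w);
        }
        w.thread.start(); // -> Worker.run() -> runWorker(w)
        return true;
    }

    private void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        while (task != null || (task = getTask(w)) != null) {
            try {
                task.run();
            } catch (RuntimeException e) {
                // The real pool lets this worker die and starts a replacement; here we just carry on.
            }
            task = null;
        }
    }

    // Returns the next task, or null once this idle worker is allowed to retire.
    private Runnable getTask(Worker w) {
        while (true) {
            boolean timed;
            synchronized (this) { timed = workers.size() > corePoolSize; }
            try {
                Runnable r = timed ? workQueue.poll(keepAliveNanos, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null) return r;
                synchronized (this) {
                    // Retire only above the core, and never leave queued tasks without a worker.
                    if (workers.size() > corePoolSize && (workers.size() > 1 || workQueue.isEmpty())) {
                        workers.remove(w);
                        return null;
                    }
                }
            } catch (InterruptedException e) {
                // Nothing interrupts workers in this sketch; just retry.
            }
        }
    }

    synchronized int poolSize() { return workers.size(); }
}
```

With new TinyPool(2, 4, 50, TimeUnit.MILLISECONDS, 2), six blocking tasks produce two core threads, two queued tasks and two extra threads, and a seventh task is rejected: the same order of events as the real execute. Once the tasks finish, the two extra threads time out and the pool settles back at two.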