美文网首页
spring ThreadPoolTaskExecutor和jd

spring ThreadPoolTaskExecutor和jd

作者: 柳岸花开 | 来源:发表于2017-03-20 09:35 被阅读2027次

    本文主要解析 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor 和  Java.util.concurrent.ThreadPoolExecutor 的执行流程

     自己在之前写多线程代码的时候 都是这么玩的

    executor=Executors.newCachedThreadPool();

    但是有一次 在大量数据的时候 由于入库速度远大于出库速度 导致内存急剧膨胀 最后悲剧了 重写代码 。。。 以此为戒

    后面  看到同事 都这么玩  研究了下源码 发现真的不错啊 记录下 分析的源码

    通常在spring的配置文件中使用

    <bean id="taskExecutor"
    		class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    		<property name="corePoolSize" value="20" />
    		<property name="keepAliveSeconds" value="200" />
    		<property name="maxPoolSize" value="45" />
    		<property name="queueCapacity" value="100" />
    	</bean>

    然后在其他的bean中进行属性注入

    <property name="executor" ref="taskExecutor" />

    四个属性在org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor里面都是传递给了java.util.concurrent.ThreadPoolExecutor 所以基本上线程的创建、任务的提交、任务的执行、线程的销毁都是ThreadPoolExecutor来做的

    protected ExecutorService initializeExecutor(
    			ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
    
    		BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
    		ThreadPoolExecutor executor  = new ThreadPoolExecutor(
    				this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
    				queue, threadFactory, rejectedExecutionHandler);
    		if (this.allowCoreThreadTimeOut) {
    			executor.allowCoreThreadTimeOut(true);
    		}
    
    		this.threadPoolExecutor = executor;
    		return executor;
    	}

    </pre><p></p><p>稍微花点时间 看下 如何调用 initializeExecutor方法</p><p>在其ThreadPoolTaskExecutor继承类 org.springframework.scheduling.concurrent.ExecutorConfigurationSupport 中</p><p></p><pre name="code" class="java">/**
    	 * Calls <code>initialize()</code> after the container applied all property values.
    	 * @see #initialize()
    	 */
    	public void afterPropertiesSet() {
    		initialize();
    	}
    
    	/**
    	 * Set up the ExecutorService.
    	 */
    	public void initialize() {
    		if (logger.isInfoEnabled()) {
    			logger.info("Initializing ExecutorService " + (this.beanName != null ? " '" + this.beanName + "'" : ""));
    		}
    		if (!this.threadNamePrefixSet && this.beanName != null) {
    			setThreadNamePrefix(this.beanName + "-");
    		}
    		this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
    	}

    可以看到  先调用 afterPropertiesSet 然后在 initialize调用 initializeExecutor

    而afterPropertiesSet方法  被调用  源于 org.springframework.beans.factory.InitializingBean 接口 其作用类似于 init-method

    /**
     * Interface to be implemented by beans that need to react once all their
     * properties have been set by a BeanFactory: for example, to perform custom
     * initialization, or merely to check that all mandatory properties have been set.
     * 
     * <p>An alternative to implementing InitializingBean is specifying a custom
     * init-method, for example in an XML bean definition.

    上面看完 ThreadPoolTaskExecutor 的初始化流程 再来看看重点的ThreadPoolExecutor 是如何玩转四个参数的  

    /**
         * Creates a new {@code ThreadPoolExecutor} with the given initial
         * parameters.
         *
         * @param corePoolSize the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         * @param maximumPoolSize the maximum number of threads to allow in the
         *        pool
         * @param keepAliveTime when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         * @param unit the time unit for the {@code keepAliveTime} argument
         * @param workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.
         * @param threadFactory the factory to use when the executor
         *        creates a new thread
         * @param handler the handler to use when execution is blocked
         *        because the thread bounds and queue capacities are reached
         * @throws IllegalArgumentException if one of the following holds:<br>
         *         {@code corePoolSize < 0}<br>
         *         {@code keepAliveTime < 0}<br>
         *         {@code maximumPoolSize <= 0}<br>
         *         {@code maximumPoolSize < corePoolSize}
         * @throws NullPointerException if {@code workQueue}
         *         or {@code threadFactory} or {@code handler} is null
         */

    上面的英文 很好的解释了 我稍微提下  当然这里暂时不考虑 allowCoreThreadTimeOut (此参数可影响是否阻塞等待任务)参数的影响

    corePoolSize--线程组保留的最小线程数,如果线程组中的线程数少于此数目,则创建

    maximumPoolSize--线程组中最大线程数

    keepAliveTime--线程组中线程最大不活动时间,则清除此线程

    workQueue----队列中,用于存放提交的任务

    threadFactory--用来创建新线程的 例如

    this.thread = getThreadFactory().newThread(this);

    RejectedExecutionHandler----当线程数达到最大,队列中也已存满,再来任务,该如何处理

    看完这几个参数 还有一个特别重要的参数    workers--用于保持线程组中运行的线程

    /**
         * Set containing all worker threads in pool. Accessed only when
         * holding mainLock.
         */
        private final HashSet<Worker> workers = new HashSet<Worker>();

    而存放的java.util.concurrent.ThreadPoolExecutor.Worker 这个类  是核心、精华所在

    在Worker中 主要是有这么两段代码需要注意 

     /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
    看到在创建Worker的时候 使用ThreadFactory创建线程 
     /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }

    在外围启动Worker线程

    再来看看 runWorker

     final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }

    最核心的 

     while (task != null || (task = getTask()) != null) 

    getTask里面 就是通过

     Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();

    不断的从workQueue中取出任务进行执行

    在这里就看到了 从workQueue中取出任务 是超过keepAliveTime就不等待了 返回NULL,还是一直阻塞等待,取决于timed,而此参数取决于

     timed = allowCoreThreadTimeOut || wc > corePoolSize

    这里就用到了 上面提到过的 allowCoreThreadTimeOut 参数 当设置此参数为true的时候 那么就不会一直阻塞等待 等待超时时间即可返回NULL 

    当然当workers中的线程数超过corePoolSize的时候  新建的worker也不会一直一直阻塞等待

    那上面就是在worker中 调用runWorker去执行任务 那是谁调用它呢  任务是如何添加到workQueue中的呢???  接着看

    来看ThreadPoolExecutor 的execute方法

     /**
         * Executes the given task sometime in the future.  The task
         * may execute in a new thread or in an existing pooled thread.
         *
         * If the task cannot be submitted for execution, either because this
         * executor has been shutdown or because its capacity has been reached,
         * the task is handled by the current {@code RejectedExecutionHandler}.
         *
         * @param command the task to execute
         * @throws RejectedExecutionException at discretion of
         *         {@code RejectedExecutionHandler}, if the task
         *         cannot be accepted for execution
         * @throws NullPointerException if {@code command} is null
         */
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }

    看到上面的英文描述 加上代码 就应该差不多了 还是稍微提下吧

    当每次任务添加进来的时候 分三种情况进行处理

    1、当运行的线程数小于corePoolSize的时候 ,创建新的线程即Worker执行提交的任务

    2、如果线程数大于等于corePoolSize的时候,将任务提交到workQueue队列中 ,如果成功添加 ,即在上面的runWorker就会执行调用了,当然这里会重新的核查此时的线程数,看下是否有线程减少,如果减少,则创建新的线程来使线程数维持在corePoolSize的数目

    3、如果队列满了后,则创建新的线程来执行,当然这里有一种极端情况,当线程数等于maximumPoolSize时,并且workQueue也满了后,则会使用

     /**
         * Invokes the rejected execution handler for the given command.
         * Package-protected for use by ScheduledThreadPoolExecutor.
         */
        final void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }

    使用handler来进行处理 而此handler的定义来源于

    private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy
     /**
             * Always throws RejectedExecutionException.
             *
             * @param r the runnable task requested to be executed
             * @param e the executor attempting to execute this task
             * @throws RejectedExecutionException always.
             */
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }

    查看上面代码 会发现 当最终线程数等于maximumPoolSize时,并且workQueue也满了后,当再有任务进来后,则会抛出异常,任务不执行

    上面一步看到了任务添加到worker中或者workQueue中 ,启动执行worker线程的来自于

     /**
         * Checks if a new worker can be added with respect to current
         * pool state and the given bound (either core or maximum). If so,
         * the worker count is adjusted accordingly, and, if possible, a
         * new worker is created and started, running firstTask as its
         * first task. This method returns false if the pool is stopped or
         * eligible to shut down. It also returns false if the thread
         * factory fails to create a thread when asked.  If the thread
         * creation fails, either due to the thread factory returning
         * null, or due to an exception (typically OutOfMemoryError in
         * Thread#start), we roll back cleanly.
         *
         * @param firstTask the task the new thread should run first (or
         * null if none). Workers are created with an initial first task
         * (in method execute()) to bypass queuing when there are fewer
         * than corePoolSize threads (in which case we always start one),
         * or when the queue is full (in which case we must bypass queue).
         * Initially idle threads are usually created via
         * prestartCoreThread or to replace other dying workers.
         *
         * @param core if true use corePoolSize as bound, else
         * maximumPoolSize. (A boolean indicator is used here rather than a
         * value to ensure reads of fresh values after checking other pool
         * state).
         * @return true if successful
         */
        private boolean addWorker(Runnable firstTask, boolean core) {

    此方法中有一段

      if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }

    即启动worker线程,即运行worker的run方法 ,调用runWorker方法,执行worker中的或者workQueue中的任务

    好了 上面就是java.util.concurrent.ThreadPoolExecutor的完整流程 再大致的梳理下

    通过execute添加任务,通过addWorker启动线程,启动线程后,调用runWorker来执行

    相关文章

      网友评论

          本文标题:spring ThreadPoolTaskExecutor和jd

          本文链接:https://www.haomeiwen.com/subject/rsyjnttx.html