美文网首页
OKHTTP基础篇之线程池ThreadPoolExecutor(

OKHTTP基础篇之线程池ThreadPoolExecutor(

作者: CircleLee | 来源:发表于2018-12-25 17:59 被阅读50次

    在开发过程中,当我们需要使用线程的时候就常常会去new一个Thread,但是这样写会有什么问题呢?
    假如并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
    那么有没有一种办法使执行完一个任务,并不被销毁,而是可以继续执行其他的任务呢?

    那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?
    JAVA的线程池可以解决此问题,线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。

    使用线程池的好处

    • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
    • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

    线程池状态

    @ReachabilitySensitive
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    

    RUNNING: 可以接收新任务,和处理阻塞队列任务;
    SHUTDOWN: 不接收新任务,但是可以处理阻塞队列任务;
    STOP: 既不接收新任务,也不处理阻塞队列任务,直接终止运行中的任务;
    TIDYING: 所有任务都已经终止,有效线程数workcout为0,线程池进入TIDYING状态后会调用 terminated() 方法进入TERMINATED 状态;
    TERMINATED:terminated()方法执行完成。

    图1 线程池的状态转换过程

    ThreadPoolExecutor构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    corePoolSize:核心线程池大小
    maximumPoolSize:线程池最大容量大小
    keepAliveTime:线程池空闲时,线程存活时间
    unit: keepAliveTime的时间单位
    workQueue:等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务封装成一个Worker对象放入等待队列;
    threadFactory:线程工厂
    handler:它是RejectedExecutionHandler类型的变量,表示线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:

    • AbortPolicy:直接抛出异常,这是默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务。

    execute方法

    ThreadPoolExecutor被初始化后,通过execute方法提交线程任务

      public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {    //1
            if (addWorker(command, true))       
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {     //2
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))    //3
                reject(command);
            else if (workerCountOf(recheck) == 0)      //4
                addWorker(null, false);
        }
        else if (!addWorker(command, false))     //5
            reject(command);
      }
    

    处理步骤:

    1. 判断当前的活动线程数是否小于核心线程大小。如果小于,则新建一个线程放入线程池中,并启动该任务;
    2. 如果当前活动的线程数不小于核心线程池大小,判断当前线程池是否是RUNNING状态。如果是, 则将任务添加到阻塞队列workQueue中;
    3. 重新获取当前线程,并判断是否处于RUNNING状态,如果不是RUNINING状态,则从阻塞队列中删除该任务,并通过handler使用拒绝策略对该任务进行处理,整个方法返回;
    4. 如果条件3判断失败 ,判断当前线程数是否为0。如果等于0,则执行addWorker方法。需要注意的是,这里的addWorker方法第一参数为null,表示在线程池中创建一个线程,但不去启动;第二个参数为false,表示将线程池的有限线程数量的上限设置为maximumPoolSize;
    5. 回到步骤2的判断,步骤2判断失败,调用addWorker方法,如果add失败则直接拒绝任务。

    addWorker方法

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))   //1 core为true,比较核心线程池大小;false,比较最大线程池大小
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();   //2 启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    1. 当传入core为false时,检测当前线程数大小是否大于最大线程池大小,如果大于直接返回false。
    2. 启动线程。

    总结执行过程:


    图2 ThreadPoolExecutor执行过程
    1. 调用ThreadPoolExecutor的execute提交线程任务,检查当前线程大小,如果小于核心线程corePoolSize大小,则新创建线程执行任务;
    2. 如果大于核心线程corePoolSize大小,但是小于最大线程池maximumPoolSize大小,则将线程任务加入到BlockingQueue队列,等待处理;
    3. 如果大于最大线程池maximumPoolSize大小,则直接reject。

    相关文章

      网友评论

          本文标题:OKHTTP基础篇之线程池ThreadPoolExecutor(

          本文链接:https://www.haomeiwen.com/subject/bkimkqtx.html