美文网首页
一次ThreadPoolExecutor中maximumPool

一次ThreadPoolExecutor中maximumPool

作者: Saxon_323e | 来源:发表于2019-10-18 11:07 被阅读0次

    之前对于ThreadPoolExecutor的几个参数corePoolSize,maximumPoolSize,keepAliveTime,BlockingQueue的理解是:
    corePoolSize:核心线程数
    maximumPoolSize:线程池最大线程数,包括coreThread,所以应大于等于corePoolSize
    keepAliveTime:允许存活时间,线程池是为了避免线程的频繁创建和销毁,所以当非核心线程没有任务要做时,会等待keepAliveTime时间。如果超过keepAliveTime时间没有任务到来,就销毁。核心线程会一直存在,即使一直没有任务到来。
    BlockingQueue:任务队列,当线程池线程数超过maximumPoolSize时,将任务存入BlockingQueue;当BlockingQueue满时,拒绝并丢弃之后来的任
    务。如果线程池中线程当前任务执行完毕后,会从BlockingQueue中取任务,继续执行。
    最近发现,当线程池线程数超过maximumPoolSize时,将任务存入BlockingQueue理解是错误的。
    发现的缘由,用线程池实现上传多张图片功能。想在线程池执行完毕后,不再保留线程,所以将corePoolSize设为0;想最多有两个线程执行,maximumPoolSize设置为2;存活时长设为60;任务多时,能无限存储,BlockingQueue使用LinkedBlockingQueue。代码如下:

    ExecutorService executorService = new ThreadPoolExecutor(0, 2,
                60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    

    运行一下,没有问题。过了好久发现,所有的任务都在同一个线程中执行,已经很久了,有坑!
    想不明白为什么,只好看ExcutorService的源码。任务是通过executorService.execute()方法执行的,就从execute()方法看起:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) { // 1
                if (addWorker(command, true))  // 1.5
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) { // 2
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command)) // 2.1
                    reject(command);
                else if (workerCountOf(recheck) == 0) // 2.5
                    addWorker(null, false);
            }
            else if (!addWorker(command, false)) 
                reject(command);
        }
    

    在1处,判断当前线程池中的线程数目是否小于核心线程数,如果小于,在1.5处创建核心线程,如果创建成功,就return。注意,此处传入了command,应该是创建完成后就直接执行。

    因为我设置到corePoolSize为0,所以1.5不会执行,执行到2。
    在2处,判断线程池是否处于running的状态,如果处于running状态,任务是否成功添加到workQueue中!!!
    注意了!!!当线程数目大于核心线程数后,就会将任务添加到workQueue中!!!

    2处,线程池为running状态,且LinkedWorkQueue能添加无限任务,所以2为true,会在2.1处再次判断线程池是否running,此处为runnig,则走到2.5,判断线程池的数目是否为0,如果为0,则创建非核心线程。注意,此处传入null,应该是在线程执行中,从workQueue中取任务来执行。令人意外的是,如果线程池数目不为0,那么这个方法就结束了!!!只做了将任务添加到workQueue中一件事!

    那接着看下工作者线程的执行:

    final void runWorker(Worker w) {
            // ...省略无关代码
            try {
                while (task != null || (task = getTask()) != null) { // 1
                    // ...省略无关代码
                    try {
                        task.run();  // 2
                        // ...省略无关代码
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    只有两步:1 取任务 2 任务执行, 看getTask()

    
    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
     
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
     
                int wc = workerCountOf(c);
     
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
     
     
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();  // 1
                    if (r != null)
                        return r;
                    timedOut = true;
            }
        }
    

    从workQueue中取任务执行 那么,回到最初的代码

    ExecutorService executorService = new ThreadPoolExecutor(0, 2,
                60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    

    当首次调用execute()方法时,由于核心线程数为0, 所以只创建了一个非核心线程;之后再调用execute()方法,只将runnable放入LinkedBlockingQueue中,然后,一个线程不断得循环,从workQueue中取任务执行,直到没有任务后,这个就会等待60秒,如果60秒内无新任务到来,这个线程就退出了。

    所以,我成功得创建了一个单线程的线程池,此时无论传入的maximumPoolSize为多少,都只会创建最多一个线程!
    正确的使用方法:

    val executorService = Executors.newFixedThreadPool(3)
    

    或者使用RxJava

    
    Observable.from(photos)
                    .onBackpressureBuffer()
                    .flatMap({ upload(it) }, 3)
                    .buffer(photos.size)
                    .flatMap {
                        Observable.just(zipPairs(it.toTypedArray()))
                    }
                    .subscribeOn(Schedulers.io())
    

    ————————————————
    版权声明:本文为CSDN博主「ZHxin」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/ZHXLXH/article/details/85342570

    相关文章

      网友评论

          本文标题:一次ThreadPoolExecutor中maximumPool

          本文链接:https://www.haomeiwen.com/subject/fskimctx.html