美文网首页
Java 线程池框架

Java 线程池框架

作者: 自负的鱼 | 来源:发表于2019-01-22 11:54 被阅读14次

    线程池通过多个任务重用线程方案,解决了线程的生命周期开销和资源不足的问题(不需要频繁创建和销毁线程)。

    线程池优点:

    • 降低资源消耗,重复利用已经创建的线程,降低重复创建和销毁线程的资源开销
    • 提高相应速度,任务的执行可以不等待线程创建完成就可以执行
    • 提高线程的管理,使用线程池统一配置、监控、优化线程的创建。


      image.png

    java中的线程池使用ThreadPoolExecutor来实现,同时提供了Executor框架异步任务调度框架。

    • Executor:任务执行接口类,只有execute()方法来执行传入的线程任务。
    • ExecutorService:扩展了Executor接口,提供了线程池管理和终止的方法。shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法
    • ScheduledExecutorService:扩展了ExecutorService接口,增加了schedule方法,按照指定时间间隔定期执行任务的scheduleAtFixedRate()方法和scheduleWithFixedDelay()方法。
    • ThreadPoolExecutor:继承自AbstractExecutorService,也是实现了ExecutorService接口。

    线程池的状态:

    • RUNNING:可以接收新任务,并且处理阻塞队列中的任务
    • SHUTDOWN:关闭状态,不能接收新任务,可以继续处理阻塞队列中的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。
    • STOP:不能接收新任务,也不能处理阻塞队列中的任务,会中断正在处理的任务。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态。
    • TIDYING:所有任务都执行完成,线程池中workerCount (有效线程数) 为0。线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
    • TERMINATED:调用terminated()方法进入该状态。

    ThreadPoolExecutor 详解

    java线程池ThreadPoolExecutor,是线程池实现类,负责线程池初始化,生命周期管理,以及线程任务执行等工作。

    线程池构造方法

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    构造方法中的字段含义如下:

    • corePoolSize:核心线程数,当有新任务在execute()方法提交时处理方法
      1. 如果运行的线程数小于corePoolSize ,则创建新线程执行任务,即使线程池中的线程都是空闲的
      2. 如果运行线程数大于corePoolSize 但小于maximumPoolSize,则只有workQueue满时才会创建新线程去执行任务,否则任务放入到workQueue中等待执行
      3. 如果运行的线程大于等于maximumPoolSize,并且workQueue已满了,则通过配置的拒绝策略handler来执行
    • maximumPoolSize:最大线程数
    • workQueue:等待队列,当提交任务时当线程池中线程数大于corePoolSize,则把该任务组装成Worker对象放入workQueue队列等待执行。
    • keepAliveTime:线程池维护线程的所允许的最大线程空闲时间。当线程池中线程数量大于corePoolSize时候,这是又没有新的任务提交,核心线程并不会立即销毁,而是等待keepAliveTime时间后才回回收
    • threadFactory:ThreadFactory类型的变量,用来创建新的线程。默认使用Executors.defaultThreadFactory() 来创建线程。
    • handler:配置线程池饱和策略,如果阻塞队列满并且没有空闲线程时,再添加任务就会采用该配置策略。
      1. AbortPolicy:直接抛出异常,默认配置
      2. CallerRunsPolicy:用调用者所在的线程执行任务
      3. DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务,并执行当前任务
      4. DiscardPolicy:直接丢弃任务

    ThreadPoolExecutor提交任务

    通过ThreadPoolExecutor的execute()方法用来提交任务,原理为通过addWorker()方法将任务封装为Worker线程对象。并执行Worker对象执行任务。

    Worker对象为线程,实现了Runnable接口,Worker类中的run方法调用了runWorker方法来执行任务,该方法循环通过getTask()方法阻塞从workQueue队列中获取待执行的任务,调用任务的run()方法执行任务逻辑。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 获取第一个任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 允许中断
        w.unlock(); // allow interrupts
        // 是否因为异常退出循环
        boolean completedAbruptly = true;
        try {
            // 如果task为空,则通过getTask来获取任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    相关文章

      网友评论

          本文标题:Java 线程池框架

          本文链接:https://www.haomeiwen.com/subject/cxjljqtx.html