美文网首页
Java并发-30.Executor框架

Java并发-30.Executor框架

作者: 悠扬前奏 | 来源:发表于2019-06-08 23:48 被阅读0次
    • HotSpot VM线程模型中Java线程被一对一映射为本地操作系统线程。
    • 应用程序通过Executor框架控制上层调度,下层调度由操作系统内核控制,不受应用程序影响

    1. Executor框架结构

    • 任务,包括执行任务需要实现的接口:
      • Runnable接口和Callable接口的实现类,用于被ThreadPoolExecutor或ScheduledThreadExecutor执行
    • 任务的执行,包括任务执行机制的核心接口Executor,和继承自Executor的ExecutorService接口,有两个实现类:
      • ThreadPoolExecutor:用来执行被提交的任务
      • ScheduledThreadExecutor:在给定延时后执行任务,或定时执行任务
    • 异步运算结果,包括接口Future和实现Future接口的FutureTask类

    2. Executor成员

    2.1 ThreadPoolExecutor

    工厂类Executors来创建,有三种:

    • FixeThreadPool:可重用固定线程数的线程池
      • 用于需要限制当前线程数量的应用场合
      • 它的corePoolSize和maximunPoolSize设置为创建时的参数。
      • 线程池中线程数大于corePoolSize,多余的空闲线程立即终止
      • 使用无界队列作为工作队列
      • 源码:
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    • SingleThreadExecutor:单个worker线程的Executor
      • 用于保证顺序执行各个任务,在任意时间点也不会有多个活动线程
      • corePoolSize= maximunPoolSize=1
      • 其余和ThreadPoolExecutor一样
      • 源码
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
        public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
    
    • CachedThreadPool
      • 根据需要创建新线程,用于执行很多的短期异步小任务
      • 大小无界的线程池,corePoolSize = 0, maximunPoolSize=Integer.MAX_VALUE
      • 空闲线程等待60秒
      • SynchronousQueue作为工作队列
      • 源码:
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
        public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    

    2.2 ScheduledThreadPoolExecutor

    通常用工厂类Executors类创建,有两种:

    • ScheduledThreadPoolExecutor:包含固定个线程的ScheduledThreadPoolExecutor。
      • 使用DelayQueue作为任务队列,放入其中的是ScheduledFutureTask,主要包含3个成员变量:

        • long time,标识任务将要被执行的具体时间,DelayQueue中封装了一个PriorityQueue,根据它排序。
        • long sequenceNumber,标识这个任务被添加到ScheduledThreadPoolExecutor中的序号,先按time排序后按照它排序
        • long period,表示任务执行的时间间隔
      • 执行周期任务后,增加额外处理

      • 构造器源码:

        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
        public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
            return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        }
    
    • SingleThreadScheduledExecutor只包含一个线程的ScheduledThreadPoolExecutor。
      • 构造器源码:
        public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1));
        }
        public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
            return new DelegatedScheduledExecutorService
                (new ScheduledThreadPoolExecutor(1, threadFactory));
        }
    

    2.3 Future接口

    • Future接口和实现它的FutureTask类用来表示异步运算的结果。
    • 可以看到submit()方法会返回一个FutureTask对象
    • 三种状态
      • 未启动
      • 已启动
        • 正常结束
        • 取消而结束(FutureTask.cancel())
        • 异常而结束
    • 使用
      • 交给Executor执行
      • 通过ExecutorService.submit()方法返回FutureTask,执行get()方法或则cancel()方法。
    • 实现
      • 基于AbstractQueuedSynchronized(AQS)实现:AQS是一个同步框架,提供通用机制来原子性的管理同步状态,阻塞和唤醒线程,以及维护阻塞线程的队列(基于它实现的同步器包括ReentrantLock,Semaphore,ReentrantReadWriteLock,CountDownLatch和FutureTask)
        • 基于AQS的同步器包含两种操作:
          • 至少一个acquire操作,阻塞调用线程,直到AQS状态允许这个线程继续执行
          • 至少一个release操作,改变AQS状态,改变后可允许一个或多个阻塞线程被解除阻塞,Future的release操作包括run()和cancel()

    AQS作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只需要实现AQS的tryAcquireShared(int)方法检查同步状态,实现了tryReleaseShared(int)方法更新同步状态,他们控制FutureTask的获取和释放操作。

    Sync是FutureTask的内部私有类,继承与AQS,FutureTask的所有公共方法都委托给了内部私有的Sync:

    • FutureTask.get()方法调用AQS.acquireSharedInturruptibli(int arg)方法:
      1. AQS.acquireSharedInterruptibly(int arg)方法,返回子类Sync中实现的tryAcquireShared()方法来判断acquire方法能否成功,成功条件为state执行完成状态RUN或者已取消状态CANCELLED,且runner不为null
      2. 成功则get()方法立即返回,失败则线程到线程等待队列中去等待其他线程执行release操作
      3. 其他线程执行release操作(FutureTask.run()或者FutureTask.cancel())唤醒后,当前线程再次执行tryAcquireShared()方法返回正值1,当前线程离开线程等待队列并唤醒它的后继线程
      4. 返回计算的结果或抛出异常
    • FutureTask.run()
      1. 执行构造函数中指定的任务(Callable.call())
      2. 原子方式更新同步状态(AQS.compareAndSetState(int except, int update), 设置state为RAN),如果原子操作成功,设置代表计算结果的变量result值为Callable.call()的返回值,然后调用AQS.releaseShared(int arg)
      3. AQS.releaseShared(int arg)首先返回子类Sync中实现的tryReleaseShared(arg)来执行release操作(设置运行任务的线程runner为null,然后返回true);AQS.releaseShared(int arg),然后唤醒线程等待队列中的第一个线程
      4. 调用FutureTask.done()

    2.4 Runnable接口和Callable接口

    • 两个接口的实现类都可以被ScheduledThreadPoolExecutor或者ThreadPoolExecutor执行。
    • Runnable不会返回结果,Callable会返回结果
    • Executors提供了API把Runnable包装成Callable

    相关文章

      网友评论

          本文标题:Java并发-30.Executor框架

          本文链接:https://www.haomeiwen.com/subject/mwjbzqtx.html