执行器

作者: 原创迷恋者 | 来源:发表于2019-10-16 08:27 被阅读0次

    在之前的例子中,线程执行的任务,在Runnable对象中定义,和线程,在Thread对象中定义,两者之间总是有一种密切的关联。这样的机制对小型应用管用,但是在大型的应用中,把线程管理和应用的其他部分分离会更好。负责封装这些功能的对象被成为Executors执行器。之后的子章节详细介绍了执行器的细节。

    • Executor接口定义了三种执行器对象类型。
    • Thread Pools是最常见的执行器实现。
    • Fork/Join是JAVA 7中新引入的一种利用多处理器优势的框架。

    Executor Interfaces

    java.util.concurrent包定义了三种执行器接口:

    • Executor,一个支持发起新任务的简单接口。
    • ExecutorService,Executor的一个子接口,它增加了帮助生命期管理的功能,可以用在单个的任务和执行器自己之上。
    • ScheduledExecutorService,是ExecutorService的子接口,支持未来and/or阶段性的任务执行。

    执行器接口

    执行器接口提供了一种简单的方法,execute,被设计成线程创造动作的一个简单代替。如果r是一个Runnable对象,那么e是一个可以替换的Executor对象,(new Thread(r)).start(); 可以替换成e.execute(r);。然而,execute的定义没有那么具体。底层次的做法是建立一个线程,然后立刻执行。依赖于Executor的实现,execute也会做同样的事,但是更有可能会使用一个现有的工人线程来运行r,或者是把r放进一个队列,等待工人线程可用。(我们将在线程池的章节讲到工人线程。)

    java.util.concurrent包中的执行器实现是被设计成能充分利用更高级的ExecutorService和ScheduledExecutorService接口,尽管他们也是基于基本的Executor接口工作。

    ExecutorService接口

    ExecutorService接口使用一个更加全能的方法submit来支持execute方法。和execute方法一样,submit方法接受Runnable对象,但也接受Callable对象,Callable对象允许任务拥有一个返回值。submit方法会返回一个Future对象,它可以获取Callable的返回值,以及管理Callable和Runnable任务的状态。

    ExecutorService也为提交大规模Callable集合对象提供了方法。最后,ExecutorService提供了一系列的方法来管理执行器的关闭。为了支持立刻关闭,任务需要正确处理中断。

    ScheduledExecutorService接口

    与它的父类ExecutorService相比,ScheduledExecutorService接口提供了schedule方法,该方法可以在一段时间的延迟后执行Runnable或Callable任务。此外,该接口定义了scheduleAtFixedRate方法和scheduleWithFixedDelay方法,他们可以以特定的间隔,反复执行特定的任务。

    线程池

    java.util.concurrent包中大部分的执行器实现使用了线程池。这些线程池和Runnable、Callable任务分别存在,并且总是被用来运行多任务。

    使用工人线程最小程度地减少了由于线程创建带来的开销。新建线程会使用大量的内存,在大规模的应用中,分配和回收很多线程对象会带来巨大的内存管理开销。

    一个常见的线程池类型是固定线程池。这种类型的线程池拥有指定数量的正在执行的线程;如果一个线程在运行时意外退出了,另一个新的线程会自动取代它。任务通过内部的队列分配给池子,队列负责保存多余的任务,当任务的数量超过线程数量时。

    固定线程池的一个显著的优点是应用会非常小心地使用它。想要理解这一点,考虑一个网页服务器应用,其中的每个HTTP应用都被一个不同的线程处理。如果这个应用在每次获得新线程的时候都创建一个新的线程,那么假如这个系统忽然收到超出它能力的请求数量,它可能会忽然停止响应所有请求,因为这些操作的额外成本超出了系统的能力。

    通过限制系统中能创建的线程数量,应用没办法像请求发送速度一样地处理请求,但它能够尽它所能地来服务请求。

    创建固定线程池的一个简单方法是使用在java.util.concurrent包中的newFixedThreadPool工厂方法。这个类也提供了其他种类的工厂方法如下所示:

    • newCachedThreadPool方法,会创建一个能够扩展线程池尺寸的执行器。这种执行器适合生命周期很短的应用。
    • newSingleThreadExecutor方法,创建一次只会执行一个任务的执行器。
    • 还有几个工厂方法是上面执行器的ScheduledExecutorService版本。

    如果上面提供的方法没有能满足你的要求,构造java.util.concurrent.ThreadPoolExecutor或java.util.concurrent.ScheduledThreadPoolExecutor的实例会给你更多的选择。

    Fork/Join

    Fork/Join框架是ExecutorService接口的一个实现,它可以帮助你利用多处理器。它被设计成针对那些能够被递归分解为更小任务的工作。目的是最大限度地使用可提供地处理器能力来提高应用的性能。

    和其他ExecutorService的实现一样,fork/join框架把任务分发给线程池中的若干工人线程。让Fork/Join与众不同的是,它使用了工作偷窃算法。做完了事情的工人线程会从其他繁忙线程中偷任务。

    Fork/Join框架的核心类是ForkJoinPool,它是AbstractExecutorService类的扩展。ForkJoinPool实现了核心的工作偷窃算法,并且会执行ForkJoinTask进程。

    基础用法

    使用fork/join框架的第一步,是写能执行一部分工作的代码。你的代码会看上去和下面的很像:

    if (my portion of the work is small enough)
      do the work directly
    else
      split my work into two pieces
      invoke the two pieces and wait for the results
    

    把这些代码包裹在ForkJoinTask子类中,或是使用它的更为具体的一个子类,比如RecursiveTask或RecursiveAction。

    等你的ForkJoinTask子类完成之后,创建一个代表所有工作的对象,然后把它传递给ForkJoinPool实例的invoke方法。

    为了清晰而模糊

    为了帮助你理解fork/join框架的工作原理,考虑下面的案例。假设你想要涂掉一幅画。原始的画素材由一个整数数组代表,其中每个整数都包含着一个像素的颜色值。混淆后的目标图也是由相同大小的整数数组代表。

    实施模糊操作是由每次操作一个像素完成的。每个像素都被赋值为它周围像素的平均值,结果存放在结果数组中。因为这个图像是一个很大的数组,这个过程会花费很长的时间。你可以通过实现算法来使用fork/join,利用多核系统的并行处理能力。下面是一种可能的实现:

    public class ForkBlur extends RecursiveAction {
        private int[] mSource;
        private int mStart;
        private int mLength;
        private int[] mDestination;
      
        // Processing window size; should be odd.
        private int mBlurWidth = 15;
      
        public ForkBlur(int[] src, int start, int length, int[] dst) {
            mSource = src;
            mStart = start;
            mLength = length;
            mDestination = dst;
        }
    
        protected void computeDirectly() {
            int sidePixels = (mBlurWidth - 1) / 2;
            for (int index = mStart; index < mStart + mLength; index++) {
                // Calculate average.
                float rt = 0, gt = 0, bt = 0;
                for (int mi = -sidePixels; mi <= sidePixels; mi++) {
                    int mindex = Math.min(Math.max(mi + index, 0),
                                        mSource.length - 1);
                    int pixel = mSource[mindex];
                    rt += (float)((pixel & 0x00ff0000) >> 16)
                          / mBlurWidth;
                    gt += (float)((pixel & 0x0000ff00) >>  8)
                          / mBlurWidth;
                    bt += (float)((pixel & 0x000000ff) >>  0)
                          / mBlurWidth;
                }
              
                // Reassemble destination pixel.
                int dpixel = (0xff000000     ) |
                       (((int)rt) << 16) |
                       (((int)gt) <<  8) |
                       (((int)bt) <<  0);
                mDestination[index] = dpixel;
            }
        }
    

    你现在实现了抽象的compute()方法,它要么直接进行模糊操作,要么把任务分成两份。一个简单的数组长度阈值决定了这个工作是直接运算,还是分割。

    protected static int sThreshold = 100000;
    
    protected void compute() {
        if (mLength < sThreshold) {
            computeDirectly();
            return;
        }
        
        int split = mLength / 2;
        
        invokeAll(new ForkBlur(mSource, mStart, split, mDestination),
                  new ForkBlur(mSource, mStart + split, mLength - split,
                               mDestination));
    }
    

    相关文章

      网友评论

          本文标题:执行器

          本文链接:https://www.haomeiwen.com/subject/cgjjmctx.html