美文网首页
Java多线程学习之并发容器和框架

Java多线程学习之并发容器和框架

作者: Steven1997 | 来源:发表于2018-09-05 10:41 被阅读21次

    Java平台类库包含了丰富的并发基础构建模块,例如线程安全的容器类以及各种用于协调多个相互协作的线程控制流的同步工具类( Synchronizer)。本章将介绍其中一些最有用的并发构建模块,特别是在Java5.0和Java6中引入的一些新模块,以及在使用这些模块来构造并发应用程序时的一些常用模式。

    同步容器类

    同步容器类包括 Vector和 Hashtable,二者是早期JDK的一部分,此外还包括在JDK1.2中添加的一些功能相似的类,这些同步的封装器类是由 Collections. synchronizedxxx等工厂方法创建的。这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。

    同步容器类的问题





    迭代器与Concurrent-ModificationException



    隐藏迭代器



    并发容器



    ConcurrentHashMap

    为什么要使用ConcurrentHashMap

    在并发编程中使用HashMap可能导致程序死循环。而使用线程安全的HashTable效率又非常低下,基于以上两个原因,所以考虑使用ConcurrentHashMap。
    (1)线程不安全的HashMap
    在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。例如,执行以下代码会引起死循环。


    HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。

    (2)效率低下的HashTable
    HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。

    (3)ConcurrentHashMap的锁分段技术可有效提升并发访问率
    HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
    ConcurrentHashmap与其他并发容器一起增强了同步容器类:它们提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。 ConcurrentHashmap返回的迭代器具有弱一致性( Weakly Consistent),而并非“及时失败”。弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以(但是不保证)在迭代器被构造后将修改操作反映给容器。

    ConcurrentHashMap的结构

    通过ConcurrentHashMap的类图来分析ConcurrentHashMap的结构,如下所示:



    ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁,如下所示:


    ConcurrentHashMap的初始化

    ConcurrentHashMap初始化方法是通过initialCapacity、loadFactor和concurrencyLevel等几个参数来初始化segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment里的HashEntry数组来实现的。

    1.初始化segments数组

    让我们来看一下初始化segments数组的源代码。


    由上面的代码可知,segments数组的长度ssize是通过concurrencyLevel计算得出的。为了能通过按位与的散列算法来定位segments数组的索引,必须保证segments数组的长度是2的N次方(power-of-two size),所以必须计算出一个大于或等于concurrencyLevel的最小的2的N次方值来作为segments数组的长度。假如concurrencyLevel等于14、15或16,ssize都会等于16,即容器里锁的个数也是16。

    注意:concurrencyLevel的最大值是65535,这意味着segments数组的长度最大为65536,对应的二进制是16位。

    2.初始化segmentShift和segmentMask

    这两个全局变量需要在定位segment时的散列算法里使用,sshift等于ssize从1向左移位的次数,在默认情况下concurrencyLevel等于16,1需要向左移位移动4次,所以sshift等于4。segmentShift用于定位参与散列运算的位数,segmentShift等于32减sshift,所以等于28,这里之所以用32是因为ConcurrentHashMap里的hash()方法输出的最大数是32位的,后面的测试中我们可以看到这点。segmentMask是散列运算的掩码,等于ssize减1,即15,掩码的二进制各个位的值都是1。因为ssize的最大长度是65536,所以segmentShift最大值是16,segmentMask最大值是65535,对应的二进制是16位,每个位都是1。

    3.初始化每个segment

    输入参数initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每个segment的负载因子,在构造方法里需要通过这两个参数来初始化数组中的每个segment。



    上面代码中的变量cap就是segment里HashEntry数组的长度,它等于initialCapacity除以ssize的倍数c,如果c大于1,就会取大于等于c的2的N次方值,所以cap不是1,就是2的N次方。
    segment的容量threshold=(int)cap*loadFactor,默认情况下initialCapacity等于16,loadfactor等于0.75,通过运算cap等于1,threshold等于零。

    定位Segment

    既然ConcurrentHashMap使用分段锁Segment来保护不同段的数据,那么在插入和获取元素的时候,必须先通过散列算法定位到Segment。可以看到ConcurrentHashMap会首先使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列。



    之所以进行再散列,目的是减少散列冲突,使元素能够均匀地分布在不同的Segment上,从而提高容器的存取效率。假如散列的质量差到极点,那么所有的元素都在一个Segment中,不仅存取元素缓慢,分段锁也会失去意义。下面测试不通过再散列而直接执行散列计算:



    计算后输出的散列值全是15,通过这个例子可以发现,如果不进行再散列,散列冲突会非常严重,因为只要低位一样,无论高位是什么数,其散列值总是一样。我们再把上面的二进制数据进行再散列后结果如下(为了方便阅读,不足32位的高位补了0,每隔4位用竖线分割下)。

    可以发现,每一位的数据都散列开了,通过这种再散列能让数字的每一位都参加到散列运算当中,从而减少散列冲突。ConcurrentHashMap通过以下散列算法定位segment。



    默认情况下segmentShift为28,segmentMask为15,再散列后的数最大是32位二进制数据,向右无符号移动28位,意思是让高4位参与到散列运算中,(hash>>>segmentShift)&segmentMask的运算结果分别是4、15、7和8,可以看到散列值没有发生冲突。

    ConcurrentHashMap的操作

    下面介绍ConcurrentHashMap的3种操作——get操作、put操作、size操作和额外的原子操作。

    1.get操作

    Segment的get操作实现非常简单和高效。先经过一次再散列,然后使用这个散列值通过散列运算定位到Segment,再通过散列算法定位到元素,代码如下。



    get操作的高效之处在于整个get过程不需要加锁,除非读到的值是空才会加锁重读。我们知道HashTable容器的get方法是需要加锁的,那么ConcurrentHashMap的get操作是如何做到不加锁的呢?原因是它的get方法里将要使用的共享变量都定义成volatile类型,如用于统计当前Segement大小的count字段和用于存储值的HashEntry的value。定义成volatile的变量,能够在线程之间保持可见性,能够被多线程同时读,并且保证不会读到过期的值,但是只能被单线程写(有一种情况可以被多线程写,就是写入的值不依赖于原值),在get操作里只需要读不需要写共享变量count和value,所以可以不用加锁。之所以不会读到过期的值,是因为根据Java内存模型的happen before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值,这是用volatile替换锁的经典应用场景。



    在定位元素的代码里我们可以发现,定位HashEntry和定位Segment的散列算法虽然一样,都与数组的长度减去1再相“与”,但是相“与”的值不一样,定位Segment使用的是元素的hashcode通过再散列后得到的值的高位,而定位HashEntry直接使用的是再散列后的值。其目的是避免两次散列后的值一样,虽然元素在Segment里散列开了,但是却没有在HashEntry里散列开。
    2.put操作

    由于put方法里需要对共享变量进行写入操作,所以为了线程安全,在操作共享变量时必须加锁。put方法首先定位到Segment,然后在Segment里进行插入操作。插入操作需要经历两个步骤,第一步判断是否需要对Segment里的HashEntry数组进行扩容,第二步定位添加元素的位置,然后将其放在HashEntry数组里。
    (1)是否需要扩容
    在插入元素前会先判断Segment里的HashEntry数组是否超过容量(threshold),如果超过阈值,则对数组进行扩容。值得一提的是,Segment的扩容判断比HashMap更恰当,因为HashMap是在插入元素后判断元素是否已经到达容量的,如果到达了就进行扩容,但是很有可能扩容之后没有新元素插入,这时HashMap就进行了一次无效的扩容。
    (2)如何扩容
    在扩容的时候,首先会创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。

    3.size操作

    如果要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。Segment里的全局变量count是一个volatile变量,那么在多线程场景下,是不是直接把所有Segment的count相加就可以得到整个ConcurrentHashMap大小了呢?不是的,虽然相加时可以获取每个Segment的count的最新值,但是可能累加前使用的count发生了变化,那么统计结果就不准了。所以,最安全的做法是在统计size的时候把所有Segment的put、remove和clean方法全部锁住,但是这种做法显然非常低效。
    因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。
    那么ConcurrentHashMap是如何判断在统计的时候容器是否发生了变化呢?使用modCount变量,在put、remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。

    4.额外的原子Map操作

    CopyOnWriteArrayList


    阻塞队列

    什么是阻塞队列

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
    1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
    2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
    在阻塞队列不可用时,这两个附加操作提供了4种处理方式,如下表所示:

    • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException("Queue
      full")异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
    • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。
    • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
    • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

    这两个附加操作的4种处理方式不方便记忆,所以我找了一下这几个方法的规律。put和take分别尾首含有字母t,offer和poll都含有字母o。

    注意:
    对于有界阻塞队列:如果队列为空 take() 将阻塞,直到队列中有内容;如果队列为满 put() 将阻塞,直到队列有空闲位置;
    对于无界阻塞队列:队列不可能会出现满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,该方法永远返回true。

    使用 BlockingQueue 实现生产者消费者问题

    public class ProducerConsumer {
    
        private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
    
        private static class Producer extends Thread {
            @Override
            public void run() {
                try {
                    queue.put("product");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.print("produce..");
            }
        }
    
        private static class Consumer extends Thread {
    
            @Override
            public void run() {
                try {
                    String product = queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.print("consume..");
            }
        }
    }
    
    public static void main(String[] args) {
        for (int i = 0; i < 2; i++) {
            Producer producer = new Producer();
            producer.start();
        }
        for (int i = 0; i < 5; i++) {
            Consumer consumer = new Consumer();
            consumer.start();
        }
        for (int i = 0; i < 3; i++) {
            Producer producer = new Producer();
            producer.start();
        }
    }
    

    结果

    produce..produce..consume..consume..produce..
    consume..produce..consume..produce..consume..
    

    阻塞队列分类

    JDK 7提供了7个阻塞队列,如下:

    • ArrayBlockingQueue:一个基于数组结构的有界阻塞队列。
    • LinkedBlockingQueue:一个基于链表结构的有界/无界阻塞队列。
    • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
    • DelayQueue:一个支持延时获取元素的无界阻塞队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。
    • LinkedTransferQueue:一个基于链表结构的无界阻塞队列。
    • LinkedBlockingDeque:一个基于链表结构的双向阻塞队列。
    1.ArrayBlockingQueue

    ArrayBlockingQueue是一个基于数组结构的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。
    默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列:



    访问者的公平性是使用可重入锁实现的,代码如下:


    2.LinkedBlockingQueue

    LinkedBlockingQueue是一个基于链表结构的有界/无界阻塞队列,指定了容量就是有界阻塞队列,未指定容量默认为Integer.MAX_VALUE,为无界阻塞队列。此队列按FIFO排序元素。

    3.PriorityBlockingQueue

    PriorityBlockingQueue是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者初始化PriorityBlockingQueue时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

    4.DelayQueue

    DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
    DelayQueue非常有用,可以将DelayQueue运用在以下应用场景:

    • 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
    • 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。
    5.SynchronousQueue

    SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。
    它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用以下构造方法可以创建公平性访问的SynchronousQueue,如果设置为true,则等待的线程会采用先进先出的顺序访问队列。



    SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue降低了将数据从生产者移动到消费者的延迟(不必出队和入队),它的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue。
    因为SynchronousQueue没有存储功能,因此put和take会一直阻塞,直到由另一个线程已经准备好参与到交付过程中。仅当有足够多的消费者,并且总是有一个消费者准备好获取交付的工作时,才适合使用SynchronousQueue。

    6.LinkedTransferQueue

    LinkedTransferQueue是一个基于链表结构的无界阻塞TransferQueue队列。相对于其他阻塞队列,LinkedTransferQueue多了tryTransfer和transfer方法。
    (1)transfer方法
    如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。
    (2)tryTransfer方法
    tryTransfer方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是tryTransfer方法无论消费者是否接收,方法立即返回,而transfer方法是必须等到消费者消费了才返回。对于带有时间限制的tryTransfer(E e,long timeout,TimeUnit unit)方法,试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

    7.LinkedBlockingDeque

    LinkedBlockingDeque是一个基于链表结构的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque多了addFirst、addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First单词结尾的方法,表示插入、获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法add等同于addLast,移除方法remove等效于removeFirst。但是take方法却等同于takeFirst,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚。
    在初始化LinkedBlockingDeque时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。

    阻塞方法和中断方法



    Fork/Join框架

    什么是Fork/Join框架

    Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
    我们再通过Fork和Join这两个单词来理解一下Fork/Join框架。Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+…+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。

    Fork/Join的运行流程如下:

    双端队列与工作窃取算法

    Java6增加了两种容器类型,Deque 和 BlockingDeque,它们分别对Queue和 BlockingQueue进行了扩展。 Deque是一个双端队列,实现了在队列头和队列尾的高效插入和移除。具体实现包括 ArrayDeque和LinkedblockingDeque。正如阻塞队列适用于生产者一消费者模式,双端队列同样适用于另一种相关模式,即工作窃取( Work Stealing)算法
    工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。那么,为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。比如A线程负责处理A队列里的任务。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
    在生产者一消费者设计中,所有消费者有一个共享的工作队列,而在工作窃取设计中,每个消费者都有各自的双端队列。如果一个消费者完成了自己双端队列中的全部任务,那么它可以从其他消费者双端队列末尾窃取任务。工作窃取模式比传统的生产者一消费者模式具有更高的可伸缩性,这是因为工作者线程不会在单个共享的任务队列上发生竞争。在大多数时候,它们都只是访问自己的双端队列,从而极大地减少了竞争。当工作者线程需要访问另一个队列时,它会从队列的尾部而不是从头部获取工作,因此进一步降低了队列上的竞争程度。工作窃取非常适用于既是消费者也是生产者问题—当执行某个工作时可能导致出现更多的工作。例如,在网页爬虫程序中处理一个页面时,通常会发现有更多的页面需要处理。类似的还有许多搜索图的算法,例如在垃圾回收阶段对堆进行标记,都可以通过工作密取机制来实现高效并行。当一个工作线程找到新的任务单元时,它会将其放到自己队列的末尾(或者在工作共享设计模式中,放入其他工作者线程的队列中)。当双端队列为空时,它会在另一个线程的队列队尾査找新的任务,从而确保每个线程都保持忙碌状态。

    工作窃取运行流程图如下:

    工作窃取算法的优点:充分利用线程进行并行计算,减少了线程间的竞争。

    工作窃取算法的缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。

    Fork/Join框架的设计

    我们已经很清楚Fork/Join框架的需求了,那么可以思考一下,如果让我们来设计一个Fork/Join框架,该如何设计?这个思考有助于你理解Fork/Join框架的设计。

    步骤1 分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。

    步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。

    Fork/Join使用两个类来完成以上两件事情。
    ① ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类:

    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask:用于有返回结果的任务。

    ② ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。
    任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

    使用Fork/Join框架

    让我们通过一个简单的需求来使用Fork/Join框架,需求是:计算1+2+3+4的结果。

    使用Fork/Join框架首先要考虑到的是如何分割任务,如果希望每个子任务最多执行两个数的相加,那么我们设置分割的阈值是2,由于是4个数字相加,所以Fork/Join框架会把这个任务fork成两个子任务,子任务一负责计算1+2,子任务二负责计算3+4,然后再join两个子任务的结果。因为是有结果的任务,所以必须继承RecursiveTask,实现代码如下:




    通过这个例子,我们进一步了解ForkJoinTask,ForkJoinTask与一般任务的主要区别在于它需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续分割成子任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

    ForkJoin 使用 ForkJoinPool 来启动,它是一个特殊的线程池,线程数量取决于 CPU 核数。

    public class ForkJoinPool extends AbstractExecutorService
    

    ForkJoinPool 实现了工作窃取算法来提高 CPU 的利用率。每个线程都维护了一个双端队列,用来存储需要执行的任务。工作窃取算法允许空闲的线程从其它线程的双端队列中窃取一个任务来执行。窃取的任务必须是最晚的任务,避免和队列所属线程发生竞争。例如下图中,Thread2 从 Thread1 的队列中拿出最晚的 Task1 任务,Thread1 会拿出 Task2 来执行,这样就避免发生竞争。但是如果队列中只有一个任务时还是会发生竞争。


    Fork/Join框架的异常处理

    ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。使用如下代码:



    getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

    相关文章

      网友评论

          本文标题:Java多线程学习之并发容器和框架

          本文链接:https://www.haomeiwen.com/subject/oijliftx.html