美文网首页Java 并发编程
Java并发编程(六):并发容器和工具类

Java并发编程(六):并发容器和工具类

作者: yeonon | 来源:发表于2018-12-09 16:51 被阅读0次

    1 概述

    JDK提供了很多可以简化并发编程的容器以及工具类,例如ConcurrentHashMap、CopyOnWriteArrayList,Semaphore,CountDownLatch等。

    对于并发容器来说,他们本身内部提供了足够的同步手段,所以在外部客户端调用的时候,即使不加入同步手段(例如内置锁或者显式锁等)也可以保证线程安全。这个特点是并发容器最突出的好处,简化了客户端开发,使得客户端代码错误率大大降低,可用性也大大提高了。除此之外,并发容器的性能也非常不错,很多时候会比用户自己开发的要好很多。

    对于并发工具类来说,他们绝大多数底层都将一些同步操作委托给AQS框架来做,客户端可以使用这些工具来完成一些特殊的需求,例如可以用CountDownLatch来完成“让所有线程在同一起跑线,公平竞争”的需求,或者用Semaphore来做并发流量控制等等。

    本文争取深入到这些容器或者工具类的原理,而不是仅仅简单的介绍他们的使用以及API的功能(说实话,明白使用以及API功能看看JDK文档就行了)。

    2 并发容器

    本文主要介绍三种常见、常用的并发容器:ConcurrentHashMap,CopyOnWriteArrayList以及BlockingQueue。ConcurrentHashMap是线程安全的HashMap,我想不少朋友都看过一些文章说并发环境下HashMap扩容会出现“死循环”的问题,而ConcurrentHashMap不会出现这样的问题。CopyOnWriteArrayList也是线程安全的ArrayList,其主要的同步手段就是使用CopyOnWrite(COW)技术。BlockingQueue即阻塞队列,在本系列之前的文章中,不少地方都碰到过BlockingQueue,足以说明BlockingQueue的重要性。

    2.1 ConcurrentHashMap

    ConcurrentHashMap是Java5新加入的类,在此之前,如果想在并发环境下安全的使用键值对存储容器,只能使用Hashtable,Hashtable的实现和HashMap并没有太大的区别,只是在Hashtable各种操作(例如get(),put()等)都被synchronized关键字修饰,即给这些操作方法加上了内置锁,很粗暴,也很有效,但就是效率受到非常大的影响。ConcurrentHashMap作为Hashtable的替代类,具有较高的性能以及较好的伸缩性,下表是两者之间put()方法的性能对比(出处我已经找不到了,抱歉):

    线程数 ConcurrentHashMap Hashtable
    1 1.00 1.03
    2 2.59 32.40
    4 5.58 78.23
    8 13.21 163.48
    16 27.58 341.21
    32 57.27 778.41

    数据上来看,从Hashtable切换到ConcurrentHashMap获得的性能收益非常可观,尤其是线程数多,竞争激烈的时候。那为什么ConcurrentHashMap会有如此优秀的性能呢?

    上面说到过,Hashtable的同步手段非常简单粗暴,只是简单的在各个API上加入synchronized关键字修饰而已,虽然有效,但是性能损失太大,下面是其部分源码:

    public synchronized V get(Object key) {
        Entry<?,?> tab[] = table;
        //....
    }
    
    public synchronized V put(K key, V value) {
        // Make sure the value is not null
        if (value == null) {
            throw new NullPointerException();
        }
        //....
    }
    

    而ConcurrentHashMap的手段更加高级、优雅且性能损失小。Java7的ConcurrentHashMap和Java8的ConcurrentHashMap有比较大的不同,Java7的ConcurrentHashMap采用一种叫做“分段锁”的技术,而Java8抛弃了这种技术,而是采用CAS技术,原因也许是因为分段锁的锁太多,锁多就容易发生死锁吧(个人猜测,具体还不太清楚)。

    由于我电脑装上JDK1.7就老出现莫名其妙的问题,所以我就不分析JDK1.7的源码了。

    2.1.1 JDK1.7中的ConcurrentHashMap

    其存储结构还是和HashMap基本一致,都是数据加链表的数据结构。不同的是,HashMap是一个大数据,数组元素是一个一个Node节点,而ConcurrentHashMap虽然也是数组,但数组元素不是Node,而是一个叫做Segment的数据结构,每个Segment包含一个与HashMap数据结构差不多的Node数组(Node本身就是链表节点)。下图是整体结构图:

    ioqpa8.png

    下面以put操作为例介绍其工作流程:

    1. 首先使用hash函数计算出插入值的hashcode,根据hashcode的高位确定应该插入到哪个Segment里。
    2. 确定Segment之后,再次根据整个hashcode(和之前的是同一个hashcode)决定插入到该Segment中Table的哪个位置。
    3. 之后就和HashMap一样了,如果发生哈希碰撞就将值作为链表作为头结点插入到链表中。(即所谓的“拉链法”)

    get操作也差不多,大家可以自己走一下(如果有能力查看JDK1.7的源码,那更好)。

    到这里,你可以会有疑问,从上面的流程来看,好像没有什么地方有同步操作啊,那ConcurrentHashMap是怎么做到线程安全的?其实Segment类继承了ReentrantLock类,即Segment本身就是一个锁。对ConcurrentHashMap操作的第一步都是确定Segment,在线程进入Segment之前,需要获得该Segment锁,操作完毕之后释放锁,以便其他线程能正常执行。这种技术被称作“分段锁”,由于ConcurrentHashMap存在多个Segment实例,所以也就存在多个锁,根据之前的知识,我们知道,这些锁之间是不会互斥的,即一个线程获取了Segment1的锁,另一个线程仍然可以获取Segment2的锁,这就使得ConcurrentHashMap可以支持并行的写操作且保证线程安全。

    看起来,ConcurrentHashMap似乎很完美,不仅能保证线程安全,而且性能损失不大,但凡事都有两面性,ConcurrentHashMap也有其缺点,最广为认识的缺点应该是size()方法,该方法返回容器中的元素个数。由于ConcurrentHashMap是“分段”的,所以size()并发并不好统计元素个数,设想一下,如果现在线程A调用size()方法且已经遍历过Segment1,此时线程B要往Segment1里插入元素,显然线程A不可能又“回头”去计算Segment1里的元素数量,所以最终的结果会不准确。

    一种简单的解决方法就是调用size()方法的时候把整个ConcurrentHashMap锁住,然后进行遍历统计。但这样做会导致所有的其他操作被阻塞,对性能影响比较大。为了更好的支持并发,ConcurrentHashMap会在不上锁的情况下逐个Segment计算三次,只要相邻的两次计算过程中,修改的次数相等(内部使用modCount变量来计数),那么就说明两次计算的size相同,就可以直接返回结果了。

    介绍完JDK1.7的ConcurrentHashMap,下面来看看JDK1.8的ConcurrentHashMap是怎样的。

    2.1.2 JDK1.8中的ConcurrentHashMap

    JDK1.8的ConcurrentHashMap抛弃了“分段锁”,采用了CAS技术来保证线程安全。其数据结构也发生来变化,使用了和HashMap一样的大数组table。如下所示:

    ioLd00.png

    下面是put操作的源码:

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        //key,value不能为null
        if (key == null || value == null) throw new NullPointerException();
        //计算hash值
        int hash = spread(key.hashCode());
        //节点数量
        int binCount = 0;
        //注意这里的for其实是一个无限循环,CAS操作如果失败会不断尝试
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //如果table为null或者其大小为0,那么就进行初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //利用hashcode确定插入位置,如果该位置之前没有值(为null),那么就使用CAS进行插入(插入到头结点),如果操作成功,就直接break,并且返回null(返回之前旧值),如果操作失败,就再次循环尝试。
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //如果该节点的hashcode和MOVED(-1)相等,那么就说明应该进行扩容操作
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            //到这里,说明既没有扩容,而且发生了哈希碰撞(即该位置有元素)
            else {
                //先拿到旧值
                V oldVal = null;
                //对节点加锁,该节点其实是红黑树或者链表的头结点
                synchronized (f) {
                    //判断当前的节点是否是头结点
                    if (tabAt(tab, i) == f) {
                        //fh大于等于0,说明该节点f是链表的节点(fh是f的hashcode)
                        if (fh >= 0) {
                            binCount = 1;
                            //遍历链表
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                //如果该节点的key已经存在,那么就更新值,然后退出循环
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                //如果已经到达尾节点了,说明key之前不存在,那么就往链表尾部插入新的节点
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //这里对应的是(fh >= 0那个条件),如果不满足fh >= 0条件,那么就判断f是否是一个红黑树节点
                        else if (f instanceof TreeBin) {
                      
                            Node<K,V> p;
                            binCount = 2;
                            //往树中添加节点或者修改节点值
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //如果节点数量已经超过阈值,就把链表结构转换成红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //计数+1
        addCount(1L, binCount);
        return null;
    }
    
    

    put()操作还是比较复杂的,我已经在源码中写上了一些注释,所以就不再多说了。虽然有一定难度,但是理解了之后会发现其逻辑还是比较清晰的,希望读者多读几遍源码,推荐写一些测试代码,然后慢慢的DEBUG跟踪,途中注意查看几个关键变量的变化。

    关于其他方法,例如get(),size()等,由于篇幅限制,就不一一介绍了,其实读懂了put()方法,再读其他方法的代码,会非常轻松,所以关键是掌握思想,而不是具体的某段代码!

    2.2 CopyOnWriteArrayList

    ArrayList在多线程并发环境下会有线程安全问题(例如扩容问题),在Java5之前,可以使用Vector作为替代或者Collections.synchronizedList()方法包装来解决ArrayList的线程安全问题。但这两种方法的同步机制都很粗暴,其中Vector就像Hashtable之于HashMap一样,直接在操作API上用synchronized关键字修饰,Collections.synchronizedList()也差不多,所以他们的效率都比较低。

    CopyOnWriteArrayList是在JDK1.5中加入的一个线程安全的ArrayList,有些文章会把他当做Vector的替代者,但我认为不能那么简单的划等号,为什么?因为CopyOnWriteArrayList对应用场景有比较高的要求,在读多写少的环境下,他运行效率非常高,而且能保证线程安全,但如果在写多读少的情况下,CopyOnWriteArrayList的性能会随着线程数的增加急剧下降。至于为什么会有这种情况,看看下面即将讲到的CopyOnWriteArrayList源码就知道了。

    2.2.1 CopyOnWriteArrayList.add()方法

    源码如下所示:

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            //先拿到内部数组的引用
            Object[] elements = getArray();
            //数组长度,也是元素个数
            int len = elements.length;
            //将原数组拷贝到一个新的数组中,新的数组长度是len+1,返回新数组的引用
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            //将值插入到数组尾部
            newElements[len] = e;
            //将新数组的引用赋值给实例字段array(array是内部数组的引用)
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }
    

    可以说是非常简单了,关键地方就是数组复制。为什么要耗费资源去做一次复制呢?注意到这里的插入操作是在复制之后的新数组上进行的,如果此时其他线程正在遍历CopyOnWriteArrayList的元素,也不会受到影响(虽然读到的是旧数据),写线程在最后快完事的时候才修改引用,将新数组赋值给内部的数组引用array,这实际上就是一种“读写分离”的思想。

    在非线程安全的ArrayList中,如果一个线程正在遍历,另一个线程有对List进行修改的操作时,可能会发生java.util.ConcurrentModificationException异常,但并不总是发生,这取决于线程的执行顺序。而CopyOnWriteArrayList的“读写分离”就不会发生该异常,最多只是可能会读取到旧数据而已。虽然是旧数据,但仔细想想,这好像挺合理的,因为我打算在某个时刻遍历List里的元素,其实就是想知道该时刻List里有哪些东西,至于从时刻开始到遍历完毕的这段时间内,元素是否有修改,我并不关心,不是吗?

    下面来看看CopyOnWriteArrayList的迭代器。(get()等方法我就不在这介绍了,实在是没什么新东西,但还是建议没有接触过的朋友仔细看看)

    2.2.2 COWIterator

    我个人认为CopyOnWriteArrayList的迭代器实现还是比较有特色的,和传统的迭代器实现有些不一样。下面是其源码:

    static final class COWIterator<E> implements ListIterator<E> {
           //数组快照,即创建该迭代器实例的时刻的数组,之后发生的变化不会反映到该快照上
            private final Object[] snapshot;
            //游标,用来指示当前迭代器指针的所在位置
            private int cursor;
    
            private COWIterator(Object[] elements, int initialCursor) {
                cursor = initialCursor;
                snapshot = elements;
            }
    
            /**下面就都是迭代器的常规方法了**/
        
            public boolean hasNext() {
                return cursor < snapshot.length;
            }
    
            public boolean hasPrevious() {
                return cursor > 0;
            }
    
            @SuppressWarnings("unchecked")
            public E next() {
                if (! hasNext())
                    throw new NoSuchElementException();
                return (E) snapshot[cursor++];
            }
    
            @SuppressWarnings("unchecked")
            public E previous() {
                if (! hasPrevious())
                    throw new NoSuchElementException();
                return (E) snapshot[--cursor];
            }
    
            public int nextIndex() {
                return cursor;
            }
    
            public int previousIndex() {
                return cursor-1;
            }
    
            public void remove() {
                throw new UnsupportedOperationException();
            }
        
            public void set(E e) {
                throw new UnsupportedOperationException();
            }
    
            public void add(E e) {
                throw new UnsupportedOperationException();
            }
    
            @Override
            public void forEachRemaining(Consumer<? super E> action) {
                Objects.requireNonNull(action);
                Object[] elements = snapshot;
                final int size = elements.length;
                for (int i = cursor; i < size; i++) {
                    @SuppressWarnings("unchecked") E e = (E) elements[i];
                    action.accept(e);
                }
                cursor = size;
            }
        }
    

    该迭代器的特点就在于不是简单的直接使用外部类的数组引用,而是保存一个外部类数组的“快照”,这样在使用迭代器进行遍历的时候,就不需要担心并发修改的问题了,因为此时迭代器遍历的说白了就是开始遍历时刻内部数据的一份拷贝,而不是对原数据进行遍历。虽然数据可能是“旧数据”,但也无伤大雅,不是吗?(上面我已经解释过为什么会“无伤大雅”)

    顺便说一下,虽然现在的Java代码很少直接使用迭代器了,但实际上for-each的遍历方法仍然是在使用迭代器,for-each语法只不过是语法糖而已。

    2.3 BlockingQueue

    BlckingQueue是一个接口,表示阻塞队列,JDK里包含了几个BlockingQueue的实现,例如ArrayBlockingQueue,LinkedBlockingQueue,PriorityBlockingQueue,SynchronousQueue等。本文主要介绍的是ArrayBlockingQueue实现,其他的实现各位可以自行查看相关源码学习。

    BlckingQueue是一种非常有用的同步容器,在很多地方都有应用,例如之前讲到的线程池,某些消息中间件等等。其主要的操作有如下几种:

    i5opQK.png

    这几个方法的作用以及特点,我已经在Java并发编程(五):生产者和消费者一文中有过介绍,在这里不再赘述。

    下面是ArrayBlockingQueue实现的put()方法源码:

    public void put(E e) throws InterruptedException {
        //检查e是否为null,为null就抛出异常
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            //如果count值和数组长度相等,就表示队列已经满了,线程进入阻塞状态
            while (count == items.length)
                //notFull是一个Condition类型的变量,用来表示“未满”这个状态
                notFull.await();
            //将值放入队列中
            enqueue(e);
        } finally {
            //解锁
            lock.unlock();
        }
    }
    
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
       
        final Object[] items = this.items;
        //putIndex是用来指示插入的索引
        items[putIndex] = x;
        //如果putIndex等于数组长度,将putIndex置位0(putIndex是一个循环的索引)
        if (++putIndex == items.length)
            putIndex = 0;
        //数量+1
        count++;
        //notEmpty也是一个Condition变量,发出信号即唤醒因为该Condition而陷入阻塞的线程
        //语义就是:现在添加了新的元素,队列肯定是不空的,所以唤醒那些因为队列空导致被阻塞的线程
        notEmpty.signal();
    }
    

    已经在代码中写了比较详细的注释了,不再多说。下面来看看与之匹配的take()方法:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lockInterruptibly();
        try {
            //如果count == 0,说明队列为空,就让线程进入阻塞状态,注意此时是调用notEmpty.await(),其语义是:此时“非空”条件已经不满足了
            while (count == 0)
                notEmpty.await();
            //出队操作
            return dequeue();
        } finally {
            //解锁
            lock.unlock();
        }
    }
    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        //先拿到值,takeIndex和putIndex是差不多的东西
        E x = (E) items[takeIndex];
        //置位null,是为了防止内存溢出,帮助GC
        items[takeIndex] = null;
        //和putIndex一样,takeIndex也是一个循环索引
        if (++takeIndex == items.length)
            takeIndex = 0;
        //数量-1
        count--;
        //itrs是迭代器
        if (itrs != null)
            itrs.elementDequeued();
        //notFull发出信号,唤醒因为notFull.await()进入阻塞状态线程,因为此时已经满足了“不满”的条件
        notFull.signal();
        return x;
    }
    
    

    其他的方法,例如offer,poll()等就不一一贴出来,都不难理解。希望各位能自行阅读相关源码进行学习。随便说一下,如果有熟悉“生产者消费者模型”的朋友,应该不难看出,ArrayBlockingQueue的put、take实现其实就是在实现“生产者消费者模型”。其实我想说的是,很多知识是有相互联系的,学习的时候相互对照着学习,既弄懂了新知识,又进一步理解了以前学过的知识,这就是“事半功倍”!

    3 并发工具类

    本小节主要介绍三个工具类:CountDownLacth,Semaphore,CyclicBarrier。他们的功能都很强大,而且使用起来还非常简单,但前提是能真正理解这些工具的原理,否则可能会出现一些令人迷惑的问题。(并发相关的问题总是这样,是吧?)

    3.1 Semaphore

    Semaphore即信号量,信号量内部有一个计数器,表示许可数,当一个线程调用Semaphore.acquire()方法时,该许可数会-1,与之对应的方法是Semaphore.release(),调用该方法会使得计数器+1。我们可以这样理解,Semaphore就像是一个公司保安,门卫(就字面意思,没有其他意思),手里持有一定数量的通行证,当有人要进公司且符合公司相关规定的时候,就给那人一个通行证,表示允许通过,此时保安手里的通行证自然就减少了1张,如果通行证已经被消耗完了,那么保安就会人挡在门外,直到有人把通行证归还,保安可以再次把通行证给之前被挡在门外的人(至于如何给,这就得看具体的执行策略了)。

    下面是一个使用Semaphore的示例:

    public class Main {
    
        public static void main(String[] args) {
            Semaphore semaphore = new Semaphore(5);
            Random random = new Random();
    
            for (int i = 0; i < 10; i++) {
                new Thread(() -> {
                    try {
                        System.out.println(Thread.currentThread() + " 想要进公司!");
                        semaphore.acquire();
                        System.out.println(Thread.currentThread() + " 进公司了!");
                        Thread.sleep(random.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println(Thread.currentThread() + " 归还通行证");
                        semaphore.release();
                    }
                }).start();
            }
        }
    }
    

    输出大致如下所示:

    Thread[Thread-0,5,main] 想要进公司!
    Thread[Thread-0,5,main] 进公司了!
    Thread[Thread-1,5,main] 想要进公司!
    Thread[Thread-1,5,main] 进公司了!
    Thread[Thread-2,5,main] 想要进公司!
    Thread[Thread-2,5,main] 进公司了!
    Thread[Thread-3,5,main] 想要进公司!
    Thread[Thread-3,5,main] 进公司了!
    Thread[Thread-4,5,main] 想要进公司!
    Thread[Thread-4,5,main] 进公司了!
    Thread[Thread-5,5,main] 想要进公司!
    Thread[Thread-6,5,main] 想要进公司!
    Thread[Thread-7,5,main] 想要进公司!
    Thread[Thread-8,5,main] 想要进公司!
    Thread[Thread-9,5,main] 想要进公司!
    Thread[Thread-2,5,main] 归还通行证
    Thread[Thread-5,5,main] 进公司了!
    Thread[Thread-5,5,main] 归还通行证
    Thread[Thread-6,5,main] 进公司了!
    Thread[Thread-4,5,main] 归还通行证
    Thread[Thread-7,5,main] 进公司了!
    Thread[Thread-6,5,main] 归还通行证
    Thread[Thread-8,5,main] 进公司了!
    Thread[Thread-7,5,main] 归还通行证
    Thread[Thread-9,5,main] 进公司了!
    Thread[Thread-8,5,main] 归还通行证
    Thread[Thread-3,5,main] 归还通行证
    Thread[Thread-0,5,main] 归还通行证
    Thread[Thread-1,5,main] 归还通行证
    Thread[Thread-9,5,main] 归还通行证
    

    发现确实和之前叙述的逻辑基本一致,现在来看看Semaphore的几个重要方法的源码。

    3.1.1 Semaphore的构造函数

    //permits即表示许可证的数量,sync是一个AQS实现类的实例,下面会看到
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    
    //fair为true,表示是公平模式,否则表示非公平模式
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
    

    构造函数比较简单了,就是初始化通行证的数量以及是否要使用公平模式,在内部就是初始化AQS的实例sync。这里仍然先不讨论这个sync究竟是何方神圣,我将会在下篇文章细细道来。

    3.1.2 Semaphore.acquire()方法

    其源码如下所示:

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    

    其实就是委托给AQS来实现,这里我就直接说其功能了,如果当前的通行证数量大于0,那么就让通行证数量减1,如果通行证数量小于等于0,那就让线程进入阻塞状态。(即保安把人挡在门外)

    3.1.3 Semaphore.release()方法

    其源码如下所示:

    public void release() {
        sync.releaseShared(1);
    }
    

    还是委托给AQS来实现,该方法是与acquire()配套的方法,调用该方法,会把通行证数量加1(即归还通行证),然后会唤醒之前被阻塞的线程(如果有的话)。

    那Semaphore有什么用呢?我们的示例就是一个使用场景,如果你要限制某时刻线程的数量,就可以使用Semaphore,就相等于控制了并发量,减轻服务端处理压力,不过对吞吐量以及响应性会有影响。

    3.2 CountDownLacth

    Latch的意思是“闭锁”(说实话,我一直觉得这个翻译很别扭)。有两个重要的方法:await()和countDown()。CountDownLacth内部有一个计数器,当调用await()方法时,如果该计数器的值不为0,那么调用该方法的线程会被阻塞,当调用countDown()时,会将计数器的值减1,当计数器的值为0的时候,会唤醒之前被阻塞的所有线程。

    还是用人员进入公司为例子来介绍其逻辑。CountDownLacth就像公司的大门一样,公司早上9点开门,来得早的人就只能在外面等着,无论多少人都得等着,直到9点之后内部人员把门打开,等着的人就会一窝蜂的进入公司。下面是一个该例子的示例代码:

    public class CountDownLatchTest {
    
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch gate = new CountDownLatch(1);
            Random random = new Random();
    
            for (int i = 0; i < 5; i++) {
                new Thread(() -> {
                    try {
                        System.out.println(Thread.currentThread() + " 被阻挡在公司门外!");
                        gate.await();
                        System.out.println(Thread.currentThread() + " 进入公司!");
                        Thread.sleep(random.nextInt(1000));
                        System.out.println(Thread.currentThread() + " 离开公司!");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
            Thread.sleep(100);
            gate.countDown();
        }
    }
    

    输出我大致如下所示:

    Thread[Thread-1,5,main] 被阻挡在公司门外!
    Thread[Thread-0,5,main] 被阻挡在公司门外!
    Thread[Thread-3,5,main] 被阻挡在公司门外!
    Thread[Thread-2,5,main] 被阻挡在公司门外!
    Thread[Thread-4,5,main] 被阻挡在公司门外!
    Thread[main,5,main] 内部人员打开门
    Thread[Thread-1,5,main] 进入公司!
    Thread[Thread-2,5,main] 进入公司!
    Thread[Thread-3,5,main] 进入公司!
    Thread[Thread-0,5,main] 进入公司!
    Thread[Thread-4,5,main] 进入公司!
    Thread[Thread-0,5,main] 离开公司!
    Thread[Thread-1,5,main] 离开公司!
    Thread[Thread-3,5,main] 离开公司!
    Thread[Thread-4,5,main] 离开公司!
    Thread[Thread-2,5,main] 离开公司!
    

    接下来看看几个重要方法源码。

    3.2.1 CountDownLacth的构造函数

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
    

    count值初始化的时候不能小于0,否则会直接抛出异常,之后用count去初始化AQS实现类的实例,CountDownLacth也是委托AQS去实现同步语义的。

    3.2.2 CountDownLatch.await()方法

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    

    发现了吗?和Semaphore一模一样,那为什么CountDownLacth和Semaphore的功能不一样呢?这里简单说一下吧(不如实在是难以解答这个问题),其实AQS采用了“模板方法模式”,这个acquireSharedInterruptibly()会使用到tryAcquireShared()方法,会已该方法的返回值作为依据来决定线程应该是被阻塞还是不阻塞。所以这两个类之所以功能或者行为不一样,就是因为他们内部的AQS实现类重写了tryAcquireShared()方法并且实现逻辑不同。

    下篇文章我会详细讲到AQS,到时候会更加的理解这段描述。

    3.2.3 CountDownLatch.countDown()方法

    public void countDown() {
        sync.releaseShared(1);
    }
    

    仍然和Semaphore的release()方法一样!原因我已经在上面讲过了,不再赘述。(说到这,我真的不得不佩服搞出AQS这个框架的人,实在是太NB了!)

    3.3 CyclicBarrier

    Barrier即栅栏,Barrier类似于Latch,他们的关键区别就是:Latch用于等待某个事件,而Barrier用于等待其他线程。这是《Java并发编程实战》里的给出的,但我认为作者应该只是为了更好的区分他们,为什么呢?因为等待其他线程其实也可以认为是一个事件,所以我个人更加倾向于Barrier的一个子集(个人想法而已,不代表权威)。

    除此之外,两者还有一个区别不经常被人提到(这里特指CyclicBarrier和CountDownLatch的实现):CountDownLatch在结束之后不会自动“重启”,即内部计数器为0之后就无法再次使用同一个lacth来阻挡线程了。而CyclicBarrier具有这个功能,在内部计数器为0之后,会调用重启方法,将计数器再次修改成初始值(待会儿会看到源码)。

    下面是一个CyclicBarrier的示例(代码随手写的,不太符合规范):

    public class Main {
    
        public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
            CyclicBarrier barrier = new CyclicBarrier(5, () -> {
                System.out.println("主线程来了,开始吧");
            }); 
            for (int i = 0; i < 4; i++) {
                new Thread(() -> {
                    System.out.println(Thread.currentThread() + " 到达了!!等待主线程");
                    try {
                        barrier.await();
                        System.out.println(Thread.currentThread() + " 开始运行.....");
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
            Thread.sleep(10);
            System.out.println("主线程先休息一下.....");
            Thread.sleep(1000);
            barrier.await();
            Thread.sleep(500);
    
    
            System.out.println("---------第二次开始------------");
            for (int i = 0; i < 4; i++) {
                new Thread(() -> {
                    System.out.println(Thread.currentThread() + " 第二次到达了!!等待主线程");
                    try {
                        barrier.await();
                        System.out.println(Thread.currentThread() + " 开始运行.....");
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
            Thread.sleep(10);
            System.out.println("主线程在休息一下.....");
            Thread.sleep(1000);
            barrier.await();
    
        }
    }
    
    

    输出类似如下所示:

    Thread[Thread-1,5,main] 到达了!!等待主线程
    Thread[Thread-3,5,main] 到达了!!等待主线程
    Thread[Thread-2,5,main] 到达了!!等待主线程
    Thread[Thread-0,5,main] 到达了!!等待主线程
    主线程先休息一下.....
    主线程来了,开始吧
    Thread[Thread-1,5,main] 开始运行.....
    Thread[Thread-0,5,main] 开始运行.....
    Thread[Thread-3,5,main] 开始运行.....
    Thread[Thread-2,5,main] 开始运行.....
    ---------第二次开始------------
    Thread[Thread-4,5,main] 第二次到达了!!等待主线程
    Thread[Thread-5,5,main] 第二次到达了!!等待主线程
    Thread[Thread-6,5,main] 第二次到达了!!等待主线程
    Thread[Thread-7,5,main] 第二次到达了!!等待主线程
    主线程在休息一下.....
    主线程来了,开始吧
    Thread[Thread-4,5,main] 开始运行.....
    Thread[Thread-7,5,main] 开始运行.....
    Thread[Thread-6,5,main] 开始运行.....
    Thread[Thread-5,5,main] 开始运行.....
    

    3.3.1 CyclicBarrier的构造函数

    CyclicBarrier有两个构造函数:

    //parties即线程的总数量(作者用parties这个变量名有点意思啊)
    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    
    //barrierAction是一个Runnable变量,当最后一个线程到达的时候,该线程会执行该Runnable的run()方法
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
    

    3.3.2 CyclicBarrier.await()方法

    其源码如下所示:

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            //调用dowait()方法
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
    
    //第一个参数表示是否是定时模式,第二参数是纳秒数(仅在定时模式下有意义)
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //加锁
        lock.lock();
        try {
            //Generation是一个内部类,该内部类只有一个字段broken
            //该类型主要用来表示的一次迭代过程,下一次迭代会生成另一个实例
            final Generation g = generation;
    
            //如果broken为false,说明被打断了,直接抛出一次
            if (g.broken)
                throw new BrokenBarrierException();
    
            //该方法可以响应中断
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
    
            //获取当前的count值,此时index就表示“还有几个线程没到”
            int index = --count;
            //当index为0的时候,表示此时该线程是最后一个线程了
            if (index == 0) {  // tripped
                //指示“运行”动作是否已经进行
                boolean ranAction = false;
                try {
                    //即构造函数中传入的那个Runnable
                    final Runnable command = barrierCommand;
                    //如果Runnable不为null,就允许其run方法
                    if (command != null)
                        command.run();
                    //设置ranAction为true
                    ranAction = true;
                    //重新设置count值等参数
                    nextGeneration();
                    //直接返回
                    return 0;
                } finally {
                    //如果由于某种原因,导致没有正确运行,那么就抛出异常(breakBarrier()方法会抛出异常)
                    if (!ranAction)
                        breakBarrier();
                }
            }
    
            // loop until tripped, broken, interrupted, or timed out
            //这里是一个无限循环,如果之前index不等0,线程就会执行到这里,而且线程如果在下面的逻辑中被阻塞,是不会执行到上面的逻辑的,即不会触发那个Runnable以及“重新设置count等参数”。只是从阻塞状态中醒来而已。
            for (;;) {
                try {
                    //是否是定时模式,如果不是
                    if (!timed)
                        //线程进入阻塞状态,trip是一个Condition变量
                        trip.await();
                    //如果是定时模式,且设置了超时纳秒数,那么就进入一个有超时时间的阻塞状态
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
    
                //下面都是一些检查了
                
                if (g.broken)
                    throw new BrokenBarrierException();
               //什么时候会有g != generation呢?当调用nextGeneration()时,会重新实例化一个新的Generation对象,此时就表示最后一个线程已经到了,所以被阻塞的线程可以返回了。
                if (g != generation)
                    return index;
    
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //最后解锁
            lock.unlock();
        }
    }
    
    //最后一个线程到达的时候会调用该方法
    private void nextGeneration() {
        //唤醒所有被阻塞的线程
        trip.signalAll();
        //重新设置count值
        count = parties;
        //生成新的Generation对象实例
        generation = new Generation();
    }
    

    注释已经写得非常清楚了,实在不理解就多看几遍,然后写一个测试用例,DEBUG调试一下,应该就能理解了。

    可能读者会有一个问题,为什么CyclicBarrier不和CountDownLatch一样委托给AQS来实现呢?说实话,没在哪里看到过有解释这个问题的(可能还是读书太少了),也不太清楚这个问题的答案。。。。

    4 小结

    本文详细介绍了几个常用的并发容器和并发工具类,熟练的使用它们是编写高效并发程序的基础,避免“重复造轮子”。这些个容器和工具,各有各的特点以及适合的应用场景,理解它们的原理,可以更加灵活的、合理的使用它们,避免发生误用反而导致性能急剧下降的问题。本文仍然提到了AQS这个东西,这个东西其实算是比较高级的了,也比较底层,很多工具类都依赖这个东西,在下篇文章,我会详细的介绍AQS,并且使用AQS写出自己的Semaphore实现。

    5 参考资料

    《Java并发编程实战》

    《Java进阶(六)从ConcurrentHashMap的演进看Java多线程核心技术》

    相关文章

      网友评论

        本文标题:Java并发编程(六):并发容器和工具类

        本文链接:https://www.haomeiwen.com/subject/rnkkhqtx.html