美文网首页
Java并发容器

Java并发容器

作者: luoxn28 | 来源:发表于2017-06-20 20:09 被阅读0次

    Java程序员进行并发编程时,相比于其他语言的程序员而言要倍感幸福,因为并发编程大师Doug Lea不遗余力地为Java开发者提供了非常多的并发容器和框架。如果使用C语言进行开发的话,首先需要自己定义好底层数据结构(也可以说是容器,比如链表/哈希/树/...),然后结合使用mutex/cond等底层并发API来构建并发安全的容器;如果使用C++的话可以使用STL,但是STL中容器并未对并发安全有很好的支持。

    1 ConcurrentHashMap

    在并发编程中使用HashMap可能导致程序死循环。而使用线程安全的HashTable效率又非常低下,基于以上两个原因,便有了ConcurrentHashMap的登场机会,ConcurrentHashMap是并发安全且高效的HashMap。

    (1) 线程不安全的HashMap
    在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。

    public static void main(String[] args) {
        final HashMap<String, String> map = new HashMap<String, String>(2);
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            map.put(UUID.randomUUID().toString(), "");
                        }
                    }, "ftf" + i).start();
                }
            }
        }, "ftf");
        t.start();
        t.join();
    }
    

    执行以上代码就会导致死循环,HashMap在并发执行put操作时会引起死循环(具体来说就是put操作中rehash时可能导致链表形成循环结构),是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。

    (2) 效率低下的HashTable
    HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。

    (3) ConcurrentHashMap的锁分段技术提高并发访问效率
    HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。(注意:JDK1.8中舍弃了分段锁技术,改用CAS+Synchronized机制。以下ConcurrentHashMap讲解以JDK1.8以例)

    ConcurrentHashMap中一个重要的类就是Node,该类存储键值对,所有插入ConcurrentHashMap的数据都包装在这里面。它与HashMap中的定义很相似,但是但是有一些差别它对value和next属性设置了volatile同步锁,它不允许调用setValue方法直接改变Node的value域,它增加了find方法辅助map.get()方法。可在get方法返回的结果中更改对应的value值。

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
    
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
    
        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }
        ...
    }
    

    ConcurrentHashMap定义了三个原子操作,用于对指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。

    @SuppressWarnings("unchecked")  
    //获得在i位置上的Node节点  
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {  
       return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);  
    }  
    //利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少  
    //在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改  
    //因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果  有点类似于SVN  
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,  
                                       Node<K,V> c, Node<K,V> v) {  
       return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);  
    }  
    //利用volatile方法设置节点位置的值  
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {  
       U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);  
    }  
    

    put方法
    ConcurrentHashMap最常用的方法就是put和get方法了,put方法依然沿用了HashMap的put方法的思想,据hash值计算这个新插入的点在table中的位置i,如果i位置是空的,直接放进去,否则进行判断,如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾。ConcurrentHashMap中依然沿用这个思想,有一个最重要的不同点就是ConcurrentHashMap不允许key或value为null值。另外由于涉及到多线程,put方法就要复杂一点。在多线程中可能有以下两种情况:

    • 如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法中在空结点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容;
    • 如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比hashTable的synchronized要好得多。

    get方法
    get方法比较简单,给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同,对于节点可能在链表或树上的情况,需要分别去查找。get方法是不要加锁的,因为Node节点中value数据类型时volatile的,保证了内存可见性。

    2 ConcurrentLinkedQueue

    线程安全的队列有2种实现方式,一种是非阻塞的,使用CAS算法实现;另一种是阻塞的,使用锁机制来实现。ConcurrentLinkedQueue是一种非阻塞型基于链接节点的无界线程安全队列,它采用先进先出的规
    则对节点进行排序。

    ConcurrentLinkedQueue结构类图:



    ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点(next)的引用组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。默认情况下head节点存储的元素为空,tail节点等于head节点。

    入队列
    入队列就是将如对队节点添加到队列尾部,入队列主要做两件事情:多线程同时入队列操作时,使用CAS来保证操作。如果有一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,那么队列的尾节点就会发生变化,这时当前线程要暂停入队操作,然后重新获取尾节点。


    从源代码角度来看,整个入队过程确实做两件事情:第一是定位出尾节点;第二是使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试。

    定位尾节点
    tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点。尾节点可能是tail节点,也可能是tail节点的next节点。代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点可能是尾节点。获取tail节点的next节点需要注意的是p节点等于p的next节点的情况,只有一种可能就是p节点和p的next节点都等于空,表示这个队列刚初始化,正准备添加节点,所以需要返回head节点。获取p节点的next节点代码如下。

    出队列
    出队列的就是从队列里返回一个节点元素,并清空该节点对元素的引用。与入队列类似,并不是每次出队时都更新head节点,当head节点里有元素时,直接弹出head节点里的元素,而不会更新head节点。只有当head节点里没有元素时,出队操作才会更新head节点。这种做法也是通过hops变量来减少使用CAS更新head节点的消耗,从而提高出队效率。


    首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

    3 ConcurrentBlockingQueue

    阻塞队列就是支持阻塞插入和删除的一个队列。
    支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。
    支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

    阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。
    在阻塞队列不可用时(添加时队列已满,移除时队列为空),这两个附加操作提供了4种处理方式:



    注意:如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,该方法永远返回true。

    Java中的阻塞队列
    JDK7提供了以下7种队列:

    • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
    • LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列,默认最大长度为Integer.MAX_VALUE。
    • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列,注意不能保证同优先级的相对顺序。
    • DelayQueue:一个使用优先级队列实现的无界阻塞队列。
    • SynchronousQueue:一个不存储元素的阻塞队列。
    • LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
    • LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    ArrayBlockingQueue
    一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。
    默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。

    DelayQueue
    DelayQueue是一个支持延时获取元素的无界阻塞队列。队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
    DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。
    ● ·缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。
    ● ·定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。

    参考资料:
    1《Java并发编程的艺术》
    2 JAVA HASHMAP的死循环 - 酷壳
    3 ConcurrentHashMap源码分析(JDK8版本)

    相关文章

      网友评论

          本文标题:Java并发容器

          本文链接:https://www.haomeiwen.com/subject/ljgnqxtx.html