美文网首页
ConcurrentHashMap

ConcurrentHashMap

作者: 刘一一同学 | 来源:发表于2019-07-15 10:33 被阅读0次

    1. JDK1.7实现

    1.1 概述

    在多线程环境下,使用HashMap进行put操作时可能会引起死循环,导致CPU利用率飙升,另一方面也无法解决数据同步问题,因此在并发环境下不适合使用HashMap。为了解决数据同步以及并发问题,使用 HashTablesynchronizedMap的确能解决上述问题。但是,这两种方式基本都是对整个哈希表进行加锁,这在并发环境下,严重影响系统性能。因此,引入ConcurrentHashMap之后,既能解决数据同步问题,也不影响系统性能。

    1.2 实现原理

    ConcurrentHashMap是由SegmentHashEntry数组结构组成,和HashMap一样,仍然是数组+链表组成。其中Segment使用了可重入锁ReentrantLockHashEntry则用于存储键值对数据。

    ConcurrentHashMap采用了分段锁技术,分段锁是将数据按段存储,然后给每段数据配一把锁。当一个线程访问其中一个段数据的时候,首先对这段数据进行加锁,其他段的数据也能被其他线程访问。但是,有些方法需要跨段访问,比如size()containsValue(),它们可能需要按顺序锁定整个表的数据段,操作完毕后,又按顺序释放所有段的锁。

    1.2.1 get()

    get()不需要加锁,只需要将key通过hash之后定位到具体的Segment,再通过一次hash定位到具体的元素上。由于HashEntry中的value属性是用volatile关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。

    1.2.2 put()

    put()需要加锁,虽然HashEntry中的value属性是用volatile关键词修饰的,但是并不能保证并发的原子性,所以put()操作需要加锁处理。首先也是通过将key通过hash之后定位到具体的Segment,再通过一次hash定位到具体的元素上,在put()之前会进行一次扩容检查,之后再做插入操作。而HashMap是在插入元素之后才校验是否需要扩容,有可能扩容之后后续就不再有元素插入,这样就浪费了本次扩容(扩容非常消耗性能)。

    1.2.3 size()

    每个Segment都有一个volatile修饰的全部变量count,求整个ConcurrentMapsize时,很明显就是将所有的count累加即可。但是volatile修饰的变量却不能保证多线程的原子性,所以直接累加很容易出现并发问题。如果每次调用size()方法时,将其余的修改操作加锁,效率也很低。所以最终的做法是,先尝试两次将count累加,如果容器的count发生了变化再加锁统计size

    ConcurrentHashMap统计时如何知道count大小发生了变化呢?每个Segment 都有一个modCount变量,每当进行一次put和remove操作,modCount将会+1。只要modCount发生了变化就认为容器的大小也在发生变化

    2. JDK1.8实现

    2.1 概述

    JDK1.8中的ConcurrentHashMap优化如下:

    • 直接用Node数组+链表+红黑树的数据结构来实现。
    • HashEntry改为Node,用来保存key、value以及key的hash值。
    • 抛弃原有的Segment分段锁,采用CAS+synchronized 对每一行数据进行加锁,保证并发安全性。
    • valnext都使用了volatile修饰,目的是在多线程环境下线程A修改结点的val或者新增节点的时候对线程B是可见的。

    2.2 实现原理

    2.2.1 get()

    • 根据key计算hashcode,定位到该table索引位置,如果查询的key正好在首结点Node上,那么直接返回值。
    • 如果是红黑树,那就按照树的方式获取值。
    • 如果都不满足,那就按照链表的方式遍历获取值。
    public V get(Object key) {  
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;  
        // 计算两次hashcode  
        int h = spread(key.hashCode()); 
        if ((tab = table) != null && (n = tab.length) > 0 &&  
            // 读取首节点的Node元素
            (e = tabAt(tab, (n - 1) & h)) != null) {  
            if ((eh = e.hash) == h) { // 如果该节点就是首节点就返回  
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))  
                    return e.val;  
            }  
            // hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来  
            // 查找,查找到就返回  
            else if (eh < 0)  
                return (p = e.find(h, key)) != null ? p.val : null;  
            // 既不是首节点也不是ForwardingNode,那就往下遍历  
            while ((e = e.next) != null) {
                if (e.hash == h &&  
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))  
                    return e.val;  
            }  
        }  
        return null;  
    }  
    

    2.2.2 put()过程

    • 元素插入前检查是否需要进行初始化,如果没有初始化就先调用initTable()初始化。
    • 根据key计算hashcode,定位到对应的Node。如果Node为空,利用CAS尝试写入数据,失败则自旋保证成功。
    • 如果当前位置的hashcode == MOVED == -1,则需要进行扩容。
    • 如果都不满足,则利用synchronized锁写入数据。
    • 如果链表节点数量大于或等于TREEIFY_THRESHOLD(默认值为8)则要转换为红黑树。
    • 如果添加成功就调用addCount()统计 size,并且检查是否需要扩容。

    TREEIFY_THRESHOLD 用于判断是否需要将链表转换为红黑树的阈值。

    public V put(K key, V value) {  
        return putVal(key, value, false);  
    }  
    /** Implementation for put and putIfAbsent */  
    final V putVal(K key, V value, boolean onlyIfAbsent) {  
        if (key == null || value == null) throw new NullPointerException();  
        // 两次hash,减少hash冲突,可以均匀分布  
        int hash = spread(key.hashCode()); 
        int binCount = 0;  
        for (Node<K,V>[] tab = table;;) { // 对这个table进行迭代  
            Node<K,V> f; int n, i, fh;  
            // 这里就是上面构造方法没有进行初始化,在这里进行判断,为null就调用initTable进行初始化,属于懒汉模式初始化  
            if (tab == null || (n = tab.length) == 0)  
                tab = initTable();  
            // 如果i位置没有数据,就直接无锁插入  
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,  
                             new Node<K,V>(hash, key, value, null)))  
                    break;                   // no lock when adding to empty bin  
            }  
            // 如果在进行扩容,则先进行扩容操作
            else if ((fh = f.hash) == MOVED)  
                tab = helpTransfer(tab, f);  
            else {  
                V oldVal = null;  
                // 如果以上条件都不满足,那就要进行加锁操作,也就是存在hash冲突,锁住链表或者红黑树的头结点  
                synchronized (f) {  
                    if (tabAt(tab, i) == f) {  
                        // 表示该节点是链表结构
                        if (fh >= 0) {   
                            binCount = 1;  
                            for (Node<K,V> e = f;; ++binCount) {  
                                K ek;  
                                // 这里涉及到相同的key进行put就会覆盖原先的value  
                                if (e.hash == hash &&  
                                    ((ek = e.key) == key ||  
                                     (ek != null && key.equals(ek)))) {  
                                    oldVal = e.val;  
                                    if (!onlyIfAbsent)  
                                        e.val = value;  
                                    break;  
                                }  
                                Node<K,V> pred = e;  
                                // 插入链表尾部  
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,  
                                                              value, null);  
                                    break;  
                                }  
                            }  
                        }  
                        // 红黑树结构
                        else if (f instanceof TreeBin) {  
                            Node<K,V> p;  
                            binCount = 2;  
                            // 红黑树结构旋转插入  
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,  
                                                           value)) != null) {  
                                oldVal = p.val;  
                                if (!onlyIfAbsent)  
                                    p.val = value;  
                            }  
                        }  
                    }  
                }  
                 // 如果链表的长度大于8时就会进行红黑树的转换
                if (binCount != 0) {  
                    if (binCount >= TREEIFY_THRESHOLD)  
                        treeifyBin(tab, i);  
                    if (oldVal != null)  
                        return oldVal;  
                    break;  
                }  
            }  
        }  
        // 统计size,并且检查是否需要扩容  
        addCount(1L, binCount);
        return null;  
    }  
    

    2.3 代码示例

    public class ConcurrentHashMapDemo {
    
        private static final ConcurrentHashMap map = new ConcurrentHashMap();
        private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);
    
        public static void main(String[] args) {
            for (int i=1; i <= 10; i++) {
                EXECUTOR_SERVICE.execute(new MyThread());
            }
        }
        static class MyThread implements Runnable {
            @Override
            public void run() {
                map.put("name", Thread.currentThread().getName());
                map.size();
                System.out.println(map.get("name"));
                map.remove("name");
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:ConcurrentHashMap

          本文链接:https://www.haomeiwen.com/subject/vzojhctx.html