美文网首页杂七杂八
ConcurrentHashMap源码解析(JDK1.8)

ConcurrentHashMap源码解析(JDK1.8)

作者: 激情的狼王 | 来源:发表于2018-02-01 13:01 被阅读35次

    通过HashMap的实现原理可以知道,HashMap在并发情况下的扩容操作,会出现链表造成闭环,导致在get时会出现死循环,因此HashMap是线程不安全的,但是HashMap的实现原理和源码对于学习ConcurrentHashMap有很大的帮助,可以先看看这篇来预热一下HashMap源码解析(JDK1.8),我们先来看下JDK1.7的设计。

    ConcurrentHashMap—JDK1.7

    ConcurrentHashMap在1.7中采用了分段锁的设计如图:

    2184951-af57d9d50ae9f547.png

    一个ConcurrentHashMap由多个segment组成,每一个segment都包含了一个HashEntry数组,这个数组我们可以理解为一个加锁的HashMap, 每一个segment包含了对自己的HashMap的操作,比如get,put,replace等操作,这些操作发生的时候,对自己的HashMap进行锁定。由于每一个segment写操作只锁定自己的HashMap,所以可能存在多个线程同时写的情况,性能无疑好于整个HashMap锁定的情况,分段锁大大的提高了高并发环境下的处理能力。
    对于ConcurrentHashMap的数据插入,这里要进行两次Hash去定位数据的存储位置

    put操作
    static class Segment<K,V> extends ReentrantLock implements Serializable 
    

    Segment实现了ReentrantLock,也就带有锁的功能,当执行put操作时,会进行第一次key的hash来定位Segment的位置,如果该Segment还没有初始化,即通过CAS操作进行赋值,然后进行第二次hash操作,找到相应的HashEntry的位置,这里会利用继承过来的锁的特性通过继承ReentrantLock的tryLock()方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用tryLock()方法去获取锁,超过指定次数就挂起,等待唤醒。

    get操作

    ConcurrentHashMap的get操作跟HashMap类似,只是ConcurrentHashMap第一次需要经过一次hash定位到Segment的位置,然后再hash定位到指定的HashEntry,遍历该HashEntry下的链表进行对比,成功就返回,不成功就返回null。

    size操作

    计算ConcurrentHashMap的元素大小是一个有趣的问题,因为他是并发操作的,就是在你计算size的时候,他还在并发的插入数据,可能会导致你计算出来的size和你实际的size有相差(在你return size的时候,插入了多个数据),要解决这个问题,JDK1.7版本用两种方案

    try {
        for (;;) {
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // force creation
            }
            sum = 0L;
            size = 0;
            overflow = false;
            for (int j = 0; j < segments.length; ++j) {
                Segment<K,V> seg = segmentAt(segments, j);
                if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0 || (size += c) < 0)
                   overflow = true;
                } }
            if (sum == last) break;
            last = sum; } }
    finally {
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    

    第一种方案他会使用不加锁的模式去尝试多次计算ConcurrentHashMap的size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的;
    第二种方案是如果第一种方案不符合,他就会给每个Segment加上锁,然后计算ConcurrentHashMap的size返回。

    ConcurrentHashMap—JDK1.8

    1.8中放弃了一个HashMap被一个Segment封装加上锁的复杂设计,取而代之的是在HashMap的每个Node上增加CAS + Synchronized来保证并发安全进行实现,结构如下:

    2184951-d9933a0302f72d47.png
    重要属性
    transient volatile Node<K,V>[] table;
    
    private transient volatile Node<K,V>[] nextTable;
    
    private transient volatile long baseCount;
    
    private transient volatile int sizeCtl;
    

    table:用来存放Node节点数据的,默认为null,默认大小为16的数组,每次扩容时大小总是2的幂次方;
    nextTable:扩容时新生成的数据,数组为table的两倍;
    Node:节点,保存key-value的数据结构;
    ForwardingNode:一个特殊的Node节点,hash值为-1,其中存储nextTable的引用。只有table发生扩容的时候,ForwardingNode才会发挥作用,作为一个占位符放在table中表示当前节点为null或则已经被移动
    sizeCtl:控制标识符,用来控制table初始化和扩容操作的,在不同的地方有不同的用途,其值也不同,所代表的含义也不同 负数代表正在进行初始化或扩容操作

    -1代表正在初始化
    -N 表示有N-1个线程正在进行扩容操作
    正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小

    初始化: initTable()

    初始化方法initTable()ConcurrentHashMap初始化时不会立即执行,通常是在插入的时候,例如put、merge、compute、computeIfAbsent、computeIfPresent操作时。

    private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) {
                //sizeCtl < 0 表示有其他线程在初始化,该线程必须挂起
                if ((sc = sizeCtl) < 0)
                    Thread.yield();
                // 如果该线程获取了初始化的权利,则用CAS将sizeCtl设置为-1,表示本线程正在初始化
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                        // 进行初始化
                    try {
                        if ((tab = table) == null || tab.length == 0) {
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            // 下次扩容的大小
                            sc = n - (n >>> 2); ///相当于0.75*n 设置一个扩容的阈值  
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    

    初始化方法initTable()的关键就在于sizeCtlsizeCtl默认为0,如果ConcurrentHashMap实例化时有传参数,sizeCtl会是一个2的幂次方的值。所以执行第一次put操作的线程会执行Unsafe.compareAndSwapInt方法修改sizeCtl-1有且只有一个线程能够修改成功,其它线程判断sizeCtl的值来执行Thread.yield()让出CPU时间片等待table初始化完成。

    put操作
    public V put(K key, V value) {
            return putVal(key, value, false);
        }
    
        final V putVal(K key, V value, boolean onlyIfAbsent) {
            //key、value均不能为null
            if (key == null || value == null) throw new NullPointerException();
            //计算hash值
            int hash = spread(key.hashCode());
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                // table为null,进行初始化工作
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
                //如果i位置没有节点,则直接插入,不需要加锁
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    if (casTabAt(tab, i, null,
                            new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                // 有线程正在进行扩容操作,则先帮助扩容
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    //对该节点进行加锁处理(hash值相同的链表的头节点),对性能有点儿影响
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            //fh > 0 表示为链表,将该节点插入到链表尾部
                            if (fh >= 0) {
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    //hash 和 key 都一样,替换value
                                    if (e.hash == hash &&
                                            ((ek = e.key) == key ||
                                                    (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        //putIfAbsent()
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    //链表尾部  直接插入
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key,
                                                value, null);
                                        break;
                                    }
                                }
                            }
                            //树节点,按照树的插入操作进行插入
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                        value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {
                        // 如果链表长度已经达到临界值8 就需要把链表转换为树结构
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
    
            //size + 1  
            addCount(1L, binCount);
            return null;
        }
    

    put方法流程如下:

    1.计算hash值,循环table
    2.判断table是否为null,是的话就去初始化
    3.根据hash值获取节点的位置i,若该位置为空,则直接插入,这个过程没有用锁而是用的CAS
    4.如果有线程正在进行扩容操作,则先帮助扩容
    5.最后锁住头结点,进行增加操作,该过程和HashMap的put类似,可以参考文章开头的链接。

    get操作
    public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            // 计算hash
            int h = spread(key.hashCode());
            if ((tab = table) != null && (n = tab.length) > 0 &&
                    (e = tabAt(tab, (n - 1) & h)) != null) {
                // 搜索到的节点key与传入的key相同且不为null,直接返回这个节点
                if ((eh = e.hash) == h) {
                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                        return e.val;
                }
                // 树
                else if (eh < 0)
                    return (p = e.find(h, key)) != null ? p.val : null;
                // 链表,遍历
                while ((e = e.next) != null) {
                    if (e.hash == h &&
                            ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }
    

    get操作的整个逻辑非常清楚:

    1.计算hash值
    2.判断table是否为空,如果为空,直接返回null
    3.根据hash值获取table中的Node节点(tabAt(tab, (n - 1) & h)),然后根据链表或者树形方式找到相对应的节点,返回其value值。

    整个get过程没用锁,用的是CAS.

    扩容操作addCount

    当ConcurrentHashMap中table元素个数达到了容量阈值(sizeCtl)时,则需要进行扩容操作。在put操作时最后一个会调用addCount(long x, int check),该方法主要做两个工作:1.更新baseCount;2.检测是否需要扩容操作。如下:

    private final void addCount(long x, int check) {
            CounterCell[] as; long b, s;
            if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
                CounterCell a; long v; int m;
                boolean uncontended = true;
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                    fullAddCount(x, uncontended);
                    return;
                }
                if (check <= 1)
                    return;
                s = sumCount();
            }
            if (check >= 0) {
                Node<K,V>[] tab, nt; int n, sc;
                while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                       (n = tab.length) < MAXIMUM_CAPACITY) {
                    int rs = resizeStamp(n);
                    if (sc < 0) {
                        if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                            sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                            transferIndex <= 0)
                            break;
                        if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                            transfer(tab, nt);
                    }
                    else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                                 (rs << RESIZE_STAMP_SHIFT) + 2))
                        transfer(tab, null);
                    s = sumCount();
                }
            }
        }
    

    transfer()方法为ConcurrentHashMap扩容操作的核心方法。由于ConcurrentHashMap支持多线程扩容,而且也没有进行加锁,所以实现会变得有点儿复杂。整个扩容操作分为两步:

    1.构建一个nextTable,其大小为原来大小的两倍,这个步骤是在单线程环境下完成的
    2.将原来table里面的内容复制到nextTable中,这个步骤是允许多线程操作的,所以性能得到提升,减少了扩容的时间消耗

    size()计算

    在JDK1.8中为了更好地统计size,ConcurrentHashMap提供了baseCount、counterCells两个辅助变量和一个CounterCell辅助内部类。

    @sun.misc.Contended static final class CounterCell {
            volatile long value;
            CounterCell(long x) { value = x; }
    }
    
    //ConcurrentHashMap中元素个数,但返回的不一定是当前Map的真实元素个数。基于CAS无锁更新
    private transient volatile long baseCount;
    
    private transient volatile CounterCell[] counterCells;
    

    size()方法定义如下:

    public int size() {
            long n = sumCount();
            return ((n < 0L) ? 0 :
                    (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                    (int)n);
    }
    final long sumCount() {
            CounterCell[] as = counterCells; CounterCell a;
            long sum = baseCount;
            if (as != null) {
                for (int i = 0; i < as.length; ++i) {
                    //遍历,所有counter求和
                    if ((a = as[i]) != null)
                        sum += a.value;     
                }
            }
            return sum;
    }
    

    通过上述size()逻辑可以知道:size = baseCount + counterCells[0...n-1].value,我们通过增加一个元素的逻辑代码来看这两个变量的含义。

    private final void addCount(long x, int check) {
            CounterCell[] as; long b, s;
            // s = b + x,完成baseCount++操作;
            if ((as = counterCells) != null ||
                !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
                CounterCell a; long v; int m;
                boolean uncontended = true;
                if (as == null || (m = as.length - 1) < 0 ||
                    (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                    !(uncontended =
                      U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                    //  多线程CAS发生失败时执行
                    fullAddCount(x, uncontended);
                    return;
                }
                if (check <= 1)
                    return;
                s = sumCount();
            }
    
            // 检查是否进行扩容
        }
    

    在并发量很高时,如果存在两个线程同时执行CAS修改baseCount值,则失败的线程会继续执行方法体中的逻辑,使用CounterCell记录元素个数的变化;
    如果通过CAS设置cellsBusy字段失败的话,则继续尝试通过CAS修改baseCount字段,如果修改baseCount字段成功的话,就退出循环,否则继续循环插入CounterCell对象;
    所以在1.8中的size实现比1.7简单多,因为元素个数保存baseCount中,部分元素的变化个数保存在CounterCell数组中,实现如下:
    通过累加baseCount和CounterCell数组中的数量,即可得到元素的总个数;

    相关文章

      网友评论

      本文标题:ConcurrentHashMap源码解析(JDK1.8)

      本文链接:https://www.haomeiwen.com/subject/ajtlzxtx.html