美文网首页面试精选
ConcurrentHashMap为什么是线程安全的?对比Has

ConcurrentHashMap为什么是线程安全的?对比Has

作者: 大程子的技术成长路 | 来源:发表于2021-07-11 11:32 被阅读0次

HashMap和ConcurrentHashMap在多线程情况下的对比

我们用一段代码证明下HashMap的线程不安全,以及ConcurrentHashMap的线程安全性。代码逻辑很简单,开启10000个线程,每个线程做很简单的操作,就是put一个key,然后删除一个key,理论上线程安全的情况下,最后map的size()肯定为0。

package com.wangcp.jvmstudy.mapStudy;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MapTest {
    public static void main(String[] args) {
        // 创建HashMap及ConcurrentHashMap
        Map<Object,Object> map = new HashMap<>();
        ConcurrentHashMap<Object,Object> concurrentHashMap = new ConcurrentHashMap<>();
        // 开启 10000 个线程
        for (int i = 0; i < 10000; i++) {
            new Thread(()->{
                double a = Math.random();
                // HashMap put值后删除
                map.put(a,a);
                map.remove(a);
                // ConcurrentHashMap put值后删除
                concurrentHashMap.put(a,a);
                concurrentHashMap.remove(a);
            }).start();
        }

        System.out.println("普通map剩余:" + map.size());
        System.out.println("concurrentHashMap剩余:" + concurrentHashMap.size());
    }
}

取三次执行结果:

--------------------第一次------------------------
普通map剩余:7
concurrentHashMap剩余:0
--------------------第二次------------------------
普通map剩余:2
concurrentHashMap剩余:0
--------------------第三次------------------------
普通map剩余:4
concurrentHashMap剩余:0

通过以上的三次执行结果,也就证明了ConcurrentHashMap是线程安全的,我们接下来从源码分析下ConcurrentHashMap是如何保证线程安全的,本次源码jdk版本为1.8。

源码分析

下面就是ConcurrentHashMap的基本属性,大部分和HashMap一样,只是增加了部分属性,后面我们来分析增加的部分属性是起到如何的作用的。

ConcurrentHashMap 备注
private static final int MAXIMUM_CAPACITY = 1 << 30; 默认最大的容量
private static final int DEFAULT_CAPACITY = 16; 默认初始化的容量
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; 最大的数组可能长度
private static final int DEFAULT_CONCURRENCY_LEVEL = 16; 默认的并发级别,目前并没有用,只是为了保持兼容性
private static final float LOAD_FACTOR = 0.75f; 负载因子
static final int TREEIFY_THRESHOLD = 8; 链表转换为红黑树的阈值,默认是8
static final int UNTREEIFY_THRESHOLD = 6; 红黑树转换链表的阀值,默认是6
static final int MIN_TREEIFY_CAPACITY = 64; 进行链表转换最少需要的数组长度,如果没有达到这个数字,只能进行扩容
private static final int MIN_TRANSFER_STRIDE = 16; table扩容时, 每个线程最少迁移table的槽位个数
private static int RESIZE_STAMP_BITS = 16; 用来计算偏移量和线程数量的标记
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; 能够调整的最大线程数量
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; 记录偏移量
static final int MOVED = -1; 值为-1, 当Node.hash为MOVED时, 代表着table正在扩容
static final int TREEBIN = -2; TREEBIN, 值为-2, 代表此元素后接红黑树
static final int RESERVED = -3; 占位符,目前没看出来明显的作用
static final int HASH_BITS = 0x7fffffff; 用来计算Hash值的
transient volatile Node<K,V>[] table; 节点数组
private transient volatile Node<K,V>[] nextTable; table迁移过程临时变量, 在迁移过程中将元素全部迁移到nextTable上
private transient volatile long baseCount; 基础计数器
private transient volatile int sizeCtl; table扩容和初始化的标记,不同的值代表不同的含义,默认为0,表示未初始化<br />-1: table正在初始化;小于-1,表示table正在扩容;大于0,表示初始化完成后下次扩容的大小
private transient volatile int transferIndex; table容量从n扩到2n时, 是从索引n->1的元素开始迁移, transferIndex代表当前已经迁移的元素下标table容量从n扩到2n时, 是从索引n->1的元素开始迁移, transferIndex代表当前已经迁移的元素下标
private transient volatile int cellsBusy; 扩容时候,CAS锁标记
private transient volatile CounterCell[] counterCells; 计数器表,大小是2次幂

ConcurrentHashMap的常用方法属性

put方法

final V putVal(K key, V value, boolean onlyIfAbsent) {
    //key和value不允许为null
    if (key == null || value == null) throw new NullPointerException();
    //计算hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        //如果table没有初始化,进行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        //计算数组的位置    
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            //如果该位置为空,构造新节点添加即可
            if (casTabAt(tab, i, null,
                         new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }//如果正在扩容
        else if ((fh = f.hash) == MOVED)
            //帮着一起扩容
            tab = helpTransfer(tab, f);
        else {
            //开始真正的插入
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    //如果已经初始化完成了
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //这里key相同直接覆盖原来的节点
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            //否则添加到节点的最后面
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,  value, null);
                                break;
                            }
                        }
                    }//如果节点是树节点,就进行树节点添加操作
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,alue)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }//判断节点是否要转换成红黑树
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);//红黑树转换
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    //计数器,采用CAS计算size大小,并且检查是否需要扩容
    addCount(1L, binCount);
    return null;
}

对比HashMap的put方法我们发现逻辑相差不大,主要是新增了线程安全部分,在添加元素时候,采用synchronized来保证线程安全,然后计算size的时候采用CAS操作进行计算。总结下就是:

  • 判断key和vaule是否为空,如果为空,直接抛出异常。
  • 判断table数组是否已经初始化完毕,如果没有初始化,进行初始化。
  • 计算key的hash值,如果该位置为空,直接构造节点放入。
  • 如果table正在扩容,进入帮助扩容方法。
  • 最后开启同步锁,进行插入操作,如果开启了覆盖选项,直接覆盖,否则,构造节点添加到尾部,如果节点数超过红黑树阈值,进行红黑树转换。如果当前节点是树节点,进行树插入操作。
  • 最后统计size大小,并计算是否需要扩容。

get方法

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    //计算hash值
    int h = spread(key.hashCode());
    //如果table已经初始化,并且计算hash值的索引位置node不为空
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        //如果hash相等,key相等,直接返回该节点的value
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }//如果hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到节点。
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        //循环遍历链表,查询key和hash值相等的节点。
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

get方法的主要流程如下:

  • 直接计算hash值,查找的节点如果key和hash值相等,直接返回该节点的value就行。
  • 如果table正在扩容,就调用ForwardingNode的find方法查找节点。
  • 如果没有扩容,直接循环遍历链表,查找到key和hash值一样的节点值即可。

ConcurrentHashMap的扩容

ConcurrentHashMap的扩容相对于HashMap的扩容相对复杂,因为涉及到了多线程操作,这里扩容方法主要是transfer,我们来分析下这个方法的源码,研究下是如何扩容的。

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    //保证每个线程扩容最少是16,
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) {            // initiating
        try {
            //扩容2倍
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) {      // try to cope with OOME
            //出现异常情况就不扩容了。
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        //用新数组对象接收
        nextTable = nextTab;
        //初始化扩容下表为原数组的长度
        transferIndex = n;
    }
    int nextn = nextTab.length;
    //扩容期间的过渡节点
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // to ensure sweep before committing nextTab

    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            //如果该线程已经完成了
            if (--i >= bound || finishing)
                advance = false;
            //设置扩容转移下标,如果下标小于0,说明已经没有区间可以操作了,线程可以退出了
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }CAS操作设置区间
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
        }
        //如果计算的区间小于0了,说明区间分配已经完成,没有剩余区间分配了
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {//如果扩容完成了,进行收尾工作
                nextTable = null;//清空临时数组
                table = nextTab;//赋值原数组
                sizeCtl = (n << 1) - (n >>> 1);//重新赋值sizeCtl
                return;
            }//如果扩容还在进行,自己任务完成就进行sizeCtl-1,这里是因为,扩容是通过helpTransfer()和addCount()方法来调用的,在调用transfer()真正扩容之前,sizeCtl都会+1,所以这里每个线程完成后就进行-1。
            if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                //这里应该是判断扩容是否结束
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                //结束,赋值状态
                finishing = advance = true;
                i = n; // recheck before commit
            }
        }//如果在table中没找到,就用过渡节点
        else if ((f = tabAt(tab, i)) == null)
            //成功设置就进入下一个节点
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            //如果节点不为空,并且该位置的hash值为-1,表示已经处理了,直接进入下一个循环即可
            advance = true; // already processed
        else {
            //这里说明老table该位置不为null,也没有被处理过,进行真正的处理逻辑。进行同步锁
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    //如果hash值大于0
                    if (fh >= 0) {
                        //为运算结果
                        int runBit = fh & n;
                        Node<K,V> lastRun = f;
                        for (Node<K,V> p = f.next; p != null; p = p.next) {
                            int b = p.hash & n;
                            if (b != runBit) {
                                runBit = b;
                                lastRun = p;
                            }
                        }
                        if (runBit == 0) {
                            ln = lastRun;
                            hn = null;
                        }
                        else {
                            hn = lastRun;
                            ln = null;
                        }
                        for (Node<K,V> p = f; p != lastRun; p = p.next) {
                            int ph = p.hash; K pk = p.key; V pv = p.val;
                            //这里的逻辑和hashMap是一样的,都是采用2个链表进行处理,具体分析可以查看我分析HashMap的文章
                            if ((ph & n) == 0)
                                ln = new Node<K,V>(ph, pk, pv, ln);
                            else
                                hn = new Node<K,V>(ph, pk, pv, hn);
                        }
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }//如果是树节点,执行树节点的扩容数据转移
                    else if (f instanceof TreeBin) {
                        TreeBin<K,V> t = (TreeBin<K,V>)f;
                        TreeNode<K,V> lo = null, loTail = null;
                        TreeNode<K,V> hi = null, hiTail = null;
                        int lc = 0, hc = 0;
                        for (Node<K,V> e = t.first; e != null; e = e.next) {
                            int h = e.hash;
                            TreeNode<K,V> p = new TreeNode<K,V>
                                (h, e.key, e.val, null, null);
                            //也是通过位运算判断两个链表的位置    
                            if ((h & n) == 0) {
                                if ((p.prev = loTail) == null)
                                    lo = p;
                                else
                                    loTail.next = p;
                                loTail = p;
                                ++lc;
                            }
                            else {
                                if ((p.prev = hiTail) == null)
                                    hi = p;
                                else
                                    hiTail.next = p;
                                hiTail = p;
                                ++hc;
                            }
                        }
                        //这里判断是否进行树转换
                        ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                        (hc != 0) ? new TreeBin<K,V>(lo) : t;
                        hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                        (lc != 0) ? new TreeBin<K,V>(hi) : t;
                        //这里把新处理的链表赋值到新数组中
                        setTabAt(nextTab, i, ln);
                        setTabAt(nextTab, i + n, hn);
                        setTabAt(tab, i, fwd);
                        advance = true;
                    }
                }
            }
        }
    }
}

ConcurrentHashMap的扩容还是比较复杂,复杂主要表现在,控制多线程扩容层面上,扩容的源码我没有解析的很细,一方面是确实比较复杂,本人有某些地方也不是太明白,另一方面是我觉得我们研究主要是弄懂其思想,能搞明白关键代码和关键思路即可,只要不是重新实现一套类似的功能,我想就不用纠结其全部细节了。总结下ConcurrentHashMap的扩容步骤如下:

  • 获取线程扩容处理步长,最少是16,也就是单个线程处理扩容的节点个数。
  • 新建一个原来容量2倍的数组,构造过渡节点,用于扩容期间的查询操作。
  • 进行死循环进行转移节点,主要根据finishing变量判断是否扩容结束,在扩容期间通过给不同的线程设置不同的下表索引进行扩容操作,就是不同的线程,操作的数组分段不一样,同时利用synchronized同步锁锁住操作的节点,保证了线程安全。
  • 真正进行节点在新数组的位置是和HashMap扩容逻辑一样的,通过位运算计算出新链表是否位于原位置或者位于原位置+扩容的长度位置

总结

1.ConcurrentHashMap大部分的逻辑代码和HashMap是一样的,主要通过synchronized和来保证节点插入扩容的线程安全,这里肯定有同学会问,为啥使用synchronized呢?而不用采取乐观锁,或者lock呢?我个人觉得可能原因有2点:

  • 乐观锁比较适用于竞争冲突比较少的场景,如果冲突比较多,那么就会导致不停的重试,这样反而性能更低。
  • synchronized在经历了优化之后,其实性能已经和lock没啥差异了,某些场景可能还比lock快。所以,我觉得这是采用synchronized来同步的原因。

2.ConcurrentHashMap的扩容核心逻辑主要是给不同的线程分配不同的数组下标,然后每个线程处理各自下表区间的节点。同时处理节点复用了hashMap的逻辑,通过位运行,可以知道节点扩容后的位置,要么在原位置,要么在原位置+oldlength位置,最后直接赋值即可。

相关文章

网友评论

    本文标题:ConcurrentHashMap为什么是线程安全的?对比Has

    本文链接:https://www.haomeiwen.com/subject/gvdkpltx.html