美文网首页Java源码分析
ConcurrentHashMap(一)

ConcurrentHashMap(一)

作者: DevilN | 来源:发表于2019-03-06 18:42 被阅读0次

    类继承

    ConcurrentHashMap继承AbstractMap,该抽象类定义了一些基本的操作,HashMap同样继承了这个类,其次还实现了ConcurrentMap,该接口继承的Map接口,除Map中的方法自己同样定义了一些操作。

    public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
        implements ConcurrentMap<K,V>, Serializable
    

    类重要属性

    //最大table容量
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    //默认容量
    private static final int DEFAULT_CAPACITY = 16;
    //数组可能得最大值,(非2次幂)需要与toArray和相关联方法
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    //并发级别,已经不用了,兼容以前的版本
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    //负载因子
    private static final float LOAD_FACTOR = 0.75f;
    //链表转红黑树阀值,> 8 链表转换为红黑树
    static final int TREEIFY_THRESHOLD = 8;
    //树转链表阀值,小于等于6
    static final int UNTREEIFY_THRESHOLD = 6;
    //树化链表的最小table容量
    static final int MIN_TREEIFY_CAPACITY = 64;
    //扩容线程每次最少要迁移16个hash桶
    private static final int MIN_TRANSFER_STRIDE = 16;
    //用于生成当前数组对应的基数戳\生成sizeCtl所使用的bit位数
    private static int RESIZE_STAMP_BITS = 16;
    //最大扩容线程数量
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
    //sizeCt中记录大小的位移
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    /**
    * Node的hash字段编码
    */
    static final int MOVED     = -1; // hash for forwarding nodes forwarding Node的hash值
    static final int TREEBIN   = -2; // hash for roots of trees 树的根节点的hash值
    static final int RESERVED  = -3; // hash for transient reservations 暂时保留的hash值
    //普通节点hash值的可用位数,此值是为了消除负hash  0x7fffffff int的最大值 2^31
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
    //Node数组,存储元素的数组 size为2的次幂,加了volatile关键字
    transient volatile Node<K,V>[] table;
    //默认为null,扩容时新生成的数组,其大小为原数组的两倍。
    private transient volatile Node<K,V>[] nextTable;
    /**
     * 默认为0,用来控制table的初始化和扩容操作
     * 负数代表正在进行初始化或扩容操作
     *      -1代表正在初始化
     *      -N 表示有N-1个线程正在进行扩容操作
     *      正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小
    private transient volatile int sizeCtl;
    

    重要的内部类

    Node

    保存key,value及key的hash值的数据结构

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;
    
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
        ...省略部分代码
    }
    

    TreeNode

    TreeBins的节点,树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。
    但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装

    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;
    
        TreeNode(int hash, K key, V val, Node<K,V> next,
                 TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
        ...省略部分代码
    

    TreeBin

    在HashMap中桶中存放的是TreeNode,而实际的ConcurrentHashMap桶中,存放的是TreeBin对象。

    static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;//树的根节点
        volatile TreeNode<K,V> first;//第一个树节点
        volatile Thread waiter;//等待线程
        volatile int lockState;//锁状态
        // values for lockState
        static final int WRITER = 1; // set while holding write lock 持有写锁
        static final int WAITER = 2; // set when waiting for write lock 等待写锁
        static final int READER = 4; // increment value for setting read lock 设置读锁时增加
        ......省略部分代码
    

    构造器

    //空构造器
    public ConcurrentHashMap() {
    }
    //初始化容量的构造器,需要保证
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    //初始化容量
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
                    // 找出大于等于参数容量的2次幂数
        //初始化sizeCtl
        this.sizeCtl = cap;
    }
    //带有容器的构造类,将容器内元素全部put进map中
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this.sizeCtl = DEFAULT_CAPACITY;
        putAll(m);
    }
    //
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        //这里加1是为了下面算cap,找出适合initialCapacity的2次幂的数值,赋值给sizeCtl
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap;
    }
    

    重要的常用方法

    put(K key,V value):具体方法看putVal(K key, V value, boolean onlyIfAbsent)

    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    

    putVal(K key, V value, boolean onlyIfAbsent):首先看JDK中源码是怎么实现的

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        //如果key等于null或者value为null,抛出空指针异常
        if (key == null || value == null) throw new NullPointerException();
        //计算key的hash值
        //1.调用object方法获取key的hash值
        //2.调用spread()重新计算--降低hash冲突
        int hash = spread(key.hashCode());
        // bin计数变量
        int binCount = 0;
        //将公用变量table赋值给tab,开始死循环
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;//声明几个变量
            if (tab == null || (n = tab.length) == 0)//如果tab为null或者tab长度为0,初始化tab
                tab = initTable();//初始化table,然后进入下次循环
            //如果tab不为null或者length不为0,进入下面else if 中
            //1.首先利用hash值计算出数组的下标
            //2.利用tab数组和下标值,找出对应下标的Node,详情看tabAt方法
            //3.判断是否为null
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //如果当前位置为null,没有被占用---此时有线程安全问题,可能有多个线程到这里
                //所以这里通过CAS操作数组,无需加锁,各线程可见
                //两种情况:
                //1.操作成功:跳出循环
                //2.操作失败:有其他线程修改了节点,重新进入下次循环
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    //操作成功
                    break;                   // no lock when adding to empty bin 添加到空箱时没有锁定
            }
            //走到这里说明,由hash值算出的下标,数组当前位置已经存在元素,下面还有其他判断
            // 如果当前位置的元素的hash值为MOVED,即为-1,说明此节点为forwarding nodes
            // 意味有其它线程正在扩容,则一起进行扩容操作,帮助扩容完成
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);//帮助扩容
            else {
                //走到这里,说明当前下标位置已经有节点,hash值冲突,也没有扩容操作,那么以链表或者红黑树的形式插入节点
                V oldVal = null;//为旧节点的value声明一个临时变量
                synchronized (f) { //给当前头节点加锁,不影响其他下标位置插入节点,相比segment锁粒度更小,提高并发
                    if (tabAt(tab, i) == f) {//这里需要再次判断下刚才获取到的头节点是否z这段时间内被其他线程修改,例如:被remove,
                        if (fh >= 0) {//当fh的值大于等于0时,说明是链表头节点
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {//循环链表,binCount迭加
                                K ek;
                                //如果当前e所指向的节点与要插入的节点key相同,则根据onlyIfAbsent,覆盖value或者不变,然后退出循环
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                //如果上面对比发现两者不同,那么将e赋值为当前节点的下个节点继续对比
                                //如果下个节点为null,则直接新建节点插入链表
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //如果fh.hash<0 说明此节点为树节点,则进入putTreeVal()方法中
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //tab[i]的链表过长,转成红黑树或者扩容(tab.length过短,优先扩容)
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
    

    下面我们分析下这个方法,首先可以理清一下插入节点的逻辑:

    1.计算key值的hash值

    2.进入循环,判断table是否初始化,长度是否为0,如果没有则先初始化table,调用initTable(),然后进入下次循环

    3.如果table已经初始化,根据hash值计算出插入的元素应该放到数组哪个位置,接着查看此位置是否已经有元素存在,这里查看的方法为tabAt(tab, i = (n - 1) & hash),如果没有值,那么填入元素,这里填入的方法为casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)),填入成功,跳出循环,失败则重新进入循环。

    4.如果该下标位置已经有元素的话,那么还有两种情况,如果此处的元素的hash值为MOVED,即为-1,那么这代表此处的节点为forwarding nodes,可以理解为一个占位节点,这说明有线程正在进行扩容操作,那么当前线程可以帮助一起扩容

    5.如果该下标位置的元素不为MOVED,那么此节点要么是链表头节点,要么是树的根节点,那么对应两种情况,分别插入即可。

    其实ConcurrentHashMap中插入节点的逻辑和HashMap中大同小异,但是ConcurrentHashMap为了保证线程安全,在实现手法上和HashMap有很大变化,我们同样可以列出来看看:

    • 在ConcurrentHashMap中table、sizeCtl、nextable等公用变量用volatile修饰,volatile可以保证线程之间变量的可见性,这应该是在多线程场景下的低配。在HashMap中是没有这样修饰的。
    • 接着是初始化table,在HashMap中是没有初始化操作这一步骤的,而是扩容操作,这里暂且不讨论扩容操作,在put里面执行初始化,是可以并发执行的,那么怎么保证只初始化一次呢,在initTable()的实现中,sizeCtl默认为0,如果ConcurrentHashMap实例化时有传参数,sizeCtl会是一个2的幂次方的值。当有线程执行了Unsafe.compareAndSwapInt方法(CAS)就会修改sizeCtl为-1,此时其他线程就会调用Thread.yield()让出CPU时间片等待table初始化完成,而且修改sizeCtl前后进行了两次table检查,这样就确保了只有一个线程可以初始化table,这样我们就安全的生成一个table了,剩下就是往里面放元素。
    • 在检查数组下标是否有对应元素f时,使用了tabAt(tab, i = (n - 1) & hash)方法,在tabAt方法中使用了Unsafe.getObjectVolatile来获取,在Java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是被volatile修饰了,但是不能保证每次从table中取出最新元素(volatile的数组只针对数组的引用具有volatile的语义,而不是它的元素)(可参考并发编程网volatile是否能保证数组中元素的可见性?),所以Doug Lea采用Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。
    • 如果此时检查到没有头节点,调用casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)这个方法插入头节点,这里同样利用Unsafe.compareAndSwapObject(CAS)方法插入Node节点。(这里CAS成功,可以保证后来的线程在上一步查询的时候感知到;如果CAS失败,那么说明其他线程已经修改了,接着就自旋重新尝试插入节点)
    • 如果检查到头节点已经有了节点,接着判断该节点的hash值,如果hash值等MOVED此静态int值,说明table正在扩容节点,那么此阶段插入节点并不是安全的,因为此时table状态并不稳定,那么此线程就帮助一起扩容,扩容完成后进入下次循环插入节点。
    • 如果头节点的值不是MOVED,那么此时是可以进行插入操作,接着就是ConcurrentHashMap不同HashMap的区别了,在插入节点之前,首先会锁定当前下标位置的头节点,即synchronized (f),不同于jdk1.7用segment实现的分段锁,1.8的分段锁粒度更小,进一步减少并发冲突的概率,当其他线程来了,只能等待当前线程修改完毕才能进入代码块。接下来插入节点的操作因为都在同步快里,所以和HashMap大同小异了。

    以上是ConcurrentHashMap的put操作过程,和HashMap已经有这么多的不同之处了,看来一篇文章是不能讲完了,其他方法就在分篇放在其他文章吧。文章中可能对CAS、Unsafe类以及更底层的设计没有写的很好的写出来,我也只是理解的表面些,希望大佬可以指教。

    此文章的观点是自己思考以及从网上大佬的文章中汲取的,如有错误,请勿喷(Ծ‸Ծ),非常欢迎大家指出,我也是个菜鸟而已,谢谢大家!

    相关文章

      网友评论

        本文标题:ConcurrentHashMap(一)

        本文链接:https://www.haomeiwen.com/subject/cqixpqtx.html