美文网首页
深入理解ConcurrentHashMap

深入理解ConcurrentHashMap

作者: 逅弈 | 来源:发表于2018-02-06 20:29 被阅读40次

逅弈 欢迎转载,注明原创出处即可,谢谢!

HashMap是用的最多的来保存数据的数据结构,但众所周知,HashMap不是线程安全的。

当插入一个新的节点时,如果这个key不存在,则会判断当前内部元素是否已经达到阈值(默认是数组大小的0.75),如果已经达到阈值,会对数组进行扩容,也会对链表中的元素进行rehash。

但是在并发的情况下,进行扩容时,可能会产生循环链表,然后在执行get的时候,会触发死循环,导致CPU100%运转,所以一定要避免在并发环境下使用HashMap。

而为了解决线程安全的HashMap,可以通过Collections.synchronizedMap(Map map)来对一个HashMap进行包装,其内部是通过一个SynchronizedMap内部类重新生成了一个Map,并对每个方法加上了synchronized锁,所以可以实现线程安全的HashMap。但这并不是我们所需要的,这种方式是多线程不友好的,同一时刻同一个方法只有一个线程能执行。所以为了提高并发量,提高性能,Doug Lea为了我们提供了并发的HashMap,也即ConcurrentHashMap,该类在并发包juc中。

PS 本文分析的是JDK1.8的ConcurrentHashMap的实现

首先ConcurrentHashMap是通过数组+链表/红黑树的结构来存储数据的,通过CAS+synchronized保证并发的安全性的。

有以下重要的内部类:

/**
 * 节点,保存key-value键值对
 * Key-value entry.  This class is never exported out as a
 * user-mutable Map.Entry (i.e., one supporting setValue; see
 * MapEntry below), but can be used for read-only traversals used
 * in bulk tasks.  Subclasses of Node with a negative hash field
 * are special, and contain null keys and values (but are never
 * exported).  Otherwise, keys and vals are never null.
 */
static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    // 省略
}
/**
 * 红黑树节点
 * Nodes for use in TreeBins
 */
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;
    // 省略
}
/**
 * 一棵红黑树
 * TreeNodes used at the heads of bins. TreeBins do not hold user
 * keys or values, but instead point to list of TreeNodes and
 * their root. They also maintain a parasitic read-write lock
 * forcing writers (who hold bin lock) to wait for readers (who do
 * not) to complete before tree restructuring operations.
 */
static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;
    volatile TreeNode<K,V> first;
    volatile Thread waiter;
    volatile int lockState;
    // values for lockState
    static final int WRITER = 1; // set while holding write lock
    static final int WAITER = 2; // set when waiting for write lock
    static final int READER = 4; // increment value for setting read lock
    // 省略
}
/**
 * 辅助节点,在扩容时使用
 * A node inserted at head of bins during transfer operations.
 */
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    // 省略
}

重要的变量

/**
 * 保存节点的数组
 * The array of bins. Lazily initialized upon first insertion.
 * Size is always a power of two. Accessed directly by iterators.
 */
transient volatile Node<K,V>[] table;
/**
 * 扩容时使用的节点数组,只在扩容时不为空
 * The next table to use; non-null only while resizing.
 */
private transient volatile Node<K,V>[] nextTable;
/**
 * 控制标识符,在初始化和扩容时使用
 * 负数表示正在初始化或扩容
 * -1 表示正在初始化,此时只有一个线程可以进行初始化,其他线程通过Thread.yield()挂起
 * -N 表示有N-1个线程正在进行扩容
 * 正数 表示数组的大小
 * 数组初始化完成后该值表示下一次扩容的元素的大小
 * Table initialization and resizing control.  When negative, the
 * table is being initialized or resized: -1 for initialization,
 * else -(1 + the number of active resizing threads).  Otherwise,
 * when table is null, holds the initial table size to use upon
 * creation, or 0 for default. After initialization, holds the
 * next element count value upon which to resize the table.
 */
private transient volatile int sizeCtl;
  • table初始化

table初始化操作会延缓到第一次put的时候执行。但是put是可以并发执行的,Doug Lea是如何实现table只初始化一次的?来看看源码的实现

sizeCtl默认为0,如果ConcurrentHashMap实例化时有传参数,sizeCtl会是一个2的幂次方的值。

当执行第一次put操作的线程会执行Unsafe.compareAndSwapInt方法修改sizeCtl为-1,

有且只有一个线程能够修改成功,其它线程通过Thread.yield()让出CPU时间片等待table初始化完成。


/**
 * Initializes table, using the size recorded in sizeCtl.
 */
private final Node<K,V>[] initTable() {
    Node<K,V>[] tab; int sc;
    while ((tab = table) == null || tab.length == 0) {
        // 当发现sizeCtl<0时,说明当前有线程正在进行表的初始化工作,则当前线程挂起
        if ((sc = sizeCtl) < 0)
            Thread.yield(); // lost initialization race; just spin
        // 否则sizeCtl为初始值,则通过CAS将sizeCtl更改为-1,表示当前线程要进行表的初始化工作了    
        else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
            try {
                if ((tab = table) == null || tab.length == 0) {
                    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                    @SuppressWarnings("unchecked")
                    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                    table = tab = nt;
                    // 由于n总是2的幂次方,所以sc=0.75*n
                    sc = n - (n >>> 2);
                }
            } finally {
                // 初始化工作完成后,sizeCtl的值变为下一次扩容时表的大小
                sizeCtl = sc;
            }
            break;
        }
    }
    return tab;
}

  • put操作

假设table已经初始化完成,put操作采用CAS+synchronized实现并发插入或更新操作,具体实现如下:

1 hash算法

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

2 在table中定位索引位置,n是table的大小

int index = (n-1) & hash;

3 获取table中对应索引的元素f

Doug Lea采用tabAt来获取对应索引i处的节点f

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

而tabAt中实际是调用的Unsafe.getObjectVolatile来获取,也许有人质疑,直接table[index]不可以么,为什么要这么复杂?

在java内存模型中,我们已经知道每个线程都有一个工作内存,里面存储着table的副本,虽然table是volatile修饰的,但table中的元素不是volatile的,不能保证线程每次都拿到table中的最新元素,Unsafe.getObjectVolatile可以直接获取指定内存的数据,保证了每次拿到数据都是最新的

4 如果f为null,说明table中这个位置第一次插入元素,利用Unsafe.compareAndSwapObject方法插入Node节点。

如果CAS成功,说明Node节点已经插入,随后addCount(1L, binCount)方法会检查当前容量是否需要进行扩容。

如果CAS失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。

5 如果f的hash值为-1,说明当前f是ForwardingNode节点,意味有其它线程正在扩容,则一起进行扩容操作。

6 其余情况把新的Node节点按链表或红黑树的方式插入到合适的位置,这个过程采用同步内置锁实现并发


final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 计算key的hash值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh;
        // table为空时,执行初始化
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 根据key的hash值得到对应的节点f,和该节点在表中的位置i
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // 如果节点f为空,则通过CAS在位置i插入一个新节点
            // 如果插入成功则跳出循环,并调用addCount判断是否需要进行扩容
            // 如果插入失败,则通过自旋重新尝试在位置i插入节点
            if (casTabAt(tab, i, null,new Node<K,V>(hash,key,value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 如果节点f的hash值为-1,则说明有线程正在进行扩容,则当前线程帮助一起进行扩容
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else {
            // 省略内置同步锁部分
        }
    }
    // 判断是否需要扩容
    addCount(1L, binCount);
    return null;
}

如果节点f未能通过CAS插入,则在节点f上进行同步,节点插入之前,再次利用tabAt(tab, i) == f判断,防止被其它线程修改。

如果f.hash >= 0,说明f是链表结构的头结点,遍历链表,如果找到对应的node节点,则修改value,否则在链表尾部加入节点。

如果f是TreeBin类型节点,说明f是红黑树根节点,则在树结构上遍历元素,更新或增加节点。

如果链表中节点数binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构。

// 使用内置同步锁的方式,进行节点的新增或更新
synchronized (f) {
    // 如果位置i上还是节点f
    if (tabAt(tab, i) == f) {
        // 如果节点f的hash>=0,则说明f是链表结构的头节点
        if (fh >= 0) {
            binCount = 1;
            // 遍历链表
            for (Node<K,V> e = f;; ++binCount) {
                K ek;
                // 如果发现该key在链表中已经存在,则将老的节点的值更新为value
                if (e.hash == hash &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                    oldVal = e.val;
                    // 如果onlyIfAbsent==false,则用新的值覆盖旧的值,否则不添加
                    if (!onlyIfAbsent)
                        e.val = value;
                    break;
                }
                // 找到链表的尾部,将该key封装成一个新的节点插入到链表的尾部
                Node<K,V> pred = e;
                if ((e = e.next) == null) {
                    pred.next = new Node<K,V>(hash, key,value, null);
                    break;
                }
            }
        }
        // 如果节点f是TreeBin的节点,则说明f节点是红黑树的根节点
        else if (f instanceof TreeBin) {
            Node<K,V> p;
            binCount = 2;
            // 在树上遍历,新增或更新该节点到红黑树中去
            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {
                oldVal = p.val;
                if (!onlyIfAbsent)
                    p.val = value;
            }
        }
    }
}
// 内置锁同步结束后
// 如果链表中节点数binCount >= TREEIFY_THRESHOLD(默认是8),则把链表转化为红黑树结构
if (binCount != 0) {
    if (binCount >= TREEIFY_THRESHOLD)
        treeifyBin(tab, i);
    if (oldVal != null)
        return oldVal;
    break;
}

  • get操作

1 获得key的hash值h

2 通过tabAt获取key对应的节点e,如果节点e为空,则说明没有该值,直接返回null

3 如果节点e不为空,并且该节点的hash等于key的hash值,并且节点e的key与当前key相等,则说明该节点就是要找的节点,直接返回节点的val

4 如果节点的hash小于0,则说明该节点在红黑树上,则通过find方法在树上找到key对应的节点p,如果节点p不为空,则返回p的val

5 以上都没能找到的话,最后会从该节点开始遍历链表,直到找到key对应的节点e,如果e不为空,则返回e的val

/**
 * Returns the value to which the specified key is mapped,
 * or {@code null} if this map contains no mapping for the key.
 *
 * <p>More formally, if this map contains a mapping from a key
 * {@code k} to a value {@code v} such that {@code key.equals(k)},
 * then this method returns {@code v}; otherwise it returns
 * {@code null}.  (There can be at most one such mapping.)
 *
 * @throws NullPointerException if the specified key is null
 */
public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 获得key的hash值
    int h = spread(key.hashCode());
    // 通过tabAt获取key对应的节点e
    // 如果节点e为空,则说明没有该值,直接返回null
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 如果节点e不为空,并且该节点的hash等于key的hash值
        // 并且节点e的key与当前key相等,则说明该节点就是要找的节点,直接返回节点的val
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 如果节点的hash小于0,则说明该节点在红黑树上
        else if (eh < 0)
            // 则通过find方法在树上找到key对应的节点p
            // 如果节点p不为空,则返回p的val
            return (p = e.find(h, key)) != null ? p.val : null;
        // 否则在链表上遍历,找到key对应的节点e
        while ((e = e.next) != null) {
            // 返回e的val
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
更多原创好文,请关注「逅弈逐码」

相关文章

网友评论

      本文标题:深入理解ConcurrentHashMap

      本文链接:https://www.haomeiwen.com/subject/bjlnzxtx.html