美文网首页
HashMap及其并发的一些理解

HashMap及其并发的一些理解

作者: 沉迷学习_日渐发福 | 来源:发表于2018-11-17 17:44 被阅读0次

    HashMap及其并发的一些理解

    HashMap

    在jdk1.8之前,HashMap通过数组加链表的方式实现。在1.8之后,在链表长度大于8之后,会将链表转化为红黑树进行存储。

    基本元素

    Node和Entry

    Entry表示链表的每一个节点的元素,就是key和value的包装对象。Map是一个接口,实现类为Node

     static class Node<K,V> implements Map.Entry<K,V> {
            final int hash;
            final K key;
            V value;
            Node<K,V> next;
    
            Node(int hash, K key, V value, Node<K,V> next) {
                this.hash = hash;
                this.key = key;
                this.value = value;
                this.next = next;
            }
    
            public final K getKey()        { return key; }
            public final V getValue()      { return value; }
            public final String toString() { return key + "=" + value; }
    
            public final int hashCode() {
                return Objects.hashCode(key) ^ Objects.hashCode(value);
            }
    
            public final V setValue(V newValue) {
                V oldValue = value;
                value = newValue;
                return oldValue;
            }
    
            public final boolean equals(Object o) {
                if (o == this)
                    return true;
                if (o instanceof Map.Entry) {
                    Map.Entry<?,?> e = (Map.Entry<?,?>)o;
                    if (Objects.equals(key, e.getKey()) &&
                        Objects.equals(value, e.getValue()))
                        return true;
                }
                return false;
            }
        }
    

    作为一个Map存储的基本单元,需要有下面几个成员变量和方法

    • hash: 表示当前node对应的hash值
    • key: map中put对应的key值
    • value: map中put对应的value值
    • next: 表示链表的下一个节点
    • equals: 比较两个node是否相等

    TreeNode

    在jdk1.8之后,当链表长度大于8个时,将会转化红黑树,以用来提供查找效率。当链表长度到达一定长度时,查找的时间复杂度为O(n),在已红黑树进行查询时,时间复杂度会缩减为O(lgn)

     static final class TreeNode<K,V> extends LinkedHashMap.LinkedHashMapEntry<K,V> {
            TreeNode<K,V> parent;  // red-black tree links
            TreeNode<K,V> left;
            TreeNode<K,V> right;
            TreeNode<K,V> prev;    // needed to unlink next upon deletion
            boolean red;
            }
    

    一个红黑树的Node,需要有一下几个成员变量

    • parent: 父节点
    • left: 左子节点
    • right:右子节点
    • prev: 上一个节点
    • red: 是否是红节点

    还会提供几个相关的方法:

    • root(): 获取当前红黑树的根节点
    • moveRootToFront(): 保证红黑树节点是链表的第一个元素
    • find:从指定节点找到node
    • getTreeNode:从root节点找到对应的node
    • treeify:将链表转化为红黑树
    • untreeify:将红黑树转化为链表
    • putTreeVal:插入一个红黑树节点
    • removeTreeNode:移除一个红黑树节点

    操作

    put

     final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                       boolean evict) {
            Node<K,V>[] tab; Node<K,V> p; int n, i;
            if ((tab = table) == null || (n = tab.length) == 0)
                //创建resize里面会创建新的table,或进行扩容
                n = (tab = resize()).length;
            if ((p = tab[i = (n - 1) & hash]) == null)
                //如果计算出来的pos并没有node节点,那么直接创建新的节点,并且进行存储
                tab[i] = newNode(hash, key, value, null);
            else {
                //计算出来pos有元素存在
                Node<K,V> e; K k;
                if (p.hash == hash &&
                    ((k = p.key) == key || (key != null && key.equals(k))))
                    //当前的node是需要需要替换的node
                    //直接替换
                    e = p;
                else if (p instanceof TreeNode)
                    //当前是以树结构进行存储,通过树的put方法进行插入
                    e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
                else {
                    //当前是链表,需要添加新node
                    for (int binCount = 0; ; ++binCount) {
                        if ((e = p.next) == null) {
                            //在最后一个节点处,创建新节点
                            p.next = newNode(hash, key, value, null);
                            //如果当前是size大于等于7了,也就是添加元素之后size至少是8了,
                            if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                                //将链表转化为输结构存储
                                treeifyBin(tab, hash);
                            break;
                        }
                        ...
                        p = e;
                    }
                }
                ....
            }
            ++modCount;
            if (++size > threshold)
                //如果当前的大小大于阈值,那么需要扩容
                resize();
            afterNodeInsertion(evict);
            return null;
        }
    

    HashMap的put方法主要以下几个步骤:

    1. 如果当前table为空,那么初始化table
    2. 查找通过key的hash值来计算当前的table的pos,如果pos对应的node为空,那么直接插入
    3. 如果不为空,而且hash值一样和key一样,那么直接替换
    4. 如果不为空,hash值不一致,而且获取出来的节点是树节点,那么通过树的putTreeVal方法进行插入,内部会维持红黑树的平衡。
    5. 如果不为空,hash值不一致,且非树节点,那么当前还是以链表的形式存储,需要将元素添加到链表后面,添加完成之后,如果链表元素大于阈值,需要进行扩容。

    下面我们来看看hashMap的扩容方法resize

     final Node<K,V>[] resize() {
            Node<K,V>[] oldTab = table;
            int oldCap = (oldTab == null) ? 0 : oldTab.length;
            int oldThr = threshold;
            int newCap, newThr = 0;
            if (oldCap > 0) {
                ...
                //进行扩容,新size为oldsize的2倍
                newThr = oldThr << 1; // double threshold
            }
            threshold = newThr;
            @SuppressWarnings({"rawtypes","unchecked"})
            //创建新的table
            Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
            table = newTab;
            if (oldTab != null) {
                //如果旧表有值,需要将旧表的值添加新表
                for (int j = 0; j < oldCap; ++j) {
                    Node<K,V> e;
                    if ((e = oldTab[j]) != null) {
                        oldTab[j] = null;
                        if (e.next == null)
                            newTab[e.hash & (newCap - 1)] = e;
                        else if (e instanceof TreeNode)
                            //通过树进行赋值
                            ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                        else { // preserve order
                            //重新计算hash,链表节点赋值
                            ...
                           }
                    }
                }
            }
            return newTab;
        }
    

    get

    final Node<K,V> getNode(int hash, Object key) {
            Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (first = tab[(n - 1) & hash]) != null) {
                //table对应pos的节点存在
                if (first.hash == hash && // always check first node
                    ((k = first.key) == key || (key != null && key.equals(k))))
                    //key相等,hash值也相等,直接返回
                    return first;
                if ((e = first.next) != null) {
                    //如果当前是以树结构进行存储,那么通过树去寻找节点,并返回
                    if (first instanceof TreeNode)
                        return ((TreeNode<K,V>)first).getTreeNode(hash, key);
                    do {
                        //当前仍是以链表形式存储,遍历查找并且返回
                        if (e.hash == hash &&
                            ((k = e.key) == key || (key != null && key.equals(k))))
                            return e;
                    } while ((e = e.next) != null);
                }
            }
            return null;
        }
    

    其实get操作相对put还是比较简单的,

    1. 首先通过hash计算出在table的位置,并取值

    2. 如果当前的node已经是匹配hash和key的,那么直接返回

    3. 如果当前不匹配,

      当前node是以链表进行存储的:遍历比较取值并返回
      当前node是以树的形式进行存储的:通过红黑树的getTreeNode进行取值并返回

    hashMap在多线程中的缺陷

    hashMap并没有对多线程做一些额外的处理,这就导致了在多线程中直接使用hashMap可能出现异常情况。

    • 在put操作中,如果并发进行元素的添加,可能出现某个元素添加失败的情况
    if ((e = p.next) == null) {
        //第一个线程进入这个判断,切换到到第二个线程,并且执行完p.next的设置,再切换到第一个线程执行p.next的设置,会导致第二个线程的设置失效
        p.next = newNode(hash, key, value, null);
    }
    
    • 在put中,如果正在进行扩容,会导致链表循环引用的出现
    if ((e.hash & oldCap) == 0) {
        if (loTail == null)
            loHead = e;
        else
            //在第一个线程执行到这个语句时,切换到第二个线程
            loTail.next = e;
        loTail = e;
    }
    

    最后会出现e.next = e的循环引用的出现。所以,在多线程的情况下,我们需要利用ConcurrentHashMap。

    HashTable

    对于HashTable,通过给synchronized修饰解决HashMap的并发问题,这样效率十分低下,在同一个时刻,仅能有一个线程访问HashTable的各个方法,因为每个线程都需要竞争同一把锁。

    ConcurrentHashMap

    基本元素

    TreeBin

    底层结构是以树结构来进行存储的
    TreeBin表示红黑树的root节点:

    static final class TreeBin<K,V> extends Node<K,V> {
            TreeNode<K,V> root;
            volatile TreeNode<K,V> first;
            volatile Thread waiter;
            volatile int lockState;
            // values for lockState
            static final int WRITER = 1; // set while holding write lock
            static final int WAITER = 2; // set when waiting for write lock
            static final int READER = 4;
        }
    

    ForwardingNode

    这个是一个在扩容时会创建的节点,主要用于占位,当其他线程发现某个位置的节点fwd类型的节点,那么会跳过这个节点。

    CAS操作

    CAS操作是可以保证线程间同步的操作,比较并设置。不需要锁来实现,通过设置一个期待的原值,和需要修改的值,只有原值和当前期待的值成功,才会进行修改。
    map中提供了三个关于node的CAS操作。

    //获取table指定位置的元素
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
            return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
        }
    
        //判断当前元素是否存在
        static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                            Node<K,V> c, Node<K,V> v) {
            return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
        }
    //设置table的某个元素
        static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
            U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
        }
    

    ConcurrentHashMap

    在jdk1.7里面及以前采用了锁分段技术,对于每几个Entry,都将对应到一个Segment上,在put的时候,仅仅对segment进行加锁,多线程访问不同数据段的数据,就不会出现锁竞争的情况。

    jdk1.8实现

    在jdk1.8里面通过Unsafe类来实现。这个类并未开源
    原子访问是sun.misc.Unsafe被广泛应用的特性之一,特性包括简单的“put”和“get”操作(带有 volatile 语义或不带有 volatile 语义)以及比较并交换(compare and swap,CAS)操作

    put方法

    put方法的流程大概有下面几步

    • 判空值
    if (key == null || value == null) throw new NullPointerException();
    

    //如果此时需要插入的key和value为null,那么直接抛异常,说明concurrentHashMap不支持空key和空value

    • 计算hash
    int hash = spread(key.hashCode());
    

    通过hashCode值才能知道当前需要存入的哪个buket中。

    • 遍历table
      1. 如果当前table为null,那么需要new除新的table
     if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
    

    initTable的代码如下

        private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) {
                if ((sc = sizeCtl) < 0)
                    Thread.yield(); // lost initialization race; just spin
                //通过Unsafe这个类来进行原子操作,如果size仍然为0,那么进行创建
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if ((tab = table) == null || tab.length == 0) {
                            //设置默认size,默认为16
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            //创建size为16的数组
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            //计算ctrl参数为16 - 4 = 12
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    

    首先会通过Unsafe来进行判断是否需要创建table,如果需要,那么直接创建table,默认的初始化table大小为16,同时会设置大小控制参数,初始的sizeCtl为12

    1. 如果当前的table不为空,那么查找当前hash值对应的table位置上是否有值,如果有值,直接通过cas设置值
    //如果table表指定的位置没有值,
    if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
        //已cas操作的形式直接插入
        if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
    
    1. 如果table当前pos已经存储了其他的Node,且当前的hash值是-1,表明当前table正在扩容,则通过多线程帮助其扩容
    if ((fh = f.hash) == MOVED)
        tab = helpTransfer(tab, f);
    

    MOVED是内部定义的常量,值为-1,如果计算出的hash值为MOVE,表明当前正在扩容。这个扩容的内容后续会讲解

    1. 当前table的pos位置有值,且没有到达扩容阈值,需要直接进行覆盖
     else {
        V oldVal = null;
        //对需当前需要替换的节点加锁
        synchronized (f) {
            //再次判断需要替换的节点已经被替换
            if (tabAt(tab, i) == f) {
                    //fh是前面计算出来的f节点的hash值,大于等于0表示当前是已链表的形式进行存储
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            //判断当前的hash值是否相等,key是否相等
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                        (ek != null && key.equals(ek)))) {                           //相等,那么找到了需要替换的value
                                oldVal = e.val;
                                //如果不是不才替换,那么直接替换,并结束循环
                                if (!onlyIfAbsent)
                                    e.val = value;
                                    break;
                                }
                                
                                Node<K,V> pred = e;
                                //加入没有找到需要替换的节点,那么在节点最后创建新的节点
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value,null);
                                        break;
                                    }
                                }
                            }
                            //如果当前是已红黑树进行存储
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                //已树的形式寻找节点
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                    }
                    if (binCount != 0) {
                        //如果当前节点数大于8,那么建立为树节点
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
    

    在转化为红黑树之前,在table大小小于64时,会对table进行扩容。否则直接创建红黑树。

    if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
                    synchronized (b) {
                        if (tabAt(tab, index) == b) {
                            TreeNode<K,V> hd = null, tl = null;
                            for (Node<K,V> e = b; e != null; e = e.next) {
                                //创建节点
                                TreeNode<K,V> p =
                                    new TreeNode<K,V>(e.hash, e.key, e.val,
                                                      null, null);
                                if ((p.prev = tl) == null)
                                    hd = p;
                                else
                                    tl.next = p;
                                tl = p;
                            }
                            //同时设置头节点,也就是TreeBin
                            setTabAt(tab, index, new TreeBin<K,V>(hd));
                        }
                    }
                }
    

    和hashMap相比,其实就是一些同步相关的操作

    • 通过cas操作初始化table
    • 通过CAS操作查找node是否存在
    • 通过CAS操作设置node
    • 通过多线程并发进行扩容

    get操作

    public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            //两次hash计算hash计算hash值
            int h = spread(key.hashCode());
            //通过CAS操作获取头节点
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
                //判断是否节点是否符合
                if ((eh = e.hash) == h) {
                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                        return e.val;
                }
                else if (eh < 0)
                    //hash小于0,表示当前以树的形式做存储
                    return (p = e.find(h, key)) != null ? p.val : null;
                while ((e = e.next) != null) {
                
                    if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }
    

    get操作也比较简单,先通过cas操作判断node是否存在,其他的操作和普通的hashmap一致。

    相关文章

      网友评论

          本文标题:HashMap及其并发的一些理解

          本文链接:https://www.haomeiwen.com/subject/rrdifqtx.html