美文网首页
58 concurrentHashMap

58 concurrentHashMap

作者: 滔滔逐浪 | 来源:发表于2020-10-03 09:22 被阅读0次

    jdk1.8 ConcurrentHashMap
    1,去除了Segment 分段锁
    2,synchronized+cas 保证node节点线程安全问题

    HashMap1.8与jdk1.8 ConcurrentHashMap 数据结构模型是一样的
    concurrentHashMap 数组+链表+红黑树
    concurrentHashMap对我们 index 下标对应的node 节点上锁
    ConcurrenthashMap锁的竞争:
    多个线程同时落到put key的时候。如果 多个key都落入到同一个index node节点的时候;

    如果没有落到一个index node 节点不需要做锁的竞争。
    注意:
    只需要计算一次index的值。

    区别:
    hashtable 对我们整个Table 数组上锁。
    jdk1.7 ConcurrentHashmap

    1,基于Segment 分段锁设计
    2,lock+cas 保证node节点线程安全问题
    ConcurrenthashMap1.5 synchronized性能没有做优化 tryLock方法
    需要计算2次index 值: 第一次计算存放哪个Segement 对象中。
    第二次计算Segement 对象中哪个hashEntry<k,v>[]table;

    使用传统的hashTable 保证线程安全问题,是采用synchronized 锁将整个hashTable中的数组锁住,在多个线程中只允许一个线程访问put或者get,效率非常低,但是能保证线程安全。
    [图片上传失败...(image-c18fcf-1601685672405)]

    jdk官方不推荐在多线程的情况下使用hashtable 或者hashmap 建议使用Concurrenthashmap 分段锁,效率非常高。
    Concurrenthashmap 将一个大 的 hashmap 集合拆成n多个不同的小的hashtable(segement)。默认的情况下分成16个不同的segment每个segment中都有独立的hashEntry<k,v>[]table;
    简单实现Concurrenthashmap:

    package com.taotao.hashmap001;
    
    import java.util.Hashtable;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedDeque;
    
    /**
     *@author tom
     *Date  2020/9/27 0027 7:19
     *concurrentHashmap底层原理
     */
    public class MyConcurrentHashMap<K, V> {
        /**
         * segments
         */
        private Hashtable<K, V>[] segments;
    
        public MyConcurrentHashMap() {
            segments = new Hashtable[16];
        }
    
        public void put(K k, V v) {
            //第一次计算index, 计算key存放在那个hashtable
            int segmentIndex = k.hashCode() & (segments.length - 1);
            Hashtable<K, V> segment = segments[segmentIndex];
            if (segment == null) {
                segment = new Hashtable<>();
            }
            segment.put(k, v);
          segments[segmentIndex]=segment;
        }
    
    
        public V get(K k) {
            //第一次计算index, 计算key存放在那个hashtable
            int segmentIndex = k.hashCode() & (segments.length - 1);
            Hashtable<K, V> segment = segments[segmentIndex];
            if (segment != null) {
                return segment.get(k);
            }
            return null;
    
        }
    
        public static void main(String[] args) {
            MyConcurrentHashMap<String, String> myConcurrentHashMap = new MyConcurrentHashMap<>();
            ConcurrentHashMap map=new ConcurrentHashMap();
    
            for (int i = 0; i < 10; i++) {
                myConcurrentHashMap.put(i + "", i + "");
            }
            for (int i = 0; i < 10; i++) {
                myConcurrentHashMap.get(i + "");
                System.out.println(  myConcurrentHashMap.get(i + ""));
            }
        }
    }
    
    
    

    核心源码分析:

    1,无参构造函数分析:
    initialCapacity  --16
    loadFactor  hashEntry<K,V>[]table; 加载因子 0.75
    concurrnetcyLevel并发级别 默认是16
    2,并发级别是能够大于2的16次方
    if(concurrentlevel > MAX_SEGMENTS)
    concurrentlevel =MAX_SEGMENTS;
    3,sshift 左移位的次数 ssize 作用,记录segment数组大小。
    int sshift=0;
    int ssize=1;
    while(ssize < concurrentcyLevel){
       ++sshift=0;
    int ssize=1;
    while(ssize<concurrencyLevel){
           ++sshift;
    ssize<<=1;
    
    }
    }
    
    ###4,segmentShift segmentMask: ssize-1 做与运算的时候能够将key均匀存放:
    this.segmentSHift=32-shift;
    this.segmentmask=ssize-1;
    ###5.初始化 Segment0 赋值下标0 的位置
    Segment<K,V>s0=new Segment<K,V>(loadFactory,(int)(cap *loadFactor),(hashEntry<K,V>[])new hashEntry[cap]);
    ####6 采用cas修改复制给Segment数组;
    UNSAFE.putOrderedObject(ss,SBASE,s0);//or
    
    
    
    
    
    
    
    

    put方法底层实现:

    
            Segment<K,V> s;
            if (value == null)
                throw new NullPointerException();
            ###计算key存放那个Segment数组下标位置;
            int hash = hash(key);
            int j = (hash >>> segmentShift  2 8) & segmentMask 15;
            ###使用cas 获取Segment[10]对象 如果没有获取到的情况下,则创建一个新的segment对象
            if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
                 (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
                s = ensureSegment(j);
            ### 使用lock锁对put方法保证线程安全问题
            return s.put(key, hash, value, false);
    
    
    
    0000 0000 00000 0000 0000 0000 0000 0011
                                        0000 0000 00000 0000 0000 0000 0000 0011
    
    
    
    深度分析:
    Segment<K,V> ensureSegment(int k) 
      
        private Segment<K,V> ensureSegment(int k) {
            final Segment<K,V>[] ss = this.segments;
            long u = (k << SSHIFT) + SBASE; // raw offset
            Segment<K,V> seg;
            ### 使用UNSAFE强制从主内存中获取 Segment对象,如果没有获取到的情况=null
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
                ## 使用原型模式 将下标为0的Segment设定参数信息 赋值到新的Segment对象中
                Segment<K,V> proto = ss[0]; // use segment 0 as prototype
                int cap = proto.table.length;
                float lf = proto.loadFactor;
                int threshold = (int)(cap * lf);
                HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
                #### 使用UNSAFE强制从主内存中获取 Segment对象,如果没有获取到的情况=null
                if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                    == null) { // recheck
                    ###创建一个新的Segment对象
                    Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                    while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                           == null) {
                        ###使用CAS做修改
                        if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                            break;
                    }
                }
            }
            return seg;
    }
    
       final V put(K key, int hash, V value, boolean onlyIfAbsent) {
        ###尝试获取锁,如果获取到的情况下则自旋
                HashEntry<K,V> node = tryLock() ? null :
                    scanAndLockForPut(key, hash, value);
                V oldValue;
                try {
                    HashEntry<K,V>[] tab = table;
                    ###计算该key存放的index下标位置
                    int index = (tab.length - 1) & hash;
                    HashEntry<K,V> first = entryAt(tab, index);
                    for (HashEntry<K,V> e = first;;) {
                        if (e != null) {
                            K k;
                            ###查找链表如果链表中存在该key的则修改
                            if ((k = e.key) == key ||
                                (e.hash == hash && key.equals(k))) {
                                oldValue = e.value;
                                if (!onlyIfAbsent) {
                                    e.value = value;
                                    ++modCount;
                                }
                                break;
                            }
                            e = e.next;
                        }
                        else {
                            if (node != null)
                                node.setNext(first);
                            else
                                ###创建一个新的node结点 头插入法
                                node = new HashEntry<K,V>(hash, key, value, first);
                            int c = count + 1;
                              ###如果达到阈值提前扩容
                            if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                                rehash(node);
                            else
                                setEntryAt(tab, index, node);
                            ++modCount;
                            count = c;
                            oldValue = null;
                            break;
                        }
                    }
                } finally {
                    ###释放锁
                    unlock();
                }
                return oldValue;
            }
    
    

    核心Api

    GetObjectVolatile
    此方法和上面的getObject功能类似,不过附加了'volatile'加载语义,也就是强制从主存中获取属性值。类似的方法有getIntVolatile、getDoubleVolatile等等。这个方法要求被使用的属性被volatile修饰,否则功能和getObject方法相同。

    tryLock() 方法是有返回值的,他表示用来尝试获取锁,如果获取成功,则返回true,

    如果获取失败(其他线程已经获取到锁),则返回false,这个方法无论如何都会立即返回,在拿不到锁的时候不会一直在哪里等待。

    [图片上传失败...(image-748a07-1601689703313)]

       /** Implementation for put and putIfAbsent */
        final V putVal(K key, V value, boolean onlyIfAbsent) {
            if (key == null || value == null) throw new NullPointerException();
            int hash = spread(key.hashCode());
            int binCount = 0;
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                if (tab == null || (n = tab.length) == 0)
                    tab = initTable();
    //计算该key存放的index下标位置 查找node节点  如果没有发生index冲突的
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    //有多个线程同时修改 没有发生index冲突下标位置  cas修改。
    //如果cas修改成功的话在退出自旋
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                else if ((fh = f.hash) == MOVED)
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
    //synchronized 的时候:index已经发生冲突 使用node节点上锁
                    synchronized (f) {
                        if (tabAt(tab, i) == f) {
                            if (fh >= 0) {
                                binCount = 1;
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {
    //转红黑树
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            addCount(1L, binCount);
            return null;
        }
    
    

    binCount //记录数组长度 ,如果长度>8转为红黑树。

        for (Node<K,V>[] tab = table;;) {}  // 自旋
      if (tab == null || (n = tab.length) == 0)
                tab = initTable();       //如果table为空 则初始化
    
      /**
         * Initializes table, using the size recorded in sizeCtl.
         */
        private final Node<K,V>[] initTable() {
            Node<K,V>[] tab; int sc;
            while ((tab = table) == null || tab.length == 0) {
    //如果发现有其他线程正在扩容的时候,当前线程释放cpu执行权
                if ((sc = sizeCtl) < 0)
                    Thread.yield(); // lost initialization race; just spin
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
    //使用cas 修改当前的sizeCtl 为-1   sizeCtl 默认为0,-1表示其他的线程已经在扩容,-2表示2个其他线程再扩容
                        if ((tab = table) == null || tab.length == 0) {
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
    //对table 默认长度为16
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    
    
    
    image.png

    [图片上传失败...(image-572d22-1601775904418)]

    相关文章

      网友评论

          本文标题:58 concurrentHashMap

          本文链接:https://www.haomeiwen.com/subject/mhlkuktx.html