美文网首页
java.util.concurrent.ConcurrentH

java.util.concurrent.ConcurrentH

作者: zycisbg | 来源:发表于2018-06-07 17:23 被阅读0次

    为了解决线程安全问题,同时又为了照顾效率的问题,java从1.5就有了ConcurrentHashMap。从而代替了HashTable。1.7的ConcurrentHashMap 是用了多个(默认16)ReentrantLock来解决线程安全问题的。1.8的ConcurrentHashMap 则用了cas来解决的。

    如果想读ConcurrentHashMap源码的话,一定要具备以下知识点。
    1.volatile关键字。
    2.unsafe的cas方法
    在阅读ConcurrentHashMap之前,如果读过HashMap (JDK1.8)的源码,也会有一定的帮助。
    java.util.HashMap(JDK1.8)

    下边只是一些比较简单的源码,有很多我也不是很清楚,欢迎大家沟通。

    initTable

    在map(在这里指ConcurrentHashMap,下边不在赘述)添加的时候,一定会初始化这个Node数组,但是因为多线程的原因,不能有多个线程同时去初始化,所以这里用while来监控,如果有别的线程正在扩容,则用yield方法去重新竞争cpu资源。

    private final Node<K,V>[] initTable() {
            
            //sizeCtl 负数代表正在进行初始化或扩容操作
            //(-1代表正在初始化,-N 表示有N-1个线程正在进行扩容操作。0代表hash表还没有被初始化,正数表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念)
            Node<K,V>[] tab; int sc;
            //开始初始化
            //因为table是volatile修饰的,如果table不为空了。直接跳出循环。
            while ((tab = table) == null || tab.length == 0) {
                //进入此判断,表示已经有线程正在初始化或者扩容了。
                if ((sc = sizeCtl) < 0)
                    //线程从运行状态变为就绪状态。
                    Thread.yield(); // lost initialization race; just spin
                    //通过unsafe类的cas把sizeCtl更换为-1
                else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if ((tab = table) == null || tab.length == 0) {
                            //todo 暂时想不通为什么sc会大于0  ,多这个判断的意义是什么,。因为sc并不是volatile修饰。上边只有赋值,赋值完马上判断
                            //table大小默认值为16
                            int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                            @SuppressWarnings("unchecked")
                            //创建node数组
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = tab = nt;
                            //更新阈值   16 - 4 = 12.    是16 * 0.75
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        //把sc赋值到sizeCtl上。
                        sizeCtl = sc;
                    }
                    break;
                }
            }
            return tab;
        }
    
    helpTransfer

    在map添加元素的时候,如果此时map正在扩容。此时不会阻塞,而会进入helpTransfer这个方法, 如果看过hashmap扩容源码,则知道扩容的时候会有数组的copy动作。此时这里会多线程进行拷贝。这个很NB。

    /**
         * Helps transfer if a resize is in progress.
         */
        final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
            Node<K,V>[] nextTab; int sc;
            if (tab != null && (f instanceof ForwardingNode) &&
                (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
                //先通过此时tab的大小计算一个校检码
                int rs = resizeStamp(tab.length);
                //用while监控是否扩容完毕。
                while (nextTab == nextTable && table == tab &&
                       (sc = sizeCtl) < 0) {
                    //如果已经扩容完毕,或者不能在扩容了。则直接跳出
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || transferIndex <= 0)
                        break;
                    //帮助扩容
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                        transfer(tab, nextTab);
                        break;
                    }
                }
                return nextTab;
            }
            return table;
        }
    
    transfer

    这个扩容方法,我自己看了会,没有看太懂,我是从这篇文章中摘取的扩容方法的解析。
    ConcurrentHashMap

     /** 
        * 一个过渡的table表  只有在扩容的时候才会使用 
        */  
       private transient volatile Node<K,V>[] nextTable;  
      
    /** 
        * Moves and/or copies the nodes in each bin to new table. See 
        * above for explanation. 
        */  
       private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {  
           int n = tab.length, stride;  
           if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)  
               stride = MIN_TRANSFER_STRIDE; // subdivide range  
           if (nextTab == null) {            // initiating  
               try {  
                   @SuppressWarnings("unchecked")  
                   Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//构造一个nextTable对象 它的容量是原来的两倍  
                   nextTab = nt;  
               } catch (Throwable ex) {      // try to cope with OOME  
                   sizeCtl = Integer.MAX_VALUE;  
                   return;  
               }  
               nextTable = nextTab;  
               transferIndex = n;  
           }  
           int nextn = nextTab.length;  
           ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//构造一个连节点指针 用于标志位  
           boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过  
           boolean finishing = false; // to ensure sweep before committing nextTab  
           for (int i = 0, bound = 0;;) {  
               Node<K,V> f; int fh;  
               //这个while循环体的作用就是在控制i--  通过i--可以依次遍历原hash表中的节点  
               while (advance) {  
                   int nextIndex, nextBound;  
                   if (--i >= bound || finishing)  
                       advance = false;  
                   else if ((nextIndex = transferIndex) <= 0) {  
                       i = -1;  
                       advance = false;  
                   }  
                   else if (U.compareAndSwapInt  
                            (this, TRANSFERINDEX, nextIndex,  
                             nextBound = (nextIndex > stride ?  
                                          nextIndex - stride : 0))) {  
                       bound = nextBound;  
                       i = nextIndex - 1;  
                       advance = false;  
                   }  
               }  
               if (i < 0 || i >= n || i + n >= nextn) {  
                   int sc;  
                   if (finishing) {  
                    //如果所有的节点都已经完成复制工作  就把nextTable赋值给table 清空临时对象nextTable  
                       nextTable = null;  
                       table = nextTab;  
                       sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍  依然相当于现在容量的0.75倍  
                       return;  
                   }  
                   //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作  
                   if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {  
                       if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)  
                           return;  
                       finishing = advance = true;  
                       i = n; // recheck before commit  
                   }  
               }  
               //如果遍历到的节点为空 则放入ForwardingNode指针  
               else if ((f = tabAt(tab, i)) == null)  
                   advance = casTabAt(tab, i, null, fwd);  
               //如果遍历到ForwardingNode节点  说明这个点已经被处理过了 直接跳过  这里是控制并发扩容的核心  
               else if ((fh = f.hash) == MOVED)  
                   advance = true; // already processed  
               else {  
                    //节点上锁  
                   synchronized (f) {  
                       if (tabAt(tab, i) == f) {  
                           Node<K,V> ln, hn;  
                           //如果fh>=0 证明这是一个Node节点  
                           if (fh >= 0) {  
                               int runBit = fh & n;  
                               //以下的部分在完成的工作是构造两个链表  一个是原链表  另一个是原链表的反序排列  
                               Node<K,V> lastRun = f;  
                               for (Node<K,V> p = f.next; p != null; p = p.next) {  
                                   int b = p.hash & n;  
                                   if (b != runBit) {  
                                       runBit = b;  
                                       lastRun = p;  
                                   }  
                               }  
                               if (runBit == 0) {  
                                   ln = lastRun;  
                                   hn = null;  
                               }  
                               else {  
                                   hn = lastRun;  
                                   ln = null;  
                               }  
                               for (Node<K,V> p = f; p != lastRun; p = p.next) {  
                                   int ph = p.hash; K pk = p.key; V pv = p.val;  
                                   if ((ph & n) == 0)  
                                       ln = new Node<K,V>(ph, pk, pv, ln);  
                                   else  
                                       hn = new Node<K,V>(ph, pk, pv, hn);  
                               }  
                               //在nextTable的i位置上插入一个链表  
                               setTabAt(nextTab, i, ln);  
                               //在nextTable的i+n的位置上插入另一个链表  
                               setTabAt(nextTab, i + n, hn);  
                               //在table的i位置上插入forwardNode节点  表示已经处理过该节点  
                               setTabAt(tab, i, fwd);  
                               //设置advance为true 返回到上面的while循环中 就可以执行i--操作  
                               advance = true;  
                           }  
                           //对TreeBin对象进行处理  与上面的过程类似  
                           else if (f instanceof TreeBin) {  
                               TreeBin<K,V> t = (TreeBin<K,V>)f;  
                               TreeNode<K,V> lo = null, loTail = null;  
                               TreeNode<K,V> hi = null, hiTail = null;  
                               int lc = 0, hc = 0;  
                               //构造正序和反序两个链表  
                               for (Node<K,V> e = t.first; e != null; e = e.next) {  
                                   int h = e.hash;  
                                   TreeNode<K,V> p = new TreeNode<K,V>  
                                       (h, e.key, e.val, null, null);  
                                   if ((h & n) == 0) {  
                                       if ((p.prev = loTail) == null)  
                                           lo = p;  
                                       else  
                                           loTail.next = p;  
                                       loTail = p;  
                                       ++lc;  
                                   }  
                                   else {  
                                       if ((p.prev = hiTail) == null)  
                                           hi = p;  
                                       else  
                                           hiTail.next = p;  
                                       hiTail = p;  
                                       ++hc;  
                                   }  
                               }  
                               //如果扩容后已经不再需要tree的结构 反向转换为链表结构  
                               ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :  
                                   (hc != 0) ? new TreeBin<K,V>(lo) : t;  
                               hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :  
                                   (lc != 0) ? new TreeBin<K,V>(hi) : t;  
                                //在nextTable的i位置上插入一个链表      
                               setTabAt(nextTab, i, ln);  
                               //在nextTable的i+n的位置上插入另一个链表  
                               setTabAt(nextTab, i + n, hn);  
                                //在table的i位置上插入forwardNode节点  表示已经处理过该节点  
                               setTabAt(tab, i, fwd);  
                               //设置advance为true 返回到上面的while循环中 就可以执行i--操作  
                               advance = true;  
                           }  
                       }  
                   }  
               }  
           }  
       } 
    
    putVal

    接下来就可以看put方法了。 put方法会直接调用putVal方法,他比put方法多一个onlyIfAbsent参数。
    该参数的意思为是否只在key为空的时候添加。(或者理解为是否覆盖value)
    如果上边那些方法看明白了。HashMap的put方法看过,那么这个还是很简单的。

     
     //添加方法。
     final V putVal(K key, V value, boolean onlyIfAbsent) {
            //concurrentHashMap不允许key或value为空
            if (key == null || value == null) throw new NullPointerException();
            //获取key的hash值
            int hash = spread(key.hashCode());
            int binCount = 0;
            //开始循环,并且赋值tab数组。成功后才会跳出循环
            for (Node<K,V>[] tab = table;;) {
                Node<K,V> f; int n, i, fh;
                //首次添加会进入此判断。
                if (tab == null || (n = tab.length) == 0)
                    //初始化node数组
                    //因为这是一个死循环,初始化后,会再次循环。
                    tab = initTable();
                //通过key的hash值来寻找位置,如果当前位置为空,则进入判断
                else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                    //cas直接放入,如果放入成功。然后跳出循环。没有成功继续循环。
                    if (casTabAt(tab, i, null,
                                 new Node<K,V>(hash, key, value, null)))
                        break;                   // no lock when adding to empty bin
                }
                //如果正在扩容。则帮助扩容
                else if ((fh = f.hash) == MOVED)
                    //多线程扩容,返回新的tab
                    tab = helpTransfer(tab, f);
                else {
                    V oldVal = null;
                    //当该位置已经有值 则上锁。
                    synchronized (f) {
                        //判断是该值是否已经改变
                        if (tabAt(tab, i) == f) {
                            //如果f的hashcode大于0.则该节点是连边。
                            if (fh >= 0) {
                                binCount = 1;
                                //循环判断,如果hash值一样,并且equals也想等,则覆盖,如果e已经是最后一个元素,则链下去
                                //在这里每循环一次,就会++binCount.为下边是否变成树做准备
                                for (Node<K,V> e = f;; ++binCount) {
                                    K ek;
                                    if (e.hash == hash &&
                                        ((ek = e.key) == key ||
                                         (ek != null && key.equals(ek)))) {
                                        oldVal = e.val;
                                        if (!onlyIfAbsent)
                                            e.val = value;
                                        break;
                                    }
                                    Node<K,V> pred = e;
                                    if ((e = e.next) == null) {
                                        pred.next = new Node<K,V>(hash, key,
                                                                  value, null);
                                        break;
                                    }
                                }
                            }
                            //如果该节点是树。则用树的方式往下链
                            else if (f instanceof TreeBin) {
                                Node<K,V> p;
                                binCount = 2;
                                if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                               value)) != null) {
                                    oldVal = p.val;
                                    if (!onlyIfAbsent)
                                        p.val = value;
                                }
                            }
                        }
                    }
                    if (binCount != 0) {
                        //如果bincount大于8 则该位置变为红黑树
                        if (binCount >= TREEIFY_THRESHOLD)
                            treeifyBin(tab, i);
                        if (oldVal != null)
                            return oldVal;
                        break;
                    }
                }
            }
            //增加当前的容量,在增加的时候 也会根据容量来扩容。
            addCount(1L, binCount);
            return null;
        }
    
    get

    这个方法没什么特殊的。跟hashmap的getnode方法差不多,就是找位置然后是树就从树中取,是链表就从链表中取

    public V get(Object key) {
            Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
            //计算hash值
            int h = spread(key.hashCode());
            //如果当前tab没问题并且该位置有值
            if ((tab = table) != null && (n = tab.length) > 0 &&
                (e = tabAt(tab, (n - 1) & h)) != null) {
                //如果该位置的第一个就是该值、就直接返回
                if ((eh = e.hash) == h) {
                    if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                        return e.val;
                }
                //如果是树。则从树中寻找。
                else if (eh < 0)
                    return (p = e.find(h, key)) != null ? p.val : null;
                //如果是链表,则next下去找。
                while ((e = e.next) != null) {
                    if (e.hash == h &&
                        ((ek = e.key) == key || (ek != null && key.equals(ek))))
                        return e.val;
                }
            }
            return null;
        }
    

    相关文章

      网友评论

          本文标题:java.util.concurrent.ConcurrentH

          本文链接:https://www.haomeiwen.com/subject/wwcisftx.html