美文网首页
java.util.concurrent.ConcurrentH

java.util.concurrent.ConcurrentH

作者: zycisbg | 来源:发表于2018-06-07 17:23 被阅读0次

为了解决线程安全问题,同时又为了照顾效率的问题,java从1.5就有了ConcurrentHashMap。从而代替了HashTable。1.7的ConcurrentHashMap 是用了多个(默认16)ReentrantLock来解决线程安全问题的。1.8的ConcurrentHashMap 则用了cas来解决的。

如果想读ConcurrentHashMap源码的话,一定要具备以下知识点。
1.volatile关键字。
2.unsafe的cas方法
在阅读ConcurrentHashMap之前,如果读过HashMap (JDK1.8)的源码,也会有一定的帮助。
java.util.HashMap(JDK1.8)

下边只是一些比较简单的源码,有很多我也不是很清楚,欢迎大家沟通。

initTable

在map(在这里指ConcurrentHashMap,下边不在赘述)添加的时候,一定会初始化这个Node数组,但是因为多线程的原因,不能有多个线程同时去初始化,所以这里用while来监控,如果有别的线程正在扩容,则用yield方法去重新竞争cpu资源。

private final Node<K,V>[] initTable() {
        
        //sizeCtl 负数代表正在进行初始化或扩容操作
        //(-1代表正在初始化,-N 表示有N-1个线程正在进行扩容操作。0代表hash表还没有被初始化,正数表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念)
        Node<K,V>[] tab; int sc;
        //开始初始化
        //因为table是volatile修饰的,如果table不为空了。直接跳出循环。
        while ((tab = table) == null || tab.length == 0) {
            //进入此判断,表示已经有线程正在初始化或者扩容了。
            if ((sc = sizeCtl) < 0)
                //线程从运行状态变为就绪状态。
                Thread.yield(); // lost initialization race; just spin
                //通过unsafe类的cas把sizeCtl更换为-1
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        //todo 暂时想不通为什么sc会大于0  ,多这个判断的意义是什么,。因为sc并不是volatile修饰。上边只有赋值,赋值完马上判断
                        //table大小默认值为16
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        //创建node数组
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        //更新阈值   16 - 4 = 12.    是16 * 0.75
                        sc = n - (n >>> 2);
                    }
                } finally {
                    //把sc赋值到sizeCtl上。
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
helpTransfer

在map添加元素的时候,如果此时map正在扩容。此时不会阻塞,而会进入helpTransfer这个方法, 如果看过hashmap扩容源码,则知道扩容的时候会有数组的copy动作。此时这里会多线程进行拷贝。这个很NB。

/**
     * Helps transfer if a resize is in progress.
     */
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            //先通过此时tab的大小计算一个校检码
            int rs = resizeStamp(tab.length);
            //用while监控是否扩容完毕。
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                //如果已经扩容完毕,或者不能在扩容了。则直接跳出
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                //帮助扩容
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
transfer

这个扩容方法,我自己看了会,没有看太懂,我是从这篇文章中摘取的扩容方法的解析。
ConcurrentHashMap

 /** 
    * 一个过渡的table表  只有在扩容的时候才会使用 
    */  
   private transient volatile Node<K,V>[] nextTable;  
  
/** 
    * Moves and/or copies the nodes in each bin to new table. See 
    * above for explanation. 
    */  
   private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {  
       int n = tab.length, stride;  
       if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)  
           stride = MIN_TRANSFER_STRIDE; // subdivide range  
       if (nextTab == null) {            // initiating  
           try {  
               @SuppressWarnings("unchecked")  
               Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];//构造一个nextTable对象 它的容量是原来的两倍  
               nextTab = nt;  
           } catch (Throwable ex) {      // try to cope with OOME  
               sizeCtl = Integer.MAX_VALUE;  
               return;  
           }  
           nextTable = nextTab;  
           transferIndex = n;  
       }  
       int nextn = nextTab.length;  
       ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);//构造一个连节点指针 用于标志位  
       boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过  
       boolean finishing = false; // to ensure sweep before committing nextTab  
       for (int i = 0, bound = 0;;) {  
           Node<K,V> f; int fh;  
           //这个while循环体的作用就是在控制i--  通过i--可以依次遍历原hash表中的节点  
           while (advance) {  
               int nextIndex, nextBound;  
               if (--i >= bound || finishing)  
                   advance = false;  
               else if ((nextIndex = transferIndex) <= 0) {  
                   i = -1;  
                   advance = false;  
               }  
               else if (U.compareAndSwapInt  
                        (this, TRANSFERINDEX, nextIndex,  
                         nextBound = (nextIndex > stride ?  
                                      nextIndex - stride : 0))) {  
                   bound = nextBound;  
                   i = nextIndex - 1;  
                   advance = false;  
               }  
           }  
           if (i < 0 || i >= n || i + n >= nextn) {  
               int sc;  
               if (finishing) {  
                //如果所有的节点都已经完成复制工作  就把nextTable赋值给table 清空临时对象nextTable  
                   nextTable = null;  
                   table = nextTab;  
                   sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍  依然相当于现在容量的0.75倍  
                   return;  
               }  
               //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作  
               if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {  
                   if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)  
                       return;  
                   finishing = advance = true;  
                   i = n; // recheck before commit  
               }  
           }  
           //如果遍历到的节点为空 则放入ForwardingNode指针  
           else if ((f = tabAt(tab, i)) == null)  
               advance = casTabAt(tab, i, null, fwd);  
           //如果遍历到ForwardingNode节点  说明这个点已经被处理过了 直接跳过  这里是控制并发扩容的核心  
           else if ((fh = f.hash) == MOVED)  
               advance = true; // already processed  
           else {  
                //节点上锁  
               synchronized (f) {  
                   if (tabAt(tab, i) == f) {  
                       Node<K,V> ln, hn;  
                       //如果fh>=0 证明这是一个Node节点  
                       if (fh >= 0) {  
                           int runBit = fh & n;  
                           //以下的部分在完成的工作是构造两个链表  一个是原链表  另一个是原链表的反序排列  
                           Node<K,V> lastRun = f;  
                           for (Node<K,V> p = f.next; p != null; p = p.next) {  
                               int b = p.hash & n;  
                               if (b != runBit) {  
                                   runBit = b;  
                                   lastRun = p;  
                               }  
                           }  
                           if (runBit == 0) {  
                               ln = lastRun;  
                               hn = null;  
                           }  
                           else {  
                               hn = lastRun;  
                               ln = null;  
                           }  
                           for (Node<K,V> p = f; p != lastRun; p = p.next) {  
                               int ph = p.hash; K pk = p.key; V pv = p.val;  
                               if ((ph & n) == 0)  
                                   ln = new Node<K,V>(ph, pk, pv, ln);  
                               else  
                                   hn = new Node<K,V>(ph, pk, pv, hn);  
                           }  
                           //在nextTable的i位置上插入一个链表  
                           setTabAt(nextTab, i, ln);  
                           //在nextTable的i+n的位置上插入另一个链表  
                           setTabAt(nextTab, i + n, hn);  
                           //在table的i位置上插入forwardNode节点  表示已经处理过该节点  
                           setTabAt(tab, i, fwd);  
                           //设置advance为true 返回到上面的while循环中 就可以执行i--操作  
                           advance = true;  
                       }  
                       //对TreeBin对象进行处理  与上面的过程类似  
                       else if (f instanceof TreeBin) {  
                           TreeBin<K,V> t = (TreeBin<K,V>)f;  
                           TreeNode<K,V> lo = null, loTail = null;  
                           TreeNode<K,V> hi = null, hiTail = null;  
                           int lc = 0, hc = 0;  
                           //构造正序和反序两个链表  
                           for (Node<K,V> e = t.first; e != null; e = e.next) {  
                               int h = e.hash;  
                               TreeNode<K,V> p = new TreeNode<K,V>  
                                   (h, e.key, e.val, null, null);  
                               if ((h & n) == 0) {  
                                   if ((p.prev = loTail) == null)  
                                       lo = p;  
                                   else  
                                       loTail.next = p;  
                                   loTail = p;  
                                   ++lc;  
                               }  
                               else {  
                                   if ((p.prev = hiTail) == null)  
                                       hi = p;  
                                   else  
                                       hiTail.next = p;  
                                   hiTail = p;  
                                   ++hc;  
                               }  
                           }  
                           //如果扩容后已经不再需要tree的结构 反向转换为链表结构  
                           ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :  
                               (hc != 0) ? new TreeBin<K,V>(lo) : t;  
                           hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :  
                               (lc != 0) ? new TreeBin<K,V>(hi) : t;  
                            //在nextTable的i位置上插入一个链表      
                           setTabAt(nextTab, i, ln);  
                           //在nextTable的i+n的位置上插入另一个链表  
                           setTabAt(nextTab, i + n, hn);  
                            //在table的i位置上插入forwardNode节点  表示已经处理过该节点  
                           setTabAt(tab, i, fwd);  
                           //设置advance为true 返回到上面的while循环中 就可以执行i--操作  
                           advance = true;  
                       }  
                   }  
               }  
           }  
       }  
   } 
putVal

接下来就可以看put方法了。 put方法会直接调用putVal方法,他比put方法多一个onlyIfAbsent参数。
该参数的意思为是否只在key为空的时候添加。(或者理解为是否覆盖value)
如果上边那些方法看明白了。HashMap的put方法看过,那么这个还是很简单的。

 
 //添加方法。
 final V putVal(K key, V value, boolean onlyIfAbsent) {
        //concurrentHashMap不允许key或value为空
        if (key == null || value == null) throw new NullPointerException();
        //获取key的hash值
        int hash = spread(key.hashCode());
        int binCount = 0;
        //开始循环,并且赋值tab数组。成功后才会跳出循环
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //首次添加会进入此判断。
            if (tab == null || (n = tab.length) == 0)
                //初始化node数组
                //因为这是一个死循环,初始化后,会再次循环。
                tab = initTable();
            //通过key的hash值来寻找位置,如果当前位置为空,则进入判断
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //cas直接放入,如果放入成功。然后跳出循环。没有成功继续循环。
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //如果正在扩容。则帮助扩容
            else if ((fh = f.hash) == MOVED)
                //多线程扩容,返回新的tab
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //当该位置已经有值 则上锁。
                synchronized (f) {
                    //判断是该值是否已经改变
                    if (tabAt(tab, i) == f) {
                        //如果f的hashcode大于0.则该节点是连边。
                        if (fh >= 0) {
                            binCount = 1;
                            //循环判断,如果hash值一样,并且equals也想等,则覆盖,如果e已经是最后一个元素,则链下去
                            //在这里每循环一次,就会++binCount.为下边是否变成树做准备
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        //如果该节点是树。则用树的方式往下链
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //如果bincount大于8 则该位置变为红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        //增加当前的容量,在增加的时候 也会根据容量来扩容。
        addCount(1L, binCount);
        return null;
    }
get

这个方法没什么特殊的。跟hashmap的getnode方法差不多,就是找位置然后是树就从树中取,是链表就从链表中取

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        //计算hash值
        int h = spread(key.hashCode());
        //如果当前tab没问题并且该位置有值
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            //如果该位置的第一个就是该值、就直接返回
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            //如果是树。则从树中寻找。
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            //如果是链表,则next下去找。
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

相关文章

网友评论

      本文标题:java.util.concurrent.ConcurrentH

      本文链接:https://www.haomeiwen.com/subject/wwcisftx.html