美文网首页
ConCurrentHashMap源码解析

ConCurrentHashMap源码解析

作者: 谢朴欢 | 来源:发表于2018-04-25 15:09 被阅读6次

    1. 简介

    在之前写了HashMap源码解析介绍了HashMap这个数据结构,可惜它并不是线程安全的,在多线程情况下最好还是使用ConCurrentHashMap。在JDK8中它使用了CAS+synchronized实现来保证线程安全,而在JDK7中它使用了分段锁的实现方式来保证线程安全。下面我来剖析下它源码:

    2. 实现

    类继承结构

    public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
        implements ConcurrentMap<K,V>, Serializable {}
    

    属性

    // Node数组最大大小
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    // Node数组最大大小
    private static final int DEFAULT_CAPACITY = 16;
    // 最大数组大小,只在toArray方法中使用
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    // 默认并发级别
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    // 负载因子
    private static final float LOAD_FACTOR = 0.75f;
    // 链表转换为红黑树链表长度的阈值
    static final int TREEIFY_THRESHOLD = 8;
    // 红黑树转换为链表的阈值
    static final int UNTREEIFY_THRESHOLD = 6;
    // 链表转换为红黑树数组大小的阈值
    static final int MIN_TREEIFY_CAPACITY = 64;
    
    private static final int MIN_TRANSFER_STRIDE = 16;
    
    private static int RESIZE_STAMP_BITS = 16;
    
    // 帮助扩容的最大线程数
    private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
    
    private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
    
    static final int MOVED     = -1; // forwarding结点的Hash值
    static final int TREEBIN   = -2; // 树根结点的Hash值
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
    
    // 获取可用处理器数量
    static final int NCPU = Runtime.getRuntime().availableProcessors();
    
    // Node结点的数组
    transient volatile Node<K,V>[] table;
    
    // 
    private transient volatile Node<K,V>[] nextTable;
    
    private transient volatile long baseCount;
    
    // < -1表示有-(sizeCtl + 1)个线程在扩容
    // == -1表示数组在初始化
    // 大于0表示并且数组为初始化表示要初始化数组的大小,否则表示扩容的阈值
    private transient volatile int sizeCtl;
    
    private transient volatile int transferIndex;
    
    private transient volatile int cellsBusy;
    
    private transient volatile CounterCell[] counterCells;
    
    private transient KeySetView<K,V> keySet;
    private transient ValuesView<K,V> values;
    private transient EntrySetView<K,V> entrySet;
    

    结点类型

    链表结点类型

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        // 使用volatile修饰来保证变量在多线程下的可见性
        volatile V val;
        volatile Node<K,V> next;
    
        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }
    
        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }
    
        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }
    
        // 在以当前结点开始的链表中寻找含有k这个键的结点
        Node<K,V> find(int h, Object k) {
            Node<K,V> e = this;
            if (k != null) {
                do {
                    K ek;
                    if (e.hash == h &&
                        ((ek = e.key) == k || (ek != null && k.equals(ek))))
                        return e;
                } while ((e = e.next) != null);
            }
            return null;
        }
    }
    

    红黑树结点类型

    static final class TreeNode<K,V> extends Node<K,V> {
        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;
    
        TreeNode(int hash, K key, V val, Node<K,V> next,
                    TreeNode<K,V> parent) {
            super(hash, key, val, next);
            this.parent = parent;
        }
    
        // 找出键为k的结点
        Node<K,V> find(int h, Object k) {
            return findTreeNode(h, k, null);
        }
    
        /**
            * Returns the TreeNode (or null if not found) for the given key
            * starting at given root.
            */
        final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
            // 省略代码
        }
    }
    
    // 用于存放红黑树的结构
    static final class TreeBin<K,V> extends Node<K,V> {
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first; // TreeNode根结点
        volatile Thread waiter;  // 等待线程
        volatile int lockState; // 锁状态
        // values for lockState
        static final int WRITER = 1; // set while holding write lock
        static final int WAITER = 2; // set when waiting for write lock
        static final int READER = 4; // increment value for setting read lock
    
        static int tieBreakOrder(Object a, Object b) {
            int d;
            if (a == null || b == null ||
                (d = a.getClass().getName().
                    compareTo(b.getClass().getName())) == 0)
                d = (System.identityHashCode(a) <= System.identityHashCode(b) ?
                        -1 : 1);
            return d;
        }
    
        //省略后面大部分代码
    

    构造函数

    在构造函数中并不会创建Node数组,而是延迟到第一次put方法时才创建Node数组

    public ConcurrentHashMap() {
    }
    
    public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                    MAXIMUM_CAPACITY :
                    tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
    
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, 1);
    }
    
    public ConcurrentHashMap(int initialCapacity,
                                float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        // 键值对数量 / 数组大小 <= 加载因子,根据initialCapacity与加载因子求出数组大小
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        // 如果数组大小最大等于数组大小则设置数组初始大小为MAXIMUM_CAPACITY,
        // 否则调用tableSizeFor函数求出一个不小于size的第一个2的n次幂的正整数
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        // 设置要初始化的数组大小
        this.sizeCtl = cap;
    }
    
    // 求出一个不小于c的第一个2的n次幂的正整数
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }
    

    散列函数

    通过高16位与低16位的异或运算得出新的散列值以减少碰撞几率

    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }
    

    put方法

    //插入键值对,如果键已存在,默认更改其值
    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    
    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // 键与值不得为空
        if (key == null || value == null) throw new NullPointerException();
        // 计算出键的散列值
        int hash = spread(key.hashCode());
        // 指定数组位置上结点的个数
        int binCount = 0;
        // 通过使用循环直到添加成功退出循环
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 如果数组未初始化,则调用initTable方法初始化
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
                // 否则计算出键所在的桶,如果桶为空则调用casTabAt方法尝试在该位置添加结点
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 添加结点成功退出循环,否则表示有其他线程也尝试在此位置插入结点,出现冲突
                if (casTabAt(tab, i, null,
                                new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }// 如果该位置不为空,并且该结点Hash值为MOVED表示当前正在扩容,调用helpTransfer方法帮助扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // 使用synchronized关键字对该头结点进行加锁,只有获取头结点的锁的线程才有资格对该桶的链表或数组添加结点
                synchronized (f) {
                    // 再一次判断加锁的结点是否为头结点
                    if (tabAt(tab, i) == f) {
                        // 如果头结点Hash值大于等于0则表示该桶存放的是链表,遍历链表添加结点
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                // 找到该结点
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                        (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    // 判断是否替换旧值
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                                value, null);
                                    break;
                                }
                            }
                        } // 否则判断头结点是否为为TreeBin的实例,如果是则该桶存放的是一棵红黑树
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            // 调用putTreeVal方法添加结点
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                            value)) != null) {
                                oldVal = p.val;
                                // 判断是否替换旧值
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
    
                // 如果binCount不等于0则表示添加结点或替换新值成功
                if (binCount != 0) {
                    // 如果结点数量大于等于8,将链表树形化
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    // 如果值不为空直接返回旧值
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 添加新结点成功,调用addCount方法判断是否需要扩容
        addCount(1L, binCount);
        return null;
    }
    

    initTable方法

    这个方法是在调用put方法时数组未初始化时调用的,用于创建初始数组

    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        // 使用循环来保证初始化成功
        while ((tab = table) == null || tab.length == 0) {
            // 如果sizeCtl小于0表示数组在初始化或者扩容
            if ((sc = sizeCtl) < 0)
                // 调用Thread.yield方法尝试让出CPU给其他相同优先级的线程
                Thread.yield(); // lost initialization race; just spin
                // 否则使用CAS更新SIZECTL为-1,表示要初始化数组
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    // 再一次判断数组是否未初始化
                    if ((tab = table) == null || tab.length == 0) {
                        // 若在构造对象时未给出初始化大小则使用默认大小16
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        // 创建新数组
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        // 设置下次扩容阈值为当前数组大小的0.75倍
                        sc = n - (n >>> 2);
                    }
                } finally {
                    // 无论是否初始化成功,更新sizeCtl的值
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    

    helpTransfer方法

    当发现数组正在扩容时调用helpTransfer方法帮助数组转移结点到新数组

    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                    (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
    

    相关文章

      网友评论

          本文标题:ConCurrentHashMap源码解析

          本文链接:https://www.haomeiwen.com/subject/eshylftx.html