美文网首页
ThreadLocalMap源码解读

ThreadLocalMap源码解读

作者: CXYMichael | 来源:发表于2019-05-14 21:15 被阅读0次

    关于ThreadLocal以及InheritedThreadLocal基本原理的介绍已经非常多,但是感觉threadlocal设计的精髓还是在于ThreadLocalMap,所以这里详细分析一下ThreadLocalMap的实现。

    成员变量

    整体的成员变量和HashMap差不多,主要不同就是扩容阈值是2/3而非0.75,Entry的key是WeakReference<ThreadLocal<?>>。

            //初始容量16
            private static final int INITIAL_CAPACITY = 16;
    
            //Entry数组
            private Entry[] table;
    
            //容量
            private int size = 0;
    
            //扩容阈值
            private int threshold; // Default to 0
    
            //扩容阈值固定为Entry数组长度的2/3,比HashMap要低,因为ThreadLocalMap使用开放定址-线性探测法处理hash冲突,Entry占用更快
            private void setThreshold(int len) {
                threshold = len * 2 / 3;
            }
    

    初始化

            //首次调用ThreadLocal的set方法时,ThreadLocalMap会懒加载,存入一个元素,初始容量置1
            ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
                table = new Entry[INITIAL_CAPACITY];
                int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
                table[i] = new Entry(firstKey, firstValue);
                size = 1;
                setThreshold(INITIAL_CAPACITY);
            }
    
            //Thread初始化,InheritedThreadLocal不为空时,会调用该构造函数创建子线程InheritedThreadLocalMap
            private ThreadLocalMap(ThreadLocalMap parentMap) {
                Entry[] parentTable = parentMap.table;
                int len = parentTable.length;
                setThreshold(len);
                table = new Entry[len];
    
                for (int j = 0; j < len; j++) {
                    Entry e = parentTable[j];
                    if (e != null) {
                        @SuppressWarnings("unchecked")
                        ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                        if (key != null) {
                            Object value = key.childValue(e.value);
                            Entry c = new Entry(key, value);
                            int h = key.threadLocalHashCode & (len - 1);
                            while (table[h] != null)
                                h = nextIndex(h, len);
                            table[h] = c;
                            size++;
                        }
                    }
                }
            }
    

    查询、插入、删除

    和HashMap的实现有很大不同,hashkey冲突时采用线性地址法而非链地址法,并且在所有可能的地方都做了过期Entry的回收工作,并对因为hash冲突而偏移的Entry进行整理。

            //这里分两种情况处理,一种是e不为空且key相等,直接返回结果,另一种调用getEntryAfterMiss
            private Entry getEntry(ThreadLocal<?> key) {
                int i = key.threadLocalHashCode & (table.length - 1);
                Entry e = table[i];
                if (e != null && e.get() == key)
                    return e;
                else
                    return getEntryAfterMiss(key, i, e);
            }
    
            //如果在hash槽位没有找到该节点,说名不存在或者冲突后偏移了,因此需要向后顺序搜索,知道出现空槽位为止(空槽为说明后面不可能再存在了)。同时还会顺便做Entry的清理工作
            private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
                Entry[] tab = table;
                int len = tab.length;
    
                while (e != null) {
                    ThreadLocal<?> k = e.get();
                    if (k == key)
                        return e;
                    if (k == null)
                        expungeStaleEntry(i);
                    else
                        i = nextIndex(i, len);
                    e = tab[i];
                }
                return null;
            }
    
            //从hashcode位置开始寻找空位并调用replaceStaleEntry插入,或者遇到相同key则更新并直接return
            private void set(ThreadLocal<?> key, Object value) {
    
                Entry[] tab = table;
                int len = tab.length;
                int i = key.threadLocalHashCode & (len-1);
    
                for (Entry e = tab[i];
                     e != null;
                     //nextIndex向后唤醒搜索数组
                     e = tab[i = nextIndex(i, len)]) {
                    ThreadLocal<?> k = e.get();
    
                    if (k == key) {
                        e.value = value;
                        return;
                    }
                    //如果key为null,就用新key、value覆盖,同时清理旧数据
                    if (k == null) {
                        replaceStaleEntry(key, value, i);
                        return;
                    }
                }
                //如果执行到这里,说明i位置是空的,并且需要直接插入
                tab[i] = new Entry(key, value);
                int sz = ++size;
                //进行清理,如果没有清理出空间,就判断是否需要扩容
                if (!cleanSomeSlots(i, sz) && sz >= threshold)
                    rehash();
            }
    
            private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                           int staleSlot) {
                Entry[] tab = table;
                int len = tab.length;
                Entry e;
    
                //由于GC会批量回收无效引用,所以set()的循环中发现一个过期槽位,就意味着这个key前面也可能出现了新的过期槽位,所以向前搜索并记录可能存在的可用槽位。这样可以增加空槽位的利用率,从而避免频繁触发rehash。
                int slotToExpunge = staleSlot;
                for (int i = prevIndex(staleSlot, len);
                     (e = tab[i]) != null;
                     i = prevIndex(i, len))
                    if (e.get() == null)
                        slotToExpunge = i;
    
                //向后搜索,直到null Entry或者有重复key
                for (int i = nextIndex(staleSlot, len);
                     (e = tab[i]) != null;
                     i = nextIndex(i, len)) {
                    ThreadLocal<?> k = e.get();
    
                    //如果发现重复key,就和过期槽位做替换,从而维持hashtable的顺序性(每个entry离散列位置尽可能更近)
                    if (k == key) {
                        e.value = value;
    
                        tab[i] = tab[staleSlot];
                        tab[staleSlot] = e;
    
                        // 从i位置或者slotToExpunge位置进行批量清理
                        if (slotToExpunge == staleSlot)
                            slotToExpunge = i;
                        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                        return;
                    }
    
                    // 如果既没有向前搜索到过期槽位,也没有向后搜索到重复key
                    if (k == null && slotToExpunge == staleSlot)
                        slotToExpunge = i;
                }
    
                // 如果没有找到重复key,就直接替换过期键
                tab[staleSlot].value = null;
                tab[staleSlot] = new Entry(key, value);
    
                // 如果有其它过期键,就进行批量清理操作
                if (slotToExpunge != staleSlot)
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            }
    
            //移除操作,计算hashcode,然后从该位置向后遍历直到遇到null槽位,逐个比较key值是否匹配,如果匹配就调用Entry的clear方法,并从散列位置清理旧数据
            private void remove(ThreadLocal<?> key) {
                Entry[] tab = table;
                int len = tab.length;
                int i = key.threadLocalHashCode & (len-1);
                for (Entry e = tab[i];
                     e != null;
                     e = tab[i = nextIndex(i, len)]) {
                    if (e.get() == key) {
                        e.clear();
                        expungeStaleEntry(i);
                        return;
                    }
                }
            }
    

    值得注意的是这里调用了key.threadLocalHashCode获取了ThreadLocal对象的hashcode,而其hashcode是通过静态AtomicInteger以HASH_INCREMENT步长递增生成的的。

        private final int threadLocalHashCode = nextHashCode();
    
        private static AtomicInteger nextHashCode =
            new AtomicInteger();
    
        private static final int HASH_INCREMENT = 0x61c88647;
    
        private static int nextHashCode() {
            return nextHashCode.getAndAdd(HASH_INCREMENT);
        }
    

    实际上这里使用的是斐波那契散列法,这是一种乘数散列法,只不过这个乘数比较特殊,是32位整型上限2^32-1乘以黄金分割比例0.618....的值2654435769,用有符号整型表示就是-1640531527,去掉符号后16进制表示为0x61c88647。使用0x61c88647为步长进行散列,结果会更均匀。

    清理

    清理过程是基于热度衰减的方式,从指定位置向后每搜索一位热度减半,发现过期键则重置热度,直到热度归0结束清理。Redis扫描回收过期键时也与此比较类似,不过是按比例进行采样回收,直到采样中过期键比例小于某个阈值。

            //以i为起点向后搜索log2(n)位,如果发现过期Entry则置n为len。换个说法就是先做小范围扫描,如果发现过期Entry就做大范围扫描,直到大范围搜索也找不到过期Entry为止
            private boolean cleanSomeSlots(int i, int n) {
                boolean removed = false;
                Entry[] tab = table;
                int len = tab.length;
                do {
                    i = nextIndex(i, len);
                    Entry e = tab[i];
                    if (e != null && e.get() == null) {
                        n = len;
                        removed = true;
                        i = expungeStaleEntry(i);
                    }
                } while ( (n >>>= 1) != 0);
                return removed;
            }
    
            private int expungeStaleEntry(int staleSlot) {
                Entry[] tab = table;
                int len = tab.length;
    
                //回收staleSlot槽位
                tab[staleSlot].value = null;
                tab[staleSlot] = null;
                size--;
    
                //对后续位置rehash,直到遇到null槽位
                Entry e;
                int i;
                for (i = nextIndex(staleSlot, len);
                     (e = tab[i]) != null;
                     i = nextIndex(i, len)) {
                    ThreadLocal<?> k = e.get();
                    //如果可以清理,就清理
                    if (k == null) {
                        e.value = null;
                        tab[i] = null;
                        size--;
                    } else {
                        //计算hashSlot位置
                        int h = k.threadLocalHashCode & (len - 1);
                        //如果entry没有在hashSlot位置,则从hashSlot开始搜索空槽位并插入(相当于整理,避免冲突后entry偏离原位置过远)
                        if (h != i) {
                            tab[i] = null;
                            while (tab[h] != null)
                                h = nextIndex(h, len);
                            tab[h] = e;
                        }
                    }
                }
                return i;
            }
    

    扩容

    和HashMap逻辑基本一致,扩容一倍并重新计算Hashslot

           private void resize() {
                Entry[] oldTab = table;
                int oldLen = oldTab.length;
                int newLen = oldLen * 2;
                Entry[] newTab = new Entry[newLen];
                int count = 0;
    
                for (int j = 0; j < oldLen; ++j) {
                    Entry e = oldTab[j];
                    if (e != null) {
                        ThreadLocal<?> k = e.get();
                        //Entry过期,直接清理
                        if (k == null) {
                            e.value = null;
                        } else {
                            int h = k.threadLocalHashCode & (newLen - 1);
                            while (newTab[h] != null)
                                h = nextIndex(h, newLen);
                            newTab[h] = e;
                            count++;
                        }
                    }
                }
    
                setThreshold(newLen);
                size = count;
                table = newTab;
            }
    

    相关文章

      网友评论

          本文标题:ThreadLocalMap源码解读

          本文链接:https://www.haomeiwen.com/subject/vlumaqtx.html