美文网首页
ThreadLocalMap源码解读

ThreadLocalMap源码解读

作者: CXYMichael | 来源:发表于2019-05-14 21:15 被阅读0次

关于ThreadLocal以及InheritedThreadLocal基本原理的介绍已经非常多,但是感觉threadlocal设计的精髓还是在于ThreadLocalMap,所以这里详细分析一下ThreadLocalMap的实现。

成员变量

整体的成员变量和HashMap差不多,主要不同就是扩容阈值是2/3而非0.75,Entry的key是WeakReference<ThreadLocal<?>>。

        //初始容量16
        private static final int INITIAL_CAPACITY = 16;

        //Entry数组
        private Entry[] table;

        //容量
        private int size = 0;

        //扩容阈值
        private int threshold; // Default to 0

        //扩容阈值固定为Entry数组长度的2/3,比HashMap要低,因为ThreadLocalMap使用开放定址-线性探测法处理hash冲突,Entry占用更快
        private void setThreshold(int len) {
            threshold = len * 2 / 3;
        }

初始化

        //首次调用ThreadLocal的set方法时,ThreadLocalMap会懒加载,存入一个元素,初始容量置1
        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }

        //Thread初始化,InheritedThreadLocal不为空时,会调用该构造函数创建子线程InheritedThreadLocalMap
        private ThreadLocalMap(ThreadLocalMap parentMap) {
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];

            for (int j = 0; j < len; j++) {
                Entry e = parentTable[j];
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        int h = key.threadLocalHashCode & (len - 1);
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }

查询、插入、删除

和HashMap的实现有很大不同,hashkey冲突时采用线性地址法而非链地址法,并且在所有可能的地方都做了过期Entry的回收工作,并对因为hash冲突而偏移的Entry进行整理。

        //这里分两种情况处理,一种是e不为空且key相等,直接返回结果,另一种调用getEntryAfterMiss
        private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            if (e != null && e.get() == key)
                return e;
            else
                return getEntryAfterMiss(key, i, e);
        }

        //如果在hash槽位没有找到该节点,说名不存在或者冲突后偏移了,因此需要向后顺序搜索,知道出现空槽位为止(空槽为说明后面不可能再存在了)。同时还会顺便做Entry的清理工作
        private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;

            while (e != null) {
                ThreadLocal<?> k = e.get();
                if (k == key)
                    return e;
                if (k == null)
                    expungeStaleEntry(i);
                else
                    i = nextIndex(i, len);
                e = tab[i];
            }
            return null;
        }

        //从hashcode位置开始寻找空位并调用replaceStaleEntry插入,或者遇到相同key则更新并直接return
        private void set(ThreadLocal<?> key, Object value) {

            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);

            for (Entry e = tab[i];
                 e != null;
                 //nextIndex向后唤醒搜索数组
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();

                if (k == key) {
                    e.value = value;
                    return;
                }
                //如果key为null,就用新key、value覆盖,同时清理旧数据
                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }
            //如果执行到这里,说明i位置是空的,并且需要直接插入
            tab[i] = new Entry(key, value);
            int sz = ++size;
            //进行清理,如果没有清理出空间,就判断是否需要扩容
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

        private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            Entry e;

            //由于GC会批量回收无效引用,所以set()的循环中发现一个过期槽位,就意味着这个key前面也可能出现了新的过期槽位,所以向前搜索并记录可能存在的可用槽位。这样可以增加空槽位的利用率,从而避免频繁触发rehash。
            int slotToExpunge = staleSlot;
            for (int i = prevIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null)
                    slotToExpunge = i;

            //向后搜索,直到null Entry或者有重复key
            for (int i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();

                //如果发现重复key,就和过期槽位做替换,从而维持hashtable的顺序性(每个entry离散列位置尽可能更近)
                if (k == key) {
                    e.value = value;

                    tab[i] = tab[staleSlot];
                    tab[staleSlot] = e;

                    // 从i位置或者slotToExpunge位置进行批量清理
                    if (slotToExpunge == staleSlot)
                        slotToExpunge = i;
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }

                // 如果既没有向前搜索到过期槽位,也没有向后搜索到重复key
                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;
            }

            // 如果没有找到重复key,就直接替换过期键
            tab[staleSlot].value = null;
            tab[staleSlot] = new Entry(key, value);

            // 如果有其它过期键,就进行批量清理操作
            if (slotToExpunge != staleSlot)
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
        }

        //移除操作,计算hashcode,然后从该位置向后遍历直到遇到null槽位,逐个比较key值是否匹配,如果匹配就调用Entry的clear方法,并从散列位置清理旧数据
        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                    e.clear();
                    expungeStaleEntry(i);
                    return;
                }
            }
        }

值得注意的是这里调用了key.threadLocalHashCode获取了ThreadLocal对象的hashcode,而其hashcode是通过静态AtomicInteger以HASH_INCREMENT步长递增生成的的。

    private final int threadLocalHashCode = nextHashCode();

    private static AtomicInteger nextHashCode =
        new AtomicInteger();

    private static final int HASH_INCREMENT = 0x61c88647;

    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

实际上这里使用的是斐波那契散列法,这是一种乘数散列法,只不过这个乘数比较特殊,是32位整型上限2^32-1乘以黄金分割比例0.618....的值2654435769,用有符号整型表示就是-1640531527,去掉符号后16进制表示为0x61c88647。使用0x61c88647为步长进行散列,结果会更均匀。

清理

清理过程是基于热度衰减的方式,从指定位置向后每搜索一位热度减半,发现过期键则重置热度,直到热度归0结束清理。Redis扫描回收过期键时也与此比较类似,不过是按比例进行采样回收,直到采样中过期键比例小于某个阈值。

        //以i为起点向后搜索log2(n)位,如果发现过期Entry则置n为len。换个说法就是先做小范围扫描,如果发现过期Entry就做大范围扫描,直到大范围搜索也找不到过期Entry为止
        private boolean cleanSomeSlots(int i, int n) {
            boolean removed = false;
            Entry[] tab = table;
            int len = tab.length;
            do {
                i = nextIndex(i, len);
                Entry e = tab[i];
                if (e != null && e.get() == null) {
                    n = len;
                    removed = true;
                    i = expungeStaleEntry(i);
                }
            } while ( (n >>>= 1) != 0);
            return removed;
        }

        private int expungeStaleEntry(int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;

            //回收staleSlot槽位
            tab[staleSlot].value = null;
            tab[staleSlot] = null;
            size--;

            //对后续位置rehash,直到遇到null槽位
            Entry e;
            int i;
            for (i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();
                //如果可以清理,就清理
                if (k == null) {
                    e.value = null;
                    tab[i] = null;
                    size--;
                } else {
                    //计算hashSlot位置
                    int h = k.threadLocalHashCode & (len - 1);
                    //如果entry没有在hashSlot位置,则从hashSlot开始搜索空槽位并插入(相当于整理,避免冲突后entry偏离原位置过远)
                    if (h != i) {
                        tab[i] = null;
                        while (tab[h] != null)
                            h = nextIndex(h, len);
                        tab[h] = e;
                    }
                }
            }
            return i;
        }

扩容

和HashMap逻辑基本一致,扩容一倍并重新计算Hashslot

       private void resize() {
            Entry[] oldTab = table;
            int oldLen = oldTab.length;
            int newLen = oldLen * 2;
            Entry[] newTab = new Entry[newLen];
            int count = 0;

            for (int j = 0; j < oldLen; ++j) {
                Entry e = oldTab[j];
                if (e != null) {
                    ThreadLocal<?> k = e.get();
                    //Entry过期,直接清理
                    if (k == null) {
                        e.value = null;
                    } else {
                        int h = k.threadLocalHashCode & (newLen - 1);
                        while (newTab[h] != null)
                            h = nextIndex(h, newLen);
                        newTab[h] = e;
                        count++;
                    }
                }
            }

            setThreshold(newLen);
            size = count;
            table = newTab;
        }

相关文章

网友评论

      本文标题:ThreadLocalMap源码解读

      本文链接:https://www.haomeiwen.com/subject/vlumaqtx.html