美文网首页死磕源码
死磕源码系列 - ThreadLocal

死磕源码系列 - ThreadLocal

作者: sunyelw | 来源:发表于2020-05-09 23:20 被阅读0次

    关于 ThreadLocal 先问几个问题

    1. ThreadLocal 基于什么场景下使用?
    2. ThreadLocal 为什么是弱引用?
    3. ThreadLocal 一定会发生内存泄漏吗?
    4. ThreadLocal 的底层是如何实现的(细节)

    针对上面问题来尝试逐一分析。


    一、概述

    每个线程类 Thread 都有维护两个 ThreadLocal.ThreadLocalMap 类型的属性

    public class Thread implements Runnable {
        /* ThreadLocal values pertaining to this thread. This map is maintained
         * by the ThreadLocal class. */
        ThreadLocal.ThreadLocalMap threadLocals = null;
    
        /*
         * InheritableThreadLocal values pertaining to this thread. This map is
         * maintained by the InheritableThreadLocal class.
         */
        ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    }
    

    ThreadLocalMap 是 ThreadLocal 的内部类,是一个键值对组成的集合,类似 HashMap,但具体实现有很大差别。

    注释写道 threadLocals 属性被 ThreadLocal 类所使用。

    • ThreadLocal 基于什么场景下使用?

    正因为 ThreadLocal 归属于线程内部,所以其天然线程安全,而只要在线程内部就可以随时取得 ThreadLocal 内部的值,为方法的设计提供了极大便利,比如一个层层透传的方法不会因为要添加某个参数而要全部重构,只需要初始调用处添加一个 ThreadLocal 将参数塞进去,真正使用的地方再取出来。

    那么我觉得两种场景可以考虑使用 ThreadLocal

    1. 线程安全的数据处理
    2. 改动最小的数据传递

    二、ThreadLocal 的几个常用方法简析

    1. get

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }
    // 获取 ThreadLocalMap 实例
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }
    
    • setInitialValue 可以设置初始值,默认为null,支持子类重写
    • getMap(Thread) 方法获取 ThreadLocalMap 实例 threadLocals,拿到之后取值

    2. set

    private void set(ThreadLocal<?> key, Object value) {
    
        Entry[] tab = table;
        int len = tab.length;
        int i = key.threadLocalHashCode & (len-1);
    
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            ThreadLocal<?> k = e.get();
    
            if (k == key) {
                e.value = value;
                return;
            }
    
            if (k == null) {
                replaceStaleEntry(key, value, i);
                return;
            }
        }
    
        tab[i] = new Entry(key, value);
        int sz = ++size;
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            rehash();
    }
    
    • for循环里面是hash冲突算法,每有冲突就往后一位继续查询直到找到空的Entry,这就是开放定址法了
    • 循环里面还会做两件事,如果key相同,直接覆盖赋值返回;如果key为null,则调用 replaceStaleEntry 方法
    • 最后判断是否需要rehash
    • replaceStaleEntry、cleanSomeSlots、rehash 方法后面细讲

    3. remove

    public void remove() {
        ThreadLocalMap m = getMap(Thread.currentThread());
        if (m != null)
            m.remove(this);
    }
    
    private void remove(ThreadLocal<?> key) {
        Entry[] tab = table;
        int len = tab.length;
        int i = key.threadLocalHashCode & (len-1);
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            if (e.get() == key) {
                e.clear();
                expungeStaleEntry(i);
                return;
            }
        }
    }
    
    • 移除当前实例
    • 并且尝试往后遍历移除过期Entry

    可以看到所有对ThreadLocal的操作都落在了 ThreadLocalMap 的一个或几个方法上,下面继续看下 ThreadLocalMap 的实现。

    小结

    1. 每个 Thread 实例都维护一个 ThreadLocalMap 类型属性 threadLocals 用于存储 ThreadLocal
    2. 由于 threadLocals 是个Map,每个线程可以有多个 ThreadLocal
    3. 将目标对象存入 ThreadLocal 时,其实是以 ThreadLocal 实例为key、目标对象为value存入当前线程实例的 threadLocals 中
    4. 首次塞值会执行 threadLocals 的初始化
    5. 通过源码解析我们会发现一个有趣的现象,对比熟悉的 HashMap 实现,ThreadLocalMap 的set、get、remove方法并不是单纯地对底层数据结构进行简单的数据操作,而是会有几率地去触发一些额外的小动作,比如set时的replaceStaleEntry和cleanSomeSlots、get时的expungeStaleEntry,这些操作需要结合 ThreadLocalMap 的实现一起分析才能了解其用意,后文详叙。

    三、为什么弱应用

    了解了 ThreadLocal 与 ThreadLocalMap 的关系,先来看下 ThreadLocal 的内存分布图

    ThreadLocal 内存分布

    注意 ThreadLocal 与 key 之间的那根虚线,这就是弱引用,实线都是强引用。

    关于强软弱虚四种引用都是针对内存敏感的,不同类型的引用GC时机不一样,需要知道的是:如果一个对象只具有弱引用,那么垃圾回收器在扫描到该对象时,无论内存充足与否,都会回收该对象的内存

    我们来回答下开篇的两个问题

    • ThreadLocal 为什么是弱引用?

    假设如果 ThreadLocal 是个强引用,也就是虚线换实线会发生什么?

    我们现在有段代码

    // 创建
    private ThreadLocal<String> th1 = new ThreadLocal <>();
    
    public void one() {
        // 设置
        th1.set("hello world");
        ...
        // 使用
        log.error(th1.get());
        ...
    }
    
    1. 第一次请求,假设落在第零个槽点上,内存分布如下


      first request
    1. 第二次请求,假设落在第一个槽点上,内存分布如下


      second request

    这种情况下 ThreadLocalMap 的 key1 一直指向一个没有其他引用的 ThreadLocal,而这 ThreadLocal 实际已经完成了其生命周期,只是因为 key1 的这个强引用无法释放内存,这样就造成了内存泄漏。

    PS: 因为这种写法每次都会创建一个新的 ThreadLocal 实例,而实际上这些实例基本是等同的,就存在资源浪费。这里有一个解决方法就是设置 ThreadLocal 为 static 的,这样只要所属类(称为TSO,thread special object)没有被回收,每次都可以共用同一个 ThreadLocal 实例,还可以避免上面的情况。

    static ThreadLocal 的详细说明可以参考 将ThreadLocal变量设置为private static的好处是啥

    结论:基于 ThreadLocal 的这种设计,使用不当是会造成内存泄漏的。若使用者在使用过程中忘了甚至不知道需要手动调用 remove 方法时,ThreadLocal 本身做了一些的优化可以在一定程度上避免过多的内存泄漏。

    自身的优化分两步处理

    1. ThreadLocal 设置为弱引用,每当栈内引用失效就会使 ThreadLocal 被回收,导致 ThreadLocalMap 中存在 key 为 null 的 Entry
    2. 在对 ThreadLocal 的操作中都会有清除这些 key 为 null 的 Entry 的动作

    这种 key 为 null 的 Entry 在源码中被称为 stale (过期的)

    • ThreadLocal 一定会发生内存泄漏吗?

    由于 ThreadLocal 弱引用的设计使得 ThreadLocal 本身不存在内存泄漏,但是这里有个问题就是弱引用仅仅针对 key 而不是整个 Entry,这样设计的结果就是 ThreadLocalMap 中会存在过期 Entry,这些 Entry 如果不能得到有效的回收也是会造成内存泄漏的。

    注意

    1. 如果整个线程执行完任务马上回收,即非线程池模式,那么线程结束后整个 ThreadLocalMap 都会被回收,自然就不存在长久性的内存泄漏,只能说从 ThreadLocal 结束到线程结束之间存在内存泄漏
    2. 如果是线程池模式,那么要分两种情况(以下线程均指核心线程,而非溢出线程)
    • 线程池内线程会继续使用 ThreadLocal
      也就是会调用 set、get 等方法,在这些方法内部会有尝试性清除部分过期 Entry(注意不是全部的过期 Entry)的动作,是有机会将内存泄漏控制在一个可以接收的范围的。
    • 创建 ThreadLocal 后不再使用,ThreadLocalMap 这块内存创建后就不再访问了,而线程一直存活导致的强引用无法回收,这就发生了内存泄漏。

    而通常情况下我们碰到的使用ThreadLocal的线程基本都是线程池,比如spring mvc 的线程池使用 ThreadLocal 存储当前登录用户信息,所以推荐做法是在使用结束后主动调用 remove 方法。(PS:最好是在finally块中调用)

    四、ThreadLocalMap

    1. 基础属性

    ThreadLocalMap 首先是一个Map,先看一些属性

    // 初始容量
    private static final int INITIAL_CAPACITY = 16;
    // 底层 Entry 数组
    private Entry[] table;
    // 初始大小
    private int size = 0;
    // 初始阈值
    private int threshold; // Default to 0
    // 设置阈值
    private void setThreshold(int len) { threshold = len * 2 / 3; }
    

    再看下 Entry 结构

    // Entry 结构
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;
    
        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    

    Entry弱引用持有key,这个key就是 ThreadLocal

    • 初始化
    // 塞值
    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
    
    // 创建 ThreadLocalMap
    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
    
    // 初始化 ThreadLocalMap
    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
        // 初始容量 16
        table = new Entry[INITIAL_CAPACITY];
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
        table[i] = new Entry(firstKey, firstValue);
        // size 为1
        size = 1;
        setThreshold(INITIAL_CAPACITY);
    }
    

    按照上面调用链可以知道

    • 初始容量为 16
    • 阈值为 len * 2 / 3

    2. hash取值

    // 每个 ThreadLocal 的hash值 final修饰无法修改
    private final int threadLocalHashCode = nextHashCode();
    // static 修饰的原子类,所有 ThreadLocal 共用
    private static AtomicInteger nextHashCode = new AtomicInteger();
    // 魔术数字 1640531527,均匀分布
    private static final int HASH_INCREMENT = 0x61c88647;
    // CAS添加 0x61c88647
    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }
    

    这里是个魔术数字 1640531527,主要是结合其开放定址法来减少hash冲突。

    这篇博客中有对这个数字的随机分布做的实验 ThreadLocal 细节实现分析

    另外,还记得 HashMap 的 hash 值是怎么算的吗

    static final int hash(Object key) {
        int h;
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
    }
    

    用 key 的 hashCode 高16位与低16位进行异或,目的也是减少 hash 碰撞,只不过HashMap 使用的 hash 算法是链表法,这里我们就知道了两种不同的哈希实践。

    • 链表法 + 高低异或 (代表作 HashMap)
    • 开放定址法 + 魔术字 0x61c88647 (代表作 ThreadLocalMap)

    3. hash冲突解决方式

    只要是哈希结构的数据结构都不可避免的需要处理 hash 冲突,HashMap 的 hash 值是32 位,哪怕你的 hash 算法实现如何完美,数据情况如何可观,只要我超过2^32的数据塞进去就一定存在 hash 冲突。

    那我们需要考虑的是不过多损耗哈希结构高效性的前提下处理这种冲突,HashMap 用的链表法,ThreadLocalMap 用的开放定址法。

    看一段set代码

    int i = key.threadLocalHashCode & (len-1);
    for (Entry e = tab[i];
     e != null;
     e = tab[i = nextIndex(i, len)]) {
    ...
    }
    
    tab[i] = new Entry(key, value);
    int sz = ++size;
    

    for 循环里面是为了找到传入 ThreadLocal 实例作为key时在 tab 数组中的下标,发现如果直接运算得到的下标位置上有值则调用 nextIndex 方法作为下一个下标位置继续寻找直到找到数据为空的那个下标然后将值塞进去,看下 nextIndex 方法实现

    private static int nextIndex(int i, int len) {
        return ((i + 1 < len) ? i + 1 : 0);
    }
    

    就是将数组头尾相连顺时针加一的往后找,这就是开放定址法了。

    4. 扩容

    HashMap 有个负载因子(loadFactor)的概念,实际大小与数组长度的比值大于 loadFactor 就需要进行扩容。
    ThreadLocalMap 没有负载因子的概念,它直接用阈值 threshold 来控制扩容时机,我们把 threshold 相关的代码都列出来

    private int threshold; // Default to 0
    
    /**
     * Set the resize threshold to maintain at worst a 2/3 load factor.
     */
    private void setThreshold(int len) {
        threshold = len * 2 / 3;
    }
    
    /**
     * Re-pack and/or re-size the table. First scan the entire
     * table removing stale entries. If this doesn't sufficiently
     * shrink the size of the table, double the table size.
     */
    private void rehash() {
        expungeStaleEntries();
    
        // Use lower threshold for doubling to avoid hysteresis
        if (size >= threshold - threshold / 4)
            resize();
    }
    
    /**
     * Double the capacity of the table.
     */
    private void resize() {
        /* 具体扩容细节 开始 */
        ...
        /* 具体扩容细节 结束 */
        // 扩容后设置阈值
        setThreshold(newLen);
    }
    

    扩容的具体方法就是 resize 方法,类中只有 rehash 时调用了。
    rehash 的方法注释写了:调整数组tab的大小(这里指调用expungeStaleEntries方法来清除过期entry),如果调整后仍不能缩小tab大小则将tab大小加倍

    ThreadLocalMap 扩容是一次扩一倍,HashMap的扩容呢?(1.7是1.5倍,1.8是2倍)

    expungeStaleEntries方法后文再说,这里扩容的条件就很简单了

    size >= threshold - threshold / 4
    

    如果当前数据大小已经超过了 threshold - threshold / 4 那么就要扩容,而 setThreshold 方法只有在两处初始化与一处扩容时调用,大小始终是tab长度 len 的 2/3

    threshold = len * 2 / 3
    

    那么扩容的条件就是

    size > threshold - threshold / 4 
          = threshold * 3 / 4 
          = ( len * 2 / 3 ) * 3 / 4 
          = len / 2
    

    原来只要容量达到了数组长度的一半就要扩容。

    我们知道 HashMap 的默认负载因子为 0.75,那这里如果有负载因子的概念就是 0.5,之所以比 HashMap 的小三分之一还是因为开放定址法的hash冲突概率比链表法要大,所以这个负载因子不能大,大了就要频繁处理 hash 冲突会增加时间复杂度。


    下面是重头戏 - ThreadLocal 的实现细节

    5. expungeStaleEntry方法解析

    先贴下源码,然后我们一段一段解析

    /**
     * Expunge a stale entry by rehashing any possibly colliding entries
     * lying between staleSlot and the next null slot.  This also expunges
     * any other stale entries encountered before the trailing null.  See
     * Knuth, Section 6.4
     *
     * @param staleSlot index of slot known to have null key
     * @return the index of the next null slot after staleSlot
     * (all between staleSlot and this slot will have been checked
     * for expunging).
     */
    private int expungeStaleEntry(int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
    
        // expunge entry at staleSlot
        tab[staleSlot].value = null;
        tab[staleSlot] = null;
        size--;
    
        // Rehash until we encounter null
        Entry e;
        int i;
        for (i = nextIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();
            if (k == null) {
                e.value = null;
                tab[i] = null;
                size--;
            } else {
                int h = k.threadLocalHashCode & (len - 1);
                if (h != i) {
                    tab[i] = null;
    
                    // Unlike Knuth 6.4 Algorithm R, we must scan until
                    // null because multiple entries could have been stale.
                    while (tab[h] != null)
                        h = nextIndex(h, len);
                    tab[h] = e;
                }
            }
        }
        return i;
    }
    

    首先我们需要明确这个方法是做什么的

    • 清除过期 Entry,这个操作下面两行就已经做完了
    tab[staleSlot].value = null;
    tab[staleSlot] = null;
    
    • 还要执行一次 Rehash until we encounter null,看下具体逻辑
    1. 循环条件
      就是从当前过期槽点开始往后找,直到 Entry 为 null
    2. 循环体
      2.1 如果 Entry 的 key 为 null,则表示此 Entry 已过期,回收整个 Entry
      2.2 如果 Entry 的 key 不为 null,那么判断是否需要进行一次 rehash

    这段有点复杂,对着代码来看下

    int h = k.threadLocalHashCode & (len - 1);
    if (h != i) {
        tab[i] = null;
    
        // Unlike Knuth 6.4 Algorithm R, we must scan until
        // null because multiple entries could have been stale.
        while (tab[h] != null)
            h = nextIndex(h, len);
        tab[h] = e;
    }
    

    h是当前 Entry 在 tab 数组中第一次计算得到的下标
    i是当前 Entry 在 tab 数组中的实际下标
    注意:以开放定址法的逻辑来看,h 是要 "小于" i 的,所以后面重新定址也是从 hi 的顺序遍历。
    这俩不相等意味着当前 Entry 塞入 tab 数组时发生了 hash 碰撞(也就是插入时 tab[h] != null),而前面我们可能回收了一些过期 Entry 将 tab[h] 到 tab[i] 之间的某些下标空了出来,所以需要尝试地将当前 Entry 往“前”挪一挪位置,使后续get定位时可以更快点(这里查找的总体时间复杂度的量级还是没变,变的只是常量或系数),有那么点拨乱反正的意思。

    最差的结果就是 [h, i] 之间没有找到空槽,重新将 e 赋值给 tab[i]。

    1. 方法返回

    这个方法最后返回的是 i,而循环结束的条件是 (e = tab[i]) == null,所以返回的是过期槽点往后找到的第一个Entry为null的下标。

    1. 总结
    • 清除当前过期 Entry
    • 以当前 Entry 往后查找,清除过期Entry,并尝试 rehash 非空 Entry
    • 返回的给定过期槽点往后找到的第一个 Entry 为 null 的下标

    6. cleanSomeSlots方法解析

    这方法名字,清除一些槽?

    /**
     * Heuristically scan some cells looking for stale entries.
     * This is invoked when either a new element is added, or
     * another stale one has been expunged. It performs a
     * logarithmic number of scans, as a balance between no
     * scanning (fast but retains garbage) and a number of scans
     * proportional to number of elements, that would find all
     * garbage but would cause some insertions to take O(n) time.
     *
     * @param i a position known NOT to hold a stale entry. The
     * scan starts at the element after i.
     *
     * @param n scan control: {@code log2(n)} cells are scanned,
     * unless a stale entry is found, in which case
     * {@code log2(table.length)-1} additional cells are scanned.
     * When called from insertions, this parameter is the number
     * of elements, but when from replaceStaleEntry, it is the
     * table length. (Note: all this could be changed to be either
     * more or less aggressive by weighting n instead of just
     * using straight log n. But this version is simple, fast, and
     * seems to work well.)
     *
     * @return true if any stale entries have been removed.
     */
    private boolean cleanSomeSlots(int i, int n) {
        boolean removed = false;
        Entry[] tab = table;
        int len = tab.length;
        do {
            i = nextIndex(i, len);
            Entry e = tab[i];
            if (e != null && e.get() == null) {
                n = len;
                removed = true;
                i = expungeStaleEntry(i);
            }
        } while ( (n >>>= 1) != 0);
        return removed;
    }
    

    先看下方法注释中的一句:
    添加新元素或删除另一过时的元素时将调用此方法。 它执行对数扫描,作为无扫描(快速但保留垃圾)和与元素数量成比例的扫描数量(发现所有垃圾,但会导致某些插入花费O(N) 时间)之间的平衡。

    • 调用时机为新增或删除过期元素时调用
    • 对数扫描的意思是扫描次数为对数次,即时间复杂度为O(logN)级别(可以类比二分查找算法,每次除以二直到为0)
    • 以一种平衡的扫描方式来清除过期 Entry
    • 如果扫描过程中没有发现一个过期 Entry,则会扫描 log2(n) 个元素
    • 第二个参数 n, 插入时调用是当前包含元素个数, 而 replaceStaleEntry 调用时是数组长度

    最后一句没怎么看明白

    all this could be changed to be either more or less aggressive 
    by weighting n instead of just using straight log n. 
    But this version is simple, fast, and seems to work well.
    

    最后的结论是当前这种方式简单、快速并且看上去运行良好。

    代码实现其实挺简单

    1. 传入的i是一个已知非过期的 Entry,所以扫描是从i的下一个下标开始的
    2. 判断是否过期,执行log2(N) 次没有发现过期元素时结束,返回 false
    3. 如果扫描过程中存在过期元素,那么最后会返回 true,并且每发现一次都会
      3.1 将 n 置为数组长度继续循环
      3.2 调用 expungeStaleEntry 方法

    7. resize方法解析

    两倍扩容

    /**
     * Double the capacity of the table.
     */
    private void resize() {
        Entry[] oldTab = table;
        int oldLen = oldTab.length;
        int newLen = oldLen * 2;
        Entry[] newTab = new Entry[newLen];
        int count = 0;
    
        for (int j = 0; j < oldLen; ++j) {
            Entry e = oldTab[j];
            if (e != null) {
                ThreadLocal<?> k = e.get();
                if (k == null) {
                    e.value = null; // Help the GC
                } else {
                    int h = k.threadLocalHashCode & (newLen - 1);
                    while (newTab[h] != null)
                        h = nextIndex(h, newLen);
                    newTab[h] = e;
                    count++;
                }
            }
        }
    
        setThreshold(newLen);
        size = count;
        table = newTab;
    }
    
    1. 新数组长度为原数组两倍
    2. 扩容期间会回收过期 Entry
    3. 遍历旧数组,计算在新数组中的下标并放入
    4. 设置阈值、容量

    看过 hashmap 实现的应该要想起 hashmap 扩容时的头插法(死链)与尾插法(维持数据顺序)

    8. replaceStaleEntry 方法解析

    这个方法应该是 ThreadLocal 类中最复杂的方法了,难顶~~

    /**
     * Replace a stale entry encountered during a set operation
     * with an entry for the specified key.  The value passed in
     * the value parameter is stored in the entry, whether or not
     * an entry already exists for the specified key.
     *
     * As a side effect, this method expunges all stale entries in the
     * "run" containing the stale entry.  (A run is a sequence of entries
     * between two null slots.)
     *
     * @param  key the key
     * @param  value the value to be associated with key
     * @param  staleSlot index of the first stale entry encountered while
     *         searching for key.
     */
    private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                   int staleSlot) {
        Entry[] tab = table;
        int len = tab.length;
        Entry e;
    
        // Back up to check for prior stale entry in current run.
        // We clean out whole runs at a time to avoid continual
        // incremental rehashing due to garbage collector freeing
        // up refs in bunches (i.e., whenever the collector runs).
        int slotToExpunge = staleSlot;
        for (int i = prevIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = prevIndex(i, len))
            if (e.get() == null)
                slotToExpunge = i;
    
        // Find either the key or trailing null slot of run, whichever
        // occurs first
        for (int i = nextIndex(staleSlot, len);
             (e = tab[i]) != null;
             i = nextIndex(i, len)) {
            ThreadLocal<?> k = e.get();
    
            // If we find key, then we need to swap it
            // with the stale entry to maintain hash table order.
            // The newly stale slot, or any other stale slot
            // encountered above it, can then be sent to expungeStaleEntry
            // to remove or rehash all of the other entries in run.
            if (k == key) {
                e.value = value;
    
                tab[i] = tab[staleSlot];
                tab[staleSlot] = e;
    
                // Start expunge at preceding stale entry if it exists
                if (slotToExpunge == staleSlot)
                    slotToExpunge = i;
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                return;
            }
    
            // If we didn't find stale entry on backward scan, the
            // first stale entry seen while scanning for key is the
            // first still present in the run.
            if (k == null && slotToExpunge == staleSlot)
                slotToExpunge = i;
        }
    
        // If key not found, put new entry in stale slot
        tab[staleSlot].value = null;
        tab[staleSlot] = new Entry(key, value);
    
        // If there are any other stale entries in run, expunge them
        if (slotToExpunge != staleSlot)
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
    }
    

    首先看下这个方法的唯一调用是在set方法中,伪代码如下

    private void set(ThreadLocal<?> key, Object value) {
        int i = key.threadLocalHashCode & (len-1);
        if (tab[i] != null && tab[i].get() == null) {
            replaceStaleEntry(key, value, i);
            ...
        }
    }
    

    也就是说塞值时碰到 hash 冲突,然后在定址过程中碰到的第一个过期 Entry 会调用 replaceStaleEntry 方法并结束此次塞值,三个参数分别是

    • key 传入 ThreadLocal
    • value 传入对象
    • i hash 冲突定址时碰到的第一个过期Entry的下标

    介于这个方法的复杂程度,我们还是先把这个方法的注释仔细过一遍

    Replace a stale entry encountered during a set operation with an entry for the specified key. The value passed in the value parameter is stored in the entry, whether or not an entry already exists for the specified key.

    在设值过程中,使用给定 key 的 Entry 替换碰到的第一个过期 Entry。不管指定 key 是否存在,替换 Entry 的 value 都使用入参的 value 参数。

    As a side effect, this method expunges all stale entries in the "run" containing the stale entry. (A run is a sequence of entries between two null slots.)

    同时,这个方法会清除“run”中包含的全部过期 Entry。(“run” 就是两个空槽之间的一系列 Entry)

    简单总结一下注释:

    1. 此方法中有一个名词叫 “run”,这个“run”代表一个 Entry 序列,此序列的前后两个 Entry 都是 null,也就是说“run”是两个空槽之间的序列


      run

    看到这里,expungeStaleEntry 方法不就是处理了一整个 run 中的过期 Entry 吗?传入的过期 Entry 被清除了,这就是 run 左边边界,然后往右遍历,结束条件是碰到了下一个 Null Entry,这不就是 run 右边边界吗?所以 expungeStaleEntry 方法实际是对以当前过期 Entry 为起点的 run 中所有 Entry 进行了清除过期 Entry 与 rehash 非过期 Entry 两步操作。(多读两遍~~)

    1. 这个方法会找到包含传入过期 Entry 的一个"run",然后进行一些操作

    这里要分清楚空槽(null slot)与过期 Entry (stale entry)的区别

    null VS stale

    下面看下实现

    1.先找 run “左边”第一个过期的 Entry

    // If we find key, then we need to swap it
    // with the stale entry to maintain hash table order.
    // The newly stale slot, or any other stale slot
    // encountered above it, can then be sent to expungeStaleEntry
    // to remove or rehash all of the other entries in run.
    int slotToExpunge = staleSlot;
    for (int i = prevIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = prevIndex(i, len))
        if (e.get() == null)
            slotToExpunge = i;
    
    • 往"左边"遍历,设置 slotToExpunge 表示当前 run 中左边开始第一个过期 Entry 的下标
    • 注释
      Back up to check for prior stale entry in current run. We clean out whole runs at a time to avoid continual incremental rehashing due to garbage collector freeing up refs in bunches (i.e., whenever the collector runs).
      往前检查当前"run"中是否有过期的 Entry。 我们会一次清理整个 "run",以避免由于垃圾收集器释放成堆的引用(即每当收集器运行时)而导致的连续增量重新哈希。

    2. 找完了左边要找右边的了

    // 1. 当前过期槽点往后遍历,直到当前 run 的边界
    for (int i = nextIndex(staleSlot, len);
         (e = tab[i]) != null;
         i = nextIndex(i, len)) {
        ThreadLocal<?> k = e.get();
    
        // 2 找到了目标 key
        if (k == key) {
            // 2.1 设置 value值
            e.value = value;
    
            // 2.2 使用目标 value 替换过期 Entry,tab[i] 此时代表一个过期 Entry
            tab[i] = tab[staleSlot];
            tab[staleSlot] = e;
    
            // 2.3 如果 slotToExpunge 与 staleSlot 相等, 说明在当前run中, 过期槽点左边的Entry不存在过期的
            // 这时就可以从右边第一个过期槽点开始清理操作, 也就是 i 了
            if (slotToExpunge == staleSlot)
                slotToExpunge = i;
            // 2.4 以 slotToExpunge 执行 expungeStaleEntry 返回的是 run 的右边边界 Entry (即第一个为null 的 Entry 下标)
            // 2.5 再执行 cleanSomeSlots 回收
            cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            return;
        }
    
        // 3 如果 key 为 null 而左边又没有找到则替换左边过期Entry下标
        // 需要注意的是, 这个替换只会执行一次, 也就是 slotToExpunge == staleSlot 只会满足一次
        if (k == null && slotToExpunge == staleSlot)
            slotToExpunge = i;
    }
    

    这里再重复一遍 slotToExpunge 的含义:左边开始第一个过期 Entry 的下标

    流程

    1. 当前过期槽点往后遍历,直到当前 run 的右边边界
    2. 找到了目标 key
      2.1 设置 value 值
      2.2 使用目标 value 替换过期 Entry,tab[i] 此时代表一个过期 Entry
      2.3 如果 slotToExpunge 与 staleSlot 相等, 说明在当前 run 中, 过期槽点左边的 Entry 不存在过期的,这时就可以从右边第一个过期槽点开始清理操作, 也就是 i 了
      2.4 以 slotToExpunge 执行 expungeStaleEntry 返回的是 run 的右边边界 Entry (即第一个为null 的 Entry 下标)
      2.5 再执行 cleanSomeSlots 回收
    3. 如果 key 为 null 而左边又没有找到过期 Entry 则替换左边过期 Entry 下标。需要注意的是, 这个替换只会执行一次, 也就是 slotToExpunge == staleSlot 只会满足一次

    3. 没有找到 key 的情况下就直接使用过期 Entry 作为新的 Entry

    tab[staleSlot].value = null;
    tab[staleSlot] = new Entry(key, value);
    

    这里有个判断 slotToExpunge != staleSlot,我们发现只有 slotToExpunge 值从头到尾没有动过才满足,因为其初始值就是 staleSlot,需要满足两个条件

    • 当前 run 中 staleSlot 左边没有过期 Entry
    • 当前 run 中 staleSlot 右边没有过期 Entry

    总结一下就是整个 run 中只有一个 staleSlot 是过期的,那么已经被赋予了当前 K、V 这里就没有需要清除的过期 Entry 了。

    那么如果满足条件则存在除 staleSlot 以外的过期 Entry,则以 slotToExpunge 开始往右边进行清理。

    小结

    1. replaceStaleEntry 方法中有一个 run 的概念要理清楚
    2. 在整个方法过程中会清理掉整个 run 中的所有过期 Entry,而不是整个 tab
    3. slotToExpunge 始终代表当前 run 中的第一个 过期 Entry,想明白这点就能理解其每次赋值操作所需满足的条件

    9. 再谈 set 方法

    前面谈 set 方法还有最后一部分没讲,执行到最后这部分有两种情况

    • 第一次定址直接就是 null
    • 第一次定址直接不是 null ,而遍历找到下一个 null Entry 之前没有碰到过期 Entry
    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
    
    private void rehash() {
        expungeStaleEntries();
    
        // Use lower threshold for doubling to avoid hysteresis
        if (size >= threshold - threshold / 4)
            resize();
    }
    

    不管哪种情况,都需要新增一个 Entry,新增就可能导致容量不够需要扩容,这里在实际扩容之前先执行一波回收,如果有回收成功的动作则不用扩容,很简单这里只新增了一个 Entry,而回收则是一个或一个以上。

    还有一个 sz >= threshold 条件需要满足才能进入 rehash 方法,这只是一个初始条件,注意这里的 threshold 是 len * 2 / 3 (Set the resize threshold to maintain at worst a 2/3 load factor.)

    然后 rehash 方法中会先调用 expungeStaleEntries 方法,这个方法就是循环整个数组,逐个调用 expungeStaleEntry 来清除过期 Entry 并尝试 rehash 非空 Entry

    只有这里是清理整个 ThreadLocalMap 中全部的过期 Entry,其他的两个都是清理当前 run 中的过期 Entry

    整体清理后的判断就不是直接与 threshold 比了,而是threshold - threshold / 4(Use lower threshold for doubling to avoid hysteresis,用更低的阈值来判断是否加倍,避免数据过多(数组太小)导致的插入凝滞( hash 冲突概率增加)),也就是数组长度的一半,也就是说如果全部清理过一遍后的 Entry 个数已经超过了数组长度的一半才会进行扩容。

    10. 总结

    1. ThreadLocal 的操作实际都转换为了 Thread 对象内部的 ThreadLocalMap 的方法调用,这个 ThreadLocalMap 是 ThreadLocal 的内部类,属于 K-V 型数据结构,其中 K 是指向 ThreadLocal 的弱引用,V 是存入的对象
    2. ThreadLocal 天然线程安全,并且可以在线程人任意地方进行存取
    3. ThreadLocalMap 采用的开放定址法来处理 hash 冲突,其 hash 值的取值用了一个魔术数字 0x61c88647,以减少 hash 冲突
    4. ThreadLocalMap 中有个很重要的概念是 run,表示一段 Entry 的序列,此序列中没有 Null Entry(可能有 Stale Entry),而此序列的前后两个 Entry 均为 null
    5. ThreadLocalMap 中最重要的两个方法是 expungeStaleEntry 和 replaceStaleEntry,这俩方法都可以清除当前 run 中的全部过期 Entry

    如切如磋,如琢如磨。

    相关文章

      网友评论

        本文标题:死磕源码系列 - ThreadLocal

        本文链接:https://www.haomeiwen.com/subject/mnhaghtx.html