美文网首页
FastThreadLocal源码分析

FastThreadLocal源码分析

作者: 进击的蚂蚁zzzliu | 来源:发表于2021-01-25 10:23 被阅读0次

    概述

    Netty源码分析时经常看到 FastThreadLocal 的身影,Netty 作为高性能的网络通信框架,FastThreadLocal 是比 JDK 自身的 ThreadLocal 性能更高的通信框架。FastThreadLocal 到底比 ThreadLocal 快在哪里呢?

    一、JDK ThreadLocal

    ThreadLocal.png(勘误:图中虚线是弱引用不是虚引用)

    源码分析参考上一节,这里只做几点总结:

    • 每个Thread有一个ThreadLocalMap
    • ThreadLocal只是一个媒介,连接Thread与ThreadLocalMap
    • ThreadLocalMap数组中每个节点是一个Entry(ThreadLocal -> value)
    • 一个Thread可以有多个ThreadLocal但是只有一个ThreadLocalMap
    • threadLocal.set时是把当前Thread的当前threadLocal作为key构造Entry放到ThreadLocalMap
    • 当一个Thread有多个ThreadLocal时ThreadLocalMap数组中就有可能会出现冲突
    • threadLocal.get时是从当前Thread的ThreadLocalMap中获取

    内存泄漏
    Entry 的 key 设计成了弱引用,但是当 ThreadLocal 不再使用被 GC 回收后,ThreadLocalMap 中可能出现 Entry 的 key 为 NULL,那么 Entry 的 value 一直会强引用数据而得不到释放,只能等待线程销毁。那么应该如何避免 ThreadLocalMap 内存泄漏呢?ThreadLocal 已经帮助我们做了一定的保护措施,在执行 ThreadLocal.set()/get() 方法时,ThreadLocal 会清除 ThreadLocalMap 中 key 为 NULL 的 Entry 对象,让它还能够被 GC 回收。除此之外,当线程中某个 ThreadLocal 对象不再使用时,立即调用 remove() 方法删除 Entry 对象。如果是在异常的场景中,记得在 finally 代码块中进行清理,保持良好的编码意识。

    二、Netty FastThreadLocal

    FastThreadLocal.png
    对比ThreadLocal结构图可以看出几点不同:
    1.InternalThreadLocalMap中并不是Entry的key-value结构,直接就是一个Object数组;其中索引0位置存放FastThreadLocal的Set集合,其他索引位置初始化为UNSET,有数据存入时更新为具体的Object;
    2.FastThreadLocal中包含一个自增的index表示在InternalThreadLocalMap的数组中的索引位置;这样就不用像JDK ThreadLocal一样通过拉链法解决冲突;
    3.Set<FastThreadLocal<?>>结构中存放FastThreadLocal的引用,就不用像JDK ThreadLocal中通过Entry key的虚引用指向堆种FastThreadLocal对象,更容易解决内存泄漏的问题;

    三、FastThreadLocal源码分析

    首先看下 FastThreadLocal.set() 的源码:

    public final void set(V value) {
        if (value != InternalThreadLocalMap.UNSET) { // 1. value 是否为缺省值
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 2. 获取当前线程的 InternalThreadLocalMap
            setKnownNotUnset(threadLocalMap, value); // 3. 将 InternalThreadLocalMap 中数据替换为新的 value
        } else {
            remove();
        }
    }
    

    FastThreadLocal.set() 方法虽然入口只有几行代码,但是内部逻辑是相当复杂的。我们首先还是抓住代码主干,一步步进行拆解分析。set() 的过程主要分为三步:
    1.判断 value 是否为缺省值,如果等于缺省值,那么直接调用 remove() 方法。这里我们还不知道缺省值和 remove() 之间的联系是什么,我们暂且把 remove() 放在最后分析。
    2.如果 value 不等于缺省值,接下来会获取当前线程的 InternalThreadLocalMap。
    3.然后将 InternalThreadLocalMap 中对应数据替换为新的 value。

    首先我们看下 InternalThreadLocalMap.get() 方法,源码如下:

    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) { // 当前线程是否为 FastThreadLocalThread 类型
            return fastGet((FastThreadLocalThread) thread);
        } else {
            return slowGet();
        }
    }
    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); // 获取 FastThreadLocalThread 的 threadLocalMap 属性
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }
    private static InternalThreadLocalMap slowGet() {
        ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; 
        InternalThreadLocalMap ret = slowThreadLocalMap.get(); // 从 JDK 原生 ThreadLocal 中获取 InternalThreadLocalMap
        if (ret == null) {
            ret = new InternalThreadLocalMap();
            slowThreadLocalMap.set(ret);
        }
        return ret;
    }
    

    如果当前线程是 FastThreadLocalThread 类型,那么直接通过 fastGet() 方法获取 FastThreadLocalThread 的 threadLocalMap 属性即可。如果此时 InternalThreadLocalMap 不存在,直接创建一个返回。关于 InternalThreadLocalMap 的初始化在上文中已经介绍过,它会初始化一个长度为 32 的 Object 数组,数组中填充着 32 个缺省对象 UNSET 的引用。

    那么 slowGet() 又是什么作用呢?从代码分支来看,slowGet() 是针对非 FastThreadLocalThread 类型的线程发起调用时的一种兜底方案。如果当前线程不是 FastThreadLocalThread,内部是没有 InternalThreadLocalMap 属性的,Netty 在 UnpaddedInternalThreadLocalMap 中保存了一个 JDK 原生的 ThreadLocal,ThreadLocal 中存放着 InternalThreadLocalMap,此时获取 InternalThreadLocalMap 就退化成 JDK 原生的 ThreadLocal 获取。

    获取 InternalThreadLocalMap 的过程已经讲完了,下面看下 setKnownNotUnset() 如何将数据添加到 InternalThreadLocalMap 的。

    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
        if (threadLocalMap.setIndexedVariable(index, value)) { // 1. 找到数组下标 index 位置,设置新的 value
            addToVariablesToRemove(threadLocalMap, this); // 2. 将 FastThreadLocal 对象保存到待清理的 Set 中
        }
    }
    

    setKnownNotUnset() 主要做了两件事:
    1.找到数组下标 index 位置,设置新的 value。
    2.将 FastThreadLocal 对象保存到待清理的 Set 中。

    首先我们看下第一步 threadLocalMap.setIndexedVariable() 的源码实现:

    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        if (index < lookup.length) {
            Object oldValue = lookup[index]; 
            lookup[index] = value; // 直接将数组 index 位置设置为 value,时间复杂度为 O(1)
            return oldValue == UNSET;
        } else {
            expandIndexedVariableTableAndSet(index, value); // 容量不够,先扩容再设置值
            return true;
        }
    }
    

    indexedVariables 就是 InternalThreadLocalMap 中用于存放数据的数组,如果数组容量大于 FastThreadLocal 的 index 索引,那么直接找到数组下标 index 位置将新 value 设置进去,事件复杂度为 O(1)。在设置新的 value 之前,会将之前 index 位置的元素取出,如果旧的元素还是 UNSET 缺省对象,那么返回成功。

    如果数组容量不够了怎么办呢?InternalThreadLocalMap 会自动扩容,然后再设置 value。接下来看看 expandIndexedVariableTableAndSet() 的扩容逻辑:

    private void expandIndexedVariableTableAndSet(int index, Object value) {
        Object[] oldArray = indexedVariables;
        final int oldCapacity = oldArray.length;
        int newCapacity = index;
        newCapacity |= newCapacity >>>  1;
        newCapacity |= newCapacity >>>  2;
        newCapacity |= newCapacity >>>  4;
        newCapacity |= newCapacity >>>  8;
        newCapacity |= newCapacity >>> 16;
        newCapacity ++;
        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
        newArray[index] = value;
        indexedVariables = newArray;
    }
    

    InternalThreadLocalMap 以 index 为基准进行扩容,将数组扩容后的容量向上取整为 2 的次幂。然后将原数组内容拷贝到新的数组中,空余部分填充缺省对象 UNSET,最终把新数组赋值给 indexedVariables。为什么 InternalThreadLocalMap 以 index 为基准进行扩容,而不是原数组长度呢?假设现在初始化了 70 个 FastThreadLocal,但是这些 FastThreadLocal 从来没有调用过 set() 方法,此时数组还是默认长度 32。当第 index = 70 的 FastThreadLocal 调用 set() 方法时,如果按原数组容量 32 进行扩容 2 倍后,还是无法填充 index = 70 的数据。所以使用 index 为基准进行扩容可以解决这个问题,但是如果 FastThreadLocal 特别多,数组的长度也是非常大的。

    回到 setKnownNotUnset() 的主流程,向 InternalThreadLocalMap 添加完数据之后,接下就是将 FastThreadLocal 对象保存到待清理的 Set 中。我们继续看下 addToVariablesToRemove() 是如何实现的。

    private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); // 获取数组下标为 0 的元素
        Set<FastThreadLocal<?>> variablesToRemove;
        if (v == InternalThreadLocalMap.UNSET || v == null) {
            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); // 创建 FastThreadLocal 类型的 Set 集合
            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); // 将 Set 集合填充到数组下标 0 的位置
        } else {
            variablesToRemove = (Set<FastThreadLocal<?>>) v; // 如果不是 UNSET,Set 集合已存在,直接强转获得 Set 集合
        }
        variablesToRemove.add(variable); // 将 FastThreadLocal 添加到 Set 集合中
    }
    

    variablesToRemoveIndex 是采用 static final 修饰的变量,在 FastThreadLocal 初始化时 variablesToRemoveIndex 被赋值为 0。InternalThreadLocalMap 首先会找到数组下标为 0 的元素,如果该元素是缺省对象 UNSET 或者不存在,那么会创建一个 FastThreadLocal 类型的 Set 集合,然后把 Set 集合填充到数组下标 0 的位置。如果数组第一个元素不是缺省对象 UNSET,说明 Set 集合已经被填充,直接强转获得 Set 集合即可。这就解释了 InternalThreadLocalMap 的 value 数据为什么是从下标为 1 的位置开始存储了,因为 0 的位置已经被 Set 集合占用了。

    为什么 InternalThreadLocalMap 要在数组下标为 0 的位置存放一个 FastThreadLocal 类型的 Set 集合呢?这时候我们回过头看下 remove() 方法。

    public final void remove() {
        remove(InternalThreadLocalMap.getIfSet());
    }
    public static InternalThreadLocalMap getIfSet() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            return ((FastThreadLocalThread) thread).threadLocalMap();
        }
        return slowThreadLocalMap.get();
    }
    public final void remove(InternalThreadLocalMap threadLocalMap) {
        if (threadLocalMap == null) {
            return;
        }
        Object v = threadLocalMap.removeIndexedVariable(index); // 删除数组下标 index 位置对应的 value
        removeFromVariablesToRemove(threadLocalMap, this); // 从数组下标 0 的位置取出 Set 集合,并删除当前 FastThreadLocal
        if (v != InternalThreadLocalMap.UNSET) {
            try {
                onRemoval((V) v); // 空方法,用户可以继承实现
            } catch (Exception e) {
                PlatformDependent.throwException(e);
            }
        }
    }
    

    在执行 remove 操作之前,会调用 InternalThreadLocalMap.getIfSet() 获取当前 InternalThreadLocalMap。有了之前的基础,理解 getIfSet() 方法就非常简单了,如果是 FastThreadLocalThread 类型,直接取 FastThreadLocalThread 中 threadLocalMap 属性。如果是普通线程 Thread,从 ThreadLocal 类型的 slowThreadLocalMap 中获取。

    找到 InternalThreadLocalMap 之后,InternalThreadLocalMap 会从数组中定位到下标 index 位置的元素,并将 index 位置的元素覆盖为缺省对象 UNSET。接下来就需要清理当前的 FastThreadLocal 对象,此时 Set 集合就派上了用场,InternalThreadLocalMap 会取出数组下标 0 位置的 Set 集合,然后删除当前 FastThreadLocal。最后 onRemoval() 方法起到什么作用呢?Netty 只是留了一处扩展,并没有实现,用户需要在删除的时候做一些后置操作,可以继承 FastThreadLocal 实现该方法。

    至此,FastThreadLocal.set() 的完成过程已经讲完了,接下来我们继续 FastThreadLocal.get() 方法的实现就易如反掌拉。FastThreadLocal.get() 的源码实现如下:

    public final V get() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        Object v = threadLocalMap.indexedVariable(index); // 从数组中取出 index 位置的元素
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }
        return initialize(threadLocalMap); // 如果获取到的数组元素是缺省对象,执行初始化操作
    }
    public Object indexedVariable(int index) {
        Object[] lookup = indexedVariables;
        return index < lookup.length? lookup[index] : UNSET;
    }
    private V initialize(InternalThreadLocalMap threadLocalMap) {
        V v = null;
        try {
            v = initialValue();
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
        threadLocalMap.setIndexedVariable(index, v);
        addToVariablesToRemove(threadLocalMap, this);
        return v;
    }
    

    首先根据当前线程是否是 FastThreadLocalThread 类型找到 InternalThreadLocalMap,然后取出从数组下标 index 的元素,如果 index 位置的元素不是缺省对象 UNSET,说明该位置已经填充过数据,直接取出返回即可。如果 index 位置的元素是缺省对象 UNSET,那么需要执行初始化操作。可以看到,initialize() 方法会调用用户重写的 initialValue 方法构造需要存储的对象数据,如下所示。

    private final FastThreadLocal<String> threadLocal = new FastThreadLocal<String>() {
        @Override
        protected String initialValue() {
            return "hello world";
        }
    };
    

    构造完用户对象数据之后,接下来就会将它填充到数组 index 的位置,然后再把当前 FastThreadLocal 对象保存到待清理的 Set 中。

    Set 集合是为了保存 FastThreadLocal对象,Set 集合设计的好处有几点:1. 删除 FastThreadLocal 留扩展接口;2. 提高 removeAll 的删除效率,不需要去遍历膨胀的数组;3. 可以更好地做内存泄露的管理;

    小结

    简单总结下 FastThreadLocal 的优势:
    1. 高效查找 FastThreadLocal 在定位数据的时候可以直接根据数组下标 index 获取,时间复杂度 O(1)。而 JDK 原生的 ThreadLocal 在数据较多时哈希表很容易发生 Hash 冲突,线性探测法在解决 Hash 冲突时需要不停地向下寻找,效率较低。此外,FastThreadLocal 相比 ThreadLocal 数据扩容更加简单高效,FastThreadLocal 以 index 为基准向上取整到 2 的次幂作为扩容后容量,然后把原数据拷贝到新数组。而 ThreadLocal 由于采用的哈希表,所以在扩容后需要再做一轮 rehash。
    2. 安全性更高 JDK 原生的 ThreadLocal 使用不当可能造成内存泄漏,只能等待线程销毁。在使用线程池的场景下,ThreadLocal 只能通过主动检测的方式防止内存泄漏,从而造成了一定的开销。然而 FastThreadLocal 不仅提供了 remove() 主动清除对象的方法,而且在线程池场景中 Netty 还封装了 FastThreadLocalRunnable,FastThreadLocalRunnable 最后会执行 FastThreadLocal.removeAll() 将 Set 集合中所有 FastThreadLocal 对象都清理掉。

    劣势:
    1. 空间占用大 FastThreadLocal 采用的空间换时间的思路,InternalThreadLocalMap中数组只会扩容,无法缩容,一个线程中有极多FastThreadLocal时需要评估oom的风险;
    2. 对Thread的要求只有使用FastThreadLocalThread 类型的线程才会更快,如果是普通线程反而会更慢;

    相关文章

      网友评论

          本文标题:FastThreadLocal源码分析

          本文链接:https://www.haomeiwen.com/subject/mtdbzktx.html