美文网首页java并发源码解读nettybugstac...
Netty源码分析2 - ThreadLocal 源码解析

Netty源码分析2 - ThreadLocal 源码解析

作者: 原水寒 | 来源:发表于2018-07-29 10:53 被阅读237次
netty新增了FastThreadLocal和FastThreadLocalThread等类来实现了一个FastThread框架,相较于java.lang.Thread和java.lang.ThreadLocal速度更快。
在看FastThread框架之前,先看一下java.lang.ThreadLocal的源码。
  • 一、使用姿势
  • 二、数据结构
  • 三、源码分析
  • 四、回收机制
  • 总结

一、使用姿势

public class ThreadLocalTest {
    /**
     * 最佳实践一:使用private static
     * {@code ThreadLocal} instances are typically private static fields in classes
     * that wish to associate state with a thread (e.g., a user ID or Transaction ID)
     *
     * java8写法:
     * private static final ThreadLocal<Integer> integerThreadLocal = ThreadLocal.withInitial(() -> 100);
     *
     * java8之前的写法:
     * <pre>
     *     private static final ThreadLocal<Integer> integerThreadLocal = new ThreadLocal<Integer>() {
     *          @Override
     *          protected Integer initialValue() {
     *              return 100;
     *          }
     *      };
     * </pre>
     */
    private static final ThreadLocal<Integer> integerThreadLocal = new ThreadLocal<Integer>() {
        /**
         * 最佳实践二:重写initialValue()
         */
        @Override
        protected Integer initialValue() {
            return 100;
        }
    };

    @Test
    public void testThreadLocal() {
        System.out.println(integerThreadLocal.get());
        /**
         * 最佳实践三:用完如果可以删除,则手动删除
         */
        integerThreadLocal.remove();
        System.out.println(integerThreadLocal.get());
    }
}

最佳实践

  • 在类中定义ThreadLocal,用private static修饰;
  • 根据源码分析,由于ThreadLocal对象本身会作为Entry的key去获取数据,所以最好也要用final去修饰。key通常都是不可变对象,否则当作为key的对象发生了变化之后,之前存储的数据将无法get,造成资源泄露;
  • 使用匿名内部类实现initialValue(),在执行get()时,如果当前的Thread的ThreadLocalMap没有integerThreadLocal这个key的Entry(不管是第一次get还是remove之后的第一次get),则直接使用initialValue()进行初始化操作;
  • 在使用完ThreadLocal后,及时手动remove()无用的ThreadLocal,防止资源泄露

二、数据结构

threadlocal.png

说明

  • 每一个Thread类都有一个属性:ThreadLocal.ThreadLocalMap threadLocals = null
  • 每一个ThreadLocalMap都有一个Entry[]数组
  • 每一个Entry继承于WeakReference,Entry的key(ThreadLocal实例)作为WeakReference的referent,传入的value作为value

java引用:

注意

  • 从第三条说明点发现每一个ThreadLocal只能存储一个值,对于dubbo的RpcContext.LOCAL来讲,存储了一个RpcContext上下文信息;如果需要存储多个值,根据情况,将多个值合并为对象进行存储或者使用多个ThreadLocal实例进行存储

三、源码分析

3.1、ThreadLocal的创建

基础属性

    /**
     * 每一个ThreadLocal对象都有一个唯一的threadLocalHashCode。
     * ThreadLocal对象会作为ThreadLocalMap的Entry的key     
     */
    private final int threadLocalHashCode = nextHashCode();

    private static AtomicInteger nextHashCode = new AtomicInteger();

    /**
     * 魔数:与斐波那契数列和黄金分割点有关。用于散列均匀,减少hash冲突
     */
    private static final int HASH_INCREMENT = 0x61c88647;

    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }

说明

  • 每一个ThreadLocal对象都有一个唯一的threadLocalHashCode,该值用于计算Entry元素存储的直接索引。
    • 直接索引:元素根据threadLocalHashCode&(table.len-1)算出来的值
    • 真实索引:元素可能由于hash冲突,使用线性探测法,放置在不是直接索引的位置
  • hash算法:每一个ThreadLocal的threadLocalHashCode都是前一个的threadLocalHashCode + 0x61c88647。这样的hash算法可以很大程度的避免hash冲突,散列很均匀。0x61c88647这个数是怎么算出来的,见:https://www.javaspecialists.eu/archive/Issue164.html

普通构造函数

    public ThreadLocal() {
    }

java8提供的构造函数

    public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
        return new SuppliedThreadLocal<>(supplier);
    }
    
    static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {

        private final Supplier<? extends T> supplier;

        SuppliedThreadLocal(Supplier<? extends T> supplier) {
            this.supplier = Objects.requireNonNull(supplier);
        }

        @Override
        protected T initialValue() {
            return supplier.get();
        }
    }

java8提供了Supplier(提供者) FunctionalInterface。基于该接口可以方便的替代掉示例中的使用匿名内部类来重写initialValue()的方法。需要注意的是,supplier函数不可为null。

@FunctionalInterface
public interface Supplier<T> {

    /**
     * Gets a result.
     */
    T get();
}

3.2 数据的初始化与设置

    /**
     * 设置初始值
     * 与set(T value)几乎相同,下边代码等价于
     * <pre>
     *     private T setInitialValue() {
     *          T value = initialValue();
     *          set(value);
     *          return value;
     *     }
     * </pre>
     */
    private T setInitialValue() {
        // 获取初始值(通常是调用使用方重写的initialValue(),否则使用{@link ThreadLocal#initialValue()返回null})
        T value = initialValue();
        // 获取当前线程
        Thread t = Thread.currentThread();
        // 获取当前线程的ThreadLocalMap对象
        ThreadLocalMap map = getMap(t);
        // 如果当前线程的ThreadLocalMap对象不为null,则直接将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
        if (map != null)
            map.set(this, value);
        // 如果当前线程的ThreadLocalMap为null,则直接新建ThreadLocalMap对象,并将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
        else
            createMap(t, value);
        // 返回初始值
        return value;
    }
    
    protected T initialValue() {
        return null;
    }
    
    /**
     * 设置值
     */
    public void set(T value) {
        // 获取当前线程
        Thread t = Thread.currentThread();
        // 获取当前线程的ThreadLocalMap对象
        ThreadLocalMap map = getMap(t);
        // 如果当前线程的ThreadLocalMap对象不为null,则直接将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
        if (map != null)
            map.set(this, value);
        // 如果当前线程的ThreadLocalMap为null,则直接新建ThreadLocalMap对象,并将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
        else
            createMap(t, value);
    }
    
    /**
     * 获取当前线程t的ThreadLocalMap
     */
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }
    
    /**
     * 创建ThreadLocalMap,并将 {当前的ThreadLocal对象,firstValue} 设置到ThreadLocalMap对象中去
     * this:当前的ThreadLocal对象
     */
    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

setInitialValue()流程:

  • 首先调用initialValue()获取初始值(通常是调用使用方重写的initialValue(),否则返回null)
  • 然后获取当前线程
  • 然后获取当前线程的ThreadLocalMap对象属性
  • 如果当前线程的ThreadLocalMap对象不为null,则直接将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
  • 如果当前线程的ThreadLocalMap为null,则直接新建ThreadLocalMap对象,并将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
  • 最后返回initialValue()获取到的初始值

setInitialValue()被使用的时机:ThreadLocal.get()。

关于ThreadLocalMap的创建和向ThreadLocalMap中设置参数的源码后续再说。

3.3 数据的获取

    public T get() {
        // 获取当前线程
        Thread t = Thread.currentThread();
        // 获取当前线程的ThreadLocalMap属性
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            // 从ThreadLocalMap属性中获取key为当前的ThreadLocal的Entry
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                // 如果Entry不为null,直接获取entry的value,返回即可
                @SuppressWarnings("unchecked")
                T result = (T) e.value;
                return result;
            }
        }
        // 如果当前线程的ThreadLocalMap为null或者key为当前的ThreadLocal的Entry为null
        return setInitialValue();
    }

get()流程:

  • 获取当前线程
  • 获取当前线程的ThreadLocalMap属性
  • 如果当前线程的ThreadLocalMap对象不为null,从ThreadLocalMap对象中获取key为当前的ThreadLocal的Entry
  • 如果该Entry不为null,直接获取entry的value,返回即可
  • 如果当前线程的ThreadLocalMap为null(还未createMap(Thread t, T firstValue))或者key为当前的ThreadLocal的Entry为null(没有调用map.set(this, value)),则执行设置初始化值的方法。

关于根据ThreadLocal从ThreadLocalMap中获取Entry的源码后续再说。

3.4 数据的删除

    public void remove() {
        ThreadLocalMap m = getMap(Thread.currentThread());
        if (m != null)
            m.remove(this);
    }

remove()步骤:

  • 获取当前的线程
  • 获取当前线程的ThreadLocalMap
  • 如果当前的ThreadLocalMap不为null,直接执行调用ThreadLocalMap的remove(ThreadLocal<?> key)

关于根据ThreadLocal从ThreadLocalMap中remove Entry的源码后续再说。

可以看到具体的操作全部委托到了ThreadLocalMap中来执行。

3.5 ThreadLocalMap的创建

基础属性

        /**
         * The initial capacity -- MUST be a power of two.
         */
        private static final int INITIAL_CAPACITY = 16;

        /**
         * The table, resized as necessary.
         * table.length MUST always be a power of two.
         */
        private Entry[] table;

        /**
         * The number of entries in the table.
         */
        private int size = 0;

        /**
         * threshold == table大小的2/3,
         * 1、当size >= threshold,遍历table并删除所有key为null的元素(rehash()),
         * 2、如果删除后size >= threshold*3/4 == 数组长度*1/2 时,需要对table进行扩容
         */
        private int threshold; // Default to 0

        private void setThreshold(int len) {
            threshold = len * 2 / 3;
        }

静态内部类Entry

        static class Entry extends WeakReference<ThreadLocal<?>> {
            Object value;
            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }

其中value就是set的value,而k是当前的ThreadLocal对象,作为WeakReference的referent属性。

构造器

        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            // 创建数组实例,默认长度16
            table = new Entry[INITIAL_CAPACITY];
            // 求余计算数组索引i
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            // 第一次插入,没有hash冲突,直接放置
            table[i] = new Entry(firstKey, firstValue);
            // 数组实际元素长度 + 1
            size = 1;
            // 设置扩容的阈值为 16*2/3 = 10,当有10个元素时,发生扩容
            setThreshold(INITIAL_CAPACITY);
        }

步骤:

  • 创建数组实例,默认长度16
  • 计算firstKey的直接索引,当len=2的n次方时,x&(len-1) <=> x%len,所以这里相当于直接求余,只是&运算效率更高
  • 第一次插入,没有hash冲突,直接创建Entry并放置就ok了
  • 然后数组中元素个数size+1
  • 设置rehash()的阈值

索引操作

        /**
         * i:数组索引
         * len:数组长度
         * 如果 i+1<数组长度,则返回i+1, 如果 i+1>=数组长度,则直接取0(即开头table[0]),
         * 在实际使用中,实际上就是不断向后的循环取,到了末尾,再回到table[0],再向后循环取
         */
        private static int nextIndex(int i, int len) {
            return ((i + 1 < len) ? i + 1 : 0);
        }

        /**
         * 如果 i-1>=0,则返回i-1, 如果 i<1,则直接取数组长度-1(即末尾table[len-1]),
         * 在实际使用中,实际上就是不断向前的循环取,到了开头,再回到table[len-1],再向前循环取
         */
        private static int prevIndex(int i, int len) {
            return ((i - 1 >= 0) ? i - 1 : len - 1);
        }

从上述的逻辑可知,我们可以将table[]看做是一个循环数组。

3.6 向ThreadLocalMap中设置Entry

        /**
         * 设置 {key, value} 到当前线程的ThreadLocalMap中
         */
        private void set(ThreadLocal<?> key, Object value) {
            // 成员变量局部化,提高性能
            Entry[] tab = table;
            // 获取数组容量
            int len = tab.length;
            // 获取当前的threadlocal位于数组的下标,当len=2的n次方时,
            // key.threadLocalHashCode & (len - 1) <=> key.threadLocalHashCode % len
            int i = key.threadLocalHashCode & (len - 1);

            /**
             * 先获取tab[i],判断如果不为null,则进行循环体内逻辑;
             * 若没有return,则进行二次循环,tab[i+1],判断如果不为null,则进行循环体内逻辑;
             * 当 i + 1 >= len 时,tab[0]回到头再进行循环。
             *
             * 这就是解决hash冲突的另一种方式:线性探测法。如果当前槽没有元素,直接插入元素;如果当前槽有元素,则向后寻找第一个为null的槽,放置该元素。
             * 在这里,还添加了hash冲突时替换原有元素或者替换无效元素的逻辑。
             */
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                // 获取当前的Entry的key,使用e.get(),是因为当前的key是一个WeakReference,使用get()获取key
                ThreadLocal<?> k = e.get();
                // 如果当前的key就是刚刚查询出来的Entry的key(即修改的是同一个Entry),则直接替换值
                if (k == key) {
                    e.value = value;
                    return;
                }
                // 如果刚刚查询出来的Entry的key是null,则表明该Entry的key(ThreadLocal)已经被回收了
                // 使用replaceStaleEntry替换掉已经无效的Entry
                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }

            // 通过上述for循环解决了hash冲突之后,创建新的Entry,放置到当前的table[i]节点上
            tab[i] = new Entry(key, value);
            // table的size+1
            int sz = ++size;
            // 回收部分无效的Entry,如果成功,则当前的数组中的元素个数将小于阈值,则肯定不需要扩容,
            // 否则判断当前的数组中的元素个数是否达到了扩容阈值
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }

说明:

  • 首先将table[]成员变量局部化(在后续的操作中如果会对table成员变量进行多次操作,局部化会提高性能)
  • 获取数组长度,计算直接索引
  • 然后进行hash冲突检测与处理(线性探测法)
    • 获取直接索引位置的Entry,如果不为null,表示发生了hash冲突,则先获取Entry中的key,如果该key与当前的ThreadLocal是同一个对象,则直接替换value,返回;
    • 如果Entry中的key==null,表示当前的Entry已经是一个无效的Entry了,执行replaceStaleEntry方法进行处理(如果从该Entry后的第一个元素开始在一个连续的Entry子数组内找到一个key与当前ThreadLocal相等的元素,则替换值,然后互换该Entry和当前的key==null的Entry位置;如果没找到,则将当前的key==null的Entry设置为新的将设置的Entry)返回;
    • 如果发生了hash冲突,但是直接索引位置上的Entry即不是当前的ThreadLocal的Entry,也不是一个无效的Entry,则需要向后循环遍历下一个Entry元素,再进行上述逻辑的处理。直到找到一个Entry为null的位置或者Entry失效的位置或者Entry就是当前的ThreadLocal的Entry的位置。
  • 假设没有发生hash冲突或者经过上述的for循环找到了一个Entry为null的位置的时候,在该位置创建设置新添加的Entry,数组元素+1;
  • 尝试回收部分无效的Entry,如果成功,则当前的数组中的元素个数将小于阈值,则肯定不需要rehash();如果失败,且数组元素个数 > threshold,执行rehash() -- 即执行部分资源清理与可能的扩容操作。

replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot)

        /**
         * 对一段连续的Entry子数组进行操作
         * Replace a stale entry encountered during a set operation
         * with an entry for the specified key.
         */
        private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
            // 获取当前的table及其容量
            Entry[] tab = table;
            int len = tab.length;
            Entry e;

            // 设置需要被回收的槽索引(无效的Entry索引)
            int slotToExpunge = staleSlot;
            /**
             * 向前遍历,直到找到第一个为null的槽
             * 在遍历的过程中,如果找到了无效的Entry,则将slotToExpunge设置为当前的被回收的无效索引
             */
            for (int i = prevIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null)
                    slotToExpunge = i;

            /**
             * 向后遍历,直到找到第一个为null的槽或者找到与当前Entry key相等的槽
             */
            for (int i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();

                // The newly stale slot, or any other stale slot
                // encountered above it, can then be sent to expungeStaleEntry
                // to remove or rehash all of the other entries in run.
                // 如果key相等,直接替换
                if (k == key) {
                    // 替换值
                    e.value = value;

                    /**
                     * 互换 tab[i] 和 tab[staleSlot],将无效的tab[staleSlot]中的Entry后移;
                     * 将正确的 tab[i]中的Entry前移,方便下一次的查找(可以根据直接槽索引查找到了)
                     */
                    tab[i] = tab[staleSlot];
                    tab[staleSlot] = e;

                    // 如果待回收的槽索引就是当前的无效索引,则设置待回收的槽索引为i
                    if (slotToExpunge == staleSlot)
                        slotToExpunge = i;
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }

                // 如果 "k==null && 待回收的槽索引就是当前的无效索引",则设置待回收的槽索引为i
                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;
            }

            // 置空原本的entry的value,将当前的Entry添加到这个槽上,原本的Entry等待GC
            tab[staleSlot].value = null;
            tab[staleSlot] = new Entry(key, value);

            // If there are any other stale entries in run, expunge them
            if (slotToExpunge != staleSlot)
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
        }

步骤:

  • 首先从当前索引staleSlot的前一个元素开始向前遍历一个连续的Entry子数组,一直找到该连续子数组最靠前的那个无效索引,设置为slotToExpunge
  • 然后从staleSlot的 后一个元素开始 向后遍历一个连续的Entry子数组
  • 如果找到的Entry的key是当前的ThreadLocal,则直接替换value,之后将找到的Entry放置到table[staleSlot]处,也就是Entry本身的直接索引处,而Entry本身的真实索引的位置放置为原来的table[staleSlot];然后进行一次连续段的回收,之后进行一次log次数级别的回收,最后返回
  • 如果经过循环,没有找到key与ThreadLocal相等的Entry,则直接将要set的Entry放置到table[staleSlot]处,最后根据需要进行一次连续段的回收,之后在进行一次log次数级别的回收

关于回收与扩容相关的操作,后续分析。

3.7 从ThreadLocalMap中获取Entry

        private Entry getEntry(ThreadLocal<?> key) {
            // 计算所查找的Entry在table[]中的索引
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            // 如果快速查找到的Entry!=null并且key也相等,则直接返回(也就是说在直接的hash槽找到了Entry)
            if (e != null && e.get() == key)
                return e;
            //
            else
                return getEntryAfterMiss(key, i, e);
        }

        private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;

            /**
             * 循环遍历,直到找到下一个不为null的Entry
             */
            while (e != null) {
                ThreadLocal<?> k = e.get();
                // 如果key相等,表示找到了,直接返回
                if (k == key)
                    return e;
                // 如果key==null,则尝试回收或者整理"i到i之后的第一个Entry为null的索引(即返回值)之间"的Entry数据
                if (k == null)
                    expungeStaleEntry(i);
                // 继续向后遍历
                else
                    i = nextIndex(i, len);
                e = tab[i];
            }
            // 如果没找到,返回null
            return null;
        }

步骤:

  • 计算当前的ThreadLocal的直接索引,获取直接索引位置的Entry
  • 如果该位置的Entry不为null且Entry的key与当前的ThreadLocal是同一个元素,则直接返回该Entry
  • 如果该位置的Entry为null,直接返回null(看getEntryAfterMiss逻辑)
  • 如果该位置的Entry不为null,但是Entry的key与当前的ThreadLocal不是同一个元素,则表明发生了hash冲突。此时,会不断的向后循环寻找,直到找到了要查找的Entry或者遍历到的Entry为null,如果找到了,返回Entry,如果没找到,返回null。

注意:

  • 在没有根据直接索引找到Entry后的while循环中,如果检测到无效的Entry时,会使用expungeStaleEntry方法尝试回收或者整理"i到i之后的一段连续的不为null的索引(即返回值)之间"的Entry数据。
  • 对于get来讲,线性探测法是以Entry!=null为循环查找结束条件的,所以要注意remove时的操作。

3.8 从ThreadLocalMap中删除Entry

        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len - 1);
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                    // key - ThreadLocal置为null
                    e.clear();
                    // 尝试回收或者整理"i到i之后的一段连续的不为null的索引(即返回值)之间"的Entry数据
                    expungeStaleEntry(i);
                    return;
                }
            }
        }

步骤:

  • 计算key的直接索引,从直接索引的Entry开始,依旧在一段连续的不为null的Entry进行循环,如果找到的Entry的key与当前的key相同,则进行删除操作:
    • 首先将Entry的key置空(e.clear())
    • 尝试回收或者整理"i到i之后的一段连续的不为null的索引(即返回值)之间"的Entry数据

注意:

  • 这里的expungeStaleEntry中的整理逻辑非常重要,直接影响get查询的查询链是否会断掉的问题。

3.9 rehash()与resize()

        private void rehash() {
            // 回收table中所有无效的Entry,如果可以有效减小size到不满足下边的扩容条件,则下边的resize()不需要执行
            expungeStaleEntries();

            // 实际上扩容是发生在size>=threshold*3/4==length*(2/3)*(3/4)==lenth*1/2
            if (size >= threshold - threshold / 4)
                resize();
        }

        /**
         * 扩容两倍数组,之后rehash
         */
        private void resize() {
            // 获取旧数组及其长度
            Entry[] oldTab = table;
            int oldLen = oldTab.length;
            // 新数组扩容2倍
            int newLen = oldLen * 2;
            Entry[] newTab = new Entry[newLen];
            int count = 0;

            for (int j = 0; j < oldLen; ++j) {
                Entry e = oldTab[j];
                if (e != null) {
                    ThreadLocal<?> k = e.get();
                    // 无效的Entry
                    if (k == null) {
                        e.value = null; // Help the GC
                    } else {
                        int h = k.threadLocalHashCode & (newLen - 1);
                        while (newTab[h] != null)
                            h = nextIndex(h, newLen);
                        newTab[h] = e;
                        count++;
                    }
                }
            }
            // 设置新的阈值
            setThreshold(newLen);
            size = count;
            table = newTab;
        }

rehash()仅用在ThreadLocalMap#set(ThreadLocal<?> key, Object value)的末尾部分。

rehash()步骤:

  • 首先进行一次全表回收无效的Entry操作,如果可以有效减小数组中的元素个数size到数组长度的一半以下,则不需要进行扩容,如果达到一半及以上,则进行扩容操作。

resize()步骤:

  • 创建新的Entry[]是旧的长度的两倍;之后遍历旧数组,之后根据新的数组的长度重新计算每个有效Entry的直接索引,根据线性探测法重新填充数组。
  • 最后设置新的Entry[]的阈值threshold、数组元素个数size,将新数组赋值给全局变量table

四、回收机制

ThreadLocal 回收机制 就是如下的四个函数以及WeakReference<ThreadLocal<?>>

1、expungeStaleEntry(int staleSlot) 
从staleSlot开始的连续Entry子数组中的无效Entry的清理和 整理:

2、expungeStaleEntries()
清理整个table中的无效Entry

3、cleanSomeSlots(int i, int n)
进行log2(n)次循环(如果正好遍历到无效的Entry,则n=table.length,调用expungeStaleEntry进行连续段的清理和整理);
是一种介于完全不扫描和完全扫描(expungeStaleEntries)之间的一种扫描方式

4、replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot)
该函数已在set部分分析:从table[staleSlot]后的第一个元素开始在一个连续的Entry子数组内如果找到一个key与当前ThreadLocal相等的Entry,则替换值,然后互换该Entry和table[staleSlot](①),返回;如果没找到,则将table[staleSlot]直接设置为新的将设置的Entry(②)
①②处都会对该Entry所在的连续段,从连续段的第一个无效Entry(可能在当前的Entry之前)开始或者从该Entry的后一个元素开始(表示该Entry之前的元素全部有效)进行一次连续段的 回收和整理,之后在进行一次log级别的回收;

4.1 expungeStaleEntry(int staleSlot)

        /**
         * 尝试回收或者整理"staleSlot到staleSlot之后的第一个Entry为null的索引(即返回值)之间"的Entry数据
         *
         * @return the index of the next null slot after staleSlot
         * (all between staleSlot and this slot will have been checked
         * for expunging).
         */
        private int expungeStaleEntry(int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;

            // 回收table[staleSlot]处的值,value置为null,整体置为null
            tab[staleSlot].value = null;
            tab[staleSlot] = null;
            // size-1
            size--;

            // Rehash until we encounter null
            Entry e;
            int i;
            for (i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();
                // 如果当前的table[i] Entry无用,则直接回收
                if (k == null) {
                    e.value = null;
                    tab[i] = null;
                    size--;
                } else {
                    /**
                     * 问题:
                     * 假设ThreadLocalMap的table长度为4的,其中table[1]、table[2]已经有值,而且key都不为null,
                     * 这时候set一个threadlocal1,算出来的索引为1,根据线性探测法,最后会设置到table[3];
                     * 然后,table[2]被remove,此时table[2]==null;
                     * 然后,get threadlocal1,此时算出索引还是1, 从table[1]向后查找,结果table[2]==null,则查找链就断了,结果返回null,
                     * 而没有办法查找到真正的table[3],这种情况threadlocal是怎么处理的呢?
                     *
                     * 答案就是下边的代码:会对remove元素的后边的元素循环遍历,直到遍历到非null为止。对每个元素做如下逻辑:
                     * 如果当前元素的k==null,则回收,如果不是无效元素,则计算其真实的直接索引,例如重新计算table[3]的索引,此处为1,与当前遍历的索引3不同,
                     * 所以将table[3]先置空,然后从table[1]开始向后寻找第一个为null的位置,重新放置table[3]的元素,此处就是table[2]=table[3]
                     *
                     * 简言之:remove一个Entry之后,会整理remove的Entry元素之后的一块儿连续的(中间没有null的entry)元素子数组,全部使用线性探测法重新计算,
                     * 这样remove掉的那一块儿就会被后续的Entry重新填充,查询链就不会断
                     */
                    // 计算真实的直接索引位置,因为可能k是因为hash冲突循环后移的
                    int h = k.threadLocalHashCode & (len - 1);
                    // 如果h和i,不相等,表示确实k是循环后移了的
                    if (h != i) {
                        // 首先将之前的Entry置为null
                        tab[i] = null;

                        // 一直向后循环找到第一个为null的索引,之后table[h] = e(将当前的Entry放置到table[h]新的hash位置上)
                        while (tab[h] != null)
                            h = nextIndex(h, len);
                        tab[h] = e;
                    }
                }
            }
            return i;
        }

步骤:

  • 将table[staleSlot]的value和Entry都置为null,元素个数size - 1
  • 遍历staleSlot之后的一段连续的Entry,如果Entry的key==null,将该Entry的value和Entry本身置空,如果key是有效的,将要进行 整理操作
    • 计算直接索引的位置,如果当前的Entry的真实索引与直接索引不相等,则表示该Entry之前是因为hash冲突被冲到后边的slot里的,此处会重新从直接索引位置开始向后找到第一个不为null的的位置,将当前的Entry元素放置到这里,这样会保证原本连续的Entry子数组,在从中删除一个Entry后,子数组中剩下的元素还可以组成一个连续的Entry数组,这保证了get操作的查询链不会断。

注意:

  • remove一个元素后,会调用这里的 整理操作 进行连续Entry子数组的整理,不会使查询链断掉。要仔细的看注释中的那个例子。
  • 整理的过程相当于一次从staleSlot开始的连续段 内存碎片整理,将无效的Entry删除,有效的Entry按照原本的顺序聚集。

4.2 expungeStaleEntries()

        /**
         * 回收table中所有的无效Entry.
         */
        private void expungeStaleEntries() {
            Entry[] tab = table;
            int len = tab.length;
            for (int j = 0; j < len; j++) {
                Entry e = tab[j];
                if (e != null && e.get() == null)
                    expungeStaleEntry(j);
            }
        }

遍历table中所有的元素,发现无效的Entry后,就调用expungeStaleEntry(int staleSlot)进行连续子数组的资源清理与整理。该方法影响较大,仅仅用在rehash()中。

4.3 cleanSomeSlots(int i, int n)

        /**
         * 对数级别的扫描,介于 "不扫描(fast but retains garbage)" 和 "完全扫描(find all garbage but would cause some insertions to take O(n) time)" 之间
         *
         * @param i a position known NOT to hold a stale entry. The
         *          scan starts at the element after i.
         * @param n scan control: {@code log2(n)} cells are scanned,
         *          unless a stale entry is found, in which case
         *          {@code log2(table.length)-1} additional cells are scanned.
         *          When called from insertions, this parameter is the number
         *          of elements, but when from replaceStaleEntry, it is the
         *          table length. (Note: all this could be changed to be either
         *          more or less aggressive by weighting n instead of just
         *          using straight log n. But this version is simple, fast, and
         *          seems to work well.)
         * @return true if any stale entries have been removed.
         */
        private boolean cleanSomeSlots(int i, int n) {
            boolean removed = false;
            Entry[] tab = table;
            int len = tab.length;
            do {
                // 向后循环获取下一个索引,如果已经到了末尾,则返回table[0]
                i = nextIndex(i, len);
                Entry e = tab[i];
                // 如果此Entry已经无效了,key-ThreadLocal已经被回收掉了,无法再通过该key获取Entry,如果不回收此Entry,则会造成内存泄露
                if (e != null && e.get() == null) {
                    n = len;
                    removed = true;
                    // 尝试回收或者rehash"i到i之后的第一个Entry为null的索引(即返回值)之间"的Entry数据
                    i = expungeStaleEntry(i);
                }
            } while ((n >>>= 1) != 0); // n / 2,假设n=16,当然set的时候n=size,则log2(16)=4,也就是说需要循环4次(也可以直接理解为16/2/2/2/2=1,第5次1/2==0则不再进行循环)
            return removed;
        }

该方法只扫描一部分Entry,进行log2(n)次循环,如果恰好扫描到无效的Entry,则n=table.length,即进行log2(table.length)次循环。每个循环的步骤:

  • 获取下一个索引位置i及该位置上的Entry
  • 如果该Entry==null或者Entry依然有效,则进行下一次循环;
  • 如果该Entry!=null且无效,则先将n设置为长度(在set方法中n本来是)调用expungeStaleEntry(i)回收或者 整理 i之后的一段连续的Entry,之后再进行一下轮的回收和整理操作。

4.4 弱引用WeakReference

由于Entry继承了弱引用,他的key是referent,当发生gc时,如果该key已经不可达了,则直接回收(key==null),之后在如下操作时删除key==null的Entry。

  • get时调用expungeStaleEntry
  • set时调用replaceStaleEntry或者cleanSomeSlots或者expungeStaleEntries
  • remove时调用expungeStaleEntry

五、总结

5.1 set整个流程

  1. 首先将table[]成员变量局部化(在后续的操作中如果会对table成员变量进行多次操作,局部化会提高性能)
  2. 获取数组长度,计算直接索引
  3. 然后进行hash冲突检测与处理(线性探测法)
    • 获取直接索引位置上的Entry,如果不为null,表示发生了hash冲突,则先获取Entry中的key,如果该key与当前的ThreadLocal是同一个对象,则直接替换value,返回;
    • 如果Entry中的key==null,表示当前的Entry已经是一个无效的Entry了,执行replaceStaleEntry方法进行处理(replaceStaleEntry:如果从该Entry后的第一个元素开始在一个连续的Entry子数组内找到一个key与当前ThreadLocal相等的元素,则替换值,然后互换该Entry和当前的key==null的Entry位置;如果没找到,则将当前的key==null的Entry设置为新的将设置的Entry)返回;
    • 如果发生了hash冲突,但是直接索引位置上的Entry即不是当前的ThreadLocal的Entry,也不是一个无效的Entry,则需要向后循环遍历下一个Entry元素,再进行上述逻辑的处理。直到找到一个Entry为null的位置或者Entry失效的位置或者Entry就是当前的ThreadLocal的Entry的位置。
  4. 假设没有发生hash冲突或者经过上述的for循环找到了一个Entry为null的位置的时候,在该位置创建设置新添加的Entry,数组元素+1;
  5. 尝试log级别的回收部分无效的Entry,如果成功,则当前的数组中的元素个数将小于阈值,则肯定不需要rehash();如果失败,且数组元素个数 >= threshold,执行rehash()
    • 首先对整个table回收无效的Entry操作,如果可以有效减小数组中的元素个数size到数组长度的一半以下,则不需要进行扩容,如果size达到length的一半及以上,则进行扩容操作
    • 扩容:创建新的Entry[]是旧的长度的两倍;之后遍历旧数组,根据新的数组的长度重新计算每个有效Entry的直接索引,根据线性探测法重新填充数组。
      最后设置rehash()阈值、数组元素个数size,将新数组赋值给全局变量table。

5.2 get整个流程

  1. 获取当前线程
  2. 获取当前线程的ThreadLocalMap属性
  3. 如果当前线程的ThreadLocalMap对象不为null,从ThreadLocalMap对象中获取key为当前的ThreadLocal的Entry
    • 计算当前的ThreadLocal的直接索引,获取直接索引位置的Entry
    • 如果该位置的Entry不为null且Entry的key与当前的ThreadLocal是同一个元素,则直接返回该Entry
    • 如果该位置的Entry为null,直接返回null(看getEntryAfterMiss逻辑)
    • 如果该位置的Entry不为null,但是Entry的key与当前的ThreadLocal不是同一个元素,则表明发生了hash冲突。此时,会不断的向后循环寻找,直到找到了要查找的Entry或者遍历到的Entry为null,如果找到了,返回Entry,如果没找到,返回null。
  4. 如果该Entry不为null,直接获取entry的value,返回即可
  5. 如果当前线程的ThreadLocalMap为null(还未createMap(Thread t, T firstValue))或者key为当前的ThreadLocal的Entry为null(没有调用map.set(this, value)),则执行设置初始化值的方法。

5.3 各类回收的时机

  1. set或者setInitialValue
    • 如果从直接索引开始向后遍历的一段连续Entry段内发生hash冲突且冲突的Entry已经是无效的,此时会执行replaceStaleEntry,会执行回收和整理操作
    • 如果没有发生hash冲突 或者 发生了hash冲突但是在从直接索引开始的一段连续Entry段内没有找到与设置的key相等的Entry或者无效的Entry,如果直接索引所在的整个连续Entry段如果有无效Entry,则在set的尾部都会执行一次log级别的回收操作,如果回收不理想,需要进行rehash()
    • rehash()会进行一次全table的回收
  2. get
    • 在通过直接索引没有get到数据的时候,会循环遍历一段连续的Entry,如果遍历到的Entry是无效的,则进行一次连续Entry子数组的回收和整理
  3. remove
    • 当找到需要被删除的Entry时,进行一次连续Entry子数组的回收和整理
  4. gc时
    • 由于Entry继承了弱引用,他的key是referent,当发生gc时,如果该key已经无引用指向了,则直接回收(key==null了),之后再根据上述1,2,3的操作删除key==null的Entry。(这也是为什么ThreadLocal没有为Entry绑定ReferenceQueue的原因,因为Entry的删除已经可以发生在1,2,3中了;但是WeakHashMap不一样,WeakHashMap的Entry继承于WeakReference,其key是referent,当发生gc时,如果key不可达了,则回收key,之后将key对应的Entry放入queue,在后续对WeakHashMap的操作 - getTable()/size()/resize(int newCapacity)的时候进行Entry的回收)

5.4 ThreadLocal与线程池

线程池中的线程由于会被复用,所以线程池中的每一条线程在执行task结束后,要清理掉其ThreadLocalMap及其中的各个Entry,否则,当这条线程在下一次被复用的时候,其ThreadLocalMap信息还存储着上一次被使用的时的信息;另外,假设这条线程不再被使用,但是这个线程有可能不会被销毁(与线程池的类型和配置相关),那么其上的ThreadLocal将发生了资源泄露。所以netty的io线程池使用FastThreadLocalRunnable wrap了Runnable任务,当任务执行结束后,会做InternalThreadLocalMap和FastThreadLocal的清理操作。

相关文章

网友评论

    本文标题:Netty源码分析2 - ThreadLocal 源码解析

    本文链接:https://www.haomeiwen.com/subject/aylzmftx.html