美文网首页java并发源码解读nettybugstac...
Netty源码分析2 - ThreadLocal 源码解析

Netty源码分析2 - ThreadLocal 源码解析

作者: 原水寒 | 来源:发表于2018-07-29 10:53 被阅读237次
    netty新增了FastThreadLocal和FastThreadLocalThread等类来实现了一个FastThread框架,相较于java.lang.Thread和java.lang.ThreadLocal速度更快。
    在看FastThread框架之前,先看一下java.lang.ThreadLocal的源码。
    
    • 一、使用姿势
    • 二、数据结构
    • 三、源码分析
    • 四、回收机制
    • 总结

    一、使用姿势

    public class ThreadLocalTest {
        /**
         * 最佳实践一:使用private static
         * {@code ThreadLocal} instances are typically private static fields in classes
         * that wish to associate state with a thread (e.g., a user ID or Transaction ID)
         *
         * java8写法:
         * private static final ThreadLocal<Integer> integerThreadLocal = ThreadLocal.withInitial(() -> 100);
         *
         * java8之前的写法:
         * <pre>
         *     private static final ThreadLocal<Integer> integerThreadLocal = new ThreadLocal<Integer>() {
         *          @Override
         *          protected Integer initialValue() {
         *              return 100;
         *          }
         *      };
         * </pre>
         */
        private static final ThreadLocal<Integer> integerThreadLocal = new ThreadLocal<Integer>() {
            /**
             * 最佳实践二:重写initialValue()
             */
            @Override
            protected Integer initialValue() {
                return 100;
            }
        };
    
        @Test
        public void testThreadLocal() {
            System.out.println(integerThreadLocal.get());
            /**
             * 最佳实践三:用完如果可以删除,则手动删除
             */
            integerThreadLocal.remove();
            System.out.println(integerThreadLocal.get());
        }
    }
    

    最佳实践

    • 在类中定义ThreadLocal,用private static修饰;
    • 根据源码分析,由于ThreadLocal对象本身会作为Entry的key去获取数据,所以最好也要用final去修饰。key通常都是不可变对象,否则当作为key的对象发生了变化之后,之前存储的数据将无法get,造成资源泄露;
    • 使用匿名内部类实现initialValue(),在执行get()时,如果当前的Thread的ThreadLocalMap没有integerThreadLocal这个key的Entry(不管是第一次get还是remove之后的第一次get),则直接使用initialValue()进行初始化操作;
    • 在使用完ThreadLocal后,及时手动remove()无用的ThreadLocal,防止资源泄露

    二、数据结构

    threadlocal.png

    说明

    • 每一个Thread类都有一个属性:ThreadLocal.ThreadLocalMap threadLocals = null
    • 每一个ThreadLocalMap都有一个Entry[]数组
    • 每一个Entry继承于WeakReference,Entry的key(ThreadLocal实例)作为WeakReference的referent,传入的value作为value

    java引用:

    注意

    • 从第三条说明点发现每一个ThreadLocal只能存储一个值,对于dubbo的RpcContext.LOCAL来讲,存储了一个RpcContext上下文信息;如果需要存储多个值,根据情况,将多个值合并为对象进行存储或者使用多个ThreadLocal实例进行存储

    三、源码分析

    3.1、ThreadLocal的创建

    基础属性

        /**
         * 每一个ThreadLocal对象都有一个唯一的threadLocalHashCode。
         * ThreadLocal对象会作为ThreadLocalMap的Entry的key     
         */
        private final int threadLocalHashCode = nextHashCode();
    
        private static AtomicInteger nextHashCode = new AtomicInteger();
    
        /**
         * 魔数:与斐波那契数列和黄金分割点有关。用于散列均匀,减少hash冲突
         */
        private static final int HASH_INCREMENT = 0x61c88647;
    
        private static int nextHashCode() {
            return nextHashCode.getAndAdd(HASH_INCREMENT);
        }
    

    说明

    • 每一个ThreadLocal对象都有一个唯一的threadLocalHashCode,该值用于计算Entry元素存储的直接索引。
      • 直接索引:元素根据threadLocalHashCode&(table.len-1)算出来的值
      • 真实索引:元素可能由于hash冲突,使用线性探测法,放置在不是直接索引的位置
    • hash算法:每一个ThreadLocal的threadLocalHashCode都是前一个的threadLocalHashCode + 0x61c88647。这样的hash算法可以很大程度的避免hash冲突,散列很均匀。0x61c88647这个数是怎么算出来的,见:https://www.javaspecialists.eu/archive/Issue164.html

    普通构造函数

        public ThreadLocal() {
        }
    

    java8提供的构造函数

        public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
            return new SuppliedThreadLocal<>(supplier);
        }
        
        static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {
    
            private final Supplier<? extends T> supplier;
    
            SuppliedThreadLocal(Supplier<? extends T> supplier) {
                this.supplier = Objects.requireNonNull(supplier);
            }
    
            @Override
            protected T initialValue() {
                return supplier.get();
            }
        }
    

    java8提供了Supplier(提供者) FunctionalInterface。基于该接口可以方便的替代掉示例中的使用匿名内部类来重写initialValue()的方法。需要注意的是,supplier函数不可为null。

    @FunctionalInterface
    public interface Supplier<T> {
    
        /**
         * Gets a result.
         */
        T get();
    }
    

    3.2 数据的初始化与设置

        /**
         * 设置初始值
         * 与set(T value)几乎相同,下边代码等价于
         * <pre>
         *     private T setInitialValue() {
         *          T value = initialValue();
         *          set(value);
         *          return value;
         *     }
         * </pre>
         */
        private T setInitialValue() {
            // 获取初始值(通常是调用使用方重写的initialValue(),否则使用{@link ThreadLocal#initialValue()返回null})
            T value = initialValue();
            // 获取当前线程
            Thread t = Thread.currentThread();
            // 获取当前线程的ThreadLocalMap对象
            ThreadLocalMap map = getMap(t);
            // 如果当前线程的ThreadLocalMap对象不为null,则直接将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
            if (map != null)
                map.set(this, value);
            // 如果当前线程的ThreadLocalMap为null,则直接新建ThreadLocalMap对象,并将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
            else
                createMap(t, value);
            // 返回初始值
            return value;
        }
        
        protected T initialValue() {
            return null;
        }
        
        /**
         * 设置值
         */
        public void set(T value) {
            // 获取当前线程
            Thread t = Thread.currentThread();
            // 获取当前线程的ThreadLocalMap对象
            ThreadLocalMap map = getMap(t);
            // 如果当前线程的ThreadLocalMap对象不为null,则直接将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
            if (map != null)
                map.set(this, value);
            // 如果当前线程的ThreadLocalMap为null,则直接新建ThreadLocalMap对象,并将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
            else
                createMap(t, value);
        }
        
        /**
         * 获取当前线程t的ThreadLocalMap
         */
        ThreadLocalMap getMap(Thread t) {
            return t.threadLocals;
        }
        
        /**
         * 创建ThreadLocalMap,并将 {当前的ThreadLocal对象,firstValue} 设置到ThreadLocalMap对象中去
         * this:当前的ThreadLocal对象
         */
        void createMap(Thread t, T firstValue) {
            t.threadLocals = new ThreadLocalMap(this, firstValue);
        }
    

    setInitialValue()流程:

    • 首先调用initialValue()获取初始值(通常是调用使用方重写的initialValue(),否则返回null)
    • 然后获取当前线程
    • 然后获取当前线程的ThreadLocalMap对象属性
    • 如果当前线程的ThreadLocalMap对象不为null,则直接将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
    • 如果当前线程的ThreadLocalMap为null,则直接新建ThreadLocalMap对象,并将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
    • 最后返回initialValue()获取到的初始值

    setInitialValue()被使用的时机:ThreadLocal.get()。

    关于ThreadLocalMap的创建和向ThreadLocalMap中设置参数的源码后续再说。

    3.3 数据的获取

        public T get() {
            // 获取当前线程
            Thread t = Thread.currentThread();
            // 获取当前线程的ThreadLocalMap属性
            ThreadLocalMap map = getMap(t);
            if (map != null) {
                // 从ThreadLocalMap属性中获取key为当前的ThreadLocal的Entry
                ThreadLocalMap.Entry e = map.getEntry(this);
                if (e != null) {
                    // 如果Entry不为null,直接获取entry的value,返回即可
                    @SuppressWarnings("unchecked")
                    T result = (T) e.value;
                    return result;
                }
            }
            // 如果当前线程的ThreadLocalMap为null或者key为当前的ThreadLocal的Entry为null
            return setInitialValue();
        }
    

    get()流程:

    • 获取当前线程
    • 获取当前线程的ThreadLocalMap属性
    • 如果当前线程的ThreadLocalMap对象不为null,从ThreadLocalMap对象中获取key为当前的ThreadLocal的Entry
    • 如果该Entry不为null,直接获取entry的value,返回即可
    • 如果当前线程的ThreadLocalMap为null(还未createMap(Thread t, T firstValue))或者key为当前的ThreadLocal的Entry为null(没有调用map.set(this, value)),则执行设置初始化值的方法。

    关于根据ThreadLocal从ThreadLocalMap中获取Entry的源码后续再说。

    3.4 数据的删除

        public void remove() {
            ThreadLocalMap m = getMap(Thread.currentThread());
            if (m != null)
                m.remove(this);
        }
    

    remove()步骤:

    • 获取当前的线程
    • 获取当前线程的ThreadLocalMap
    • 如果当前的ThreadLocalMap不为null,直接执行调用ThreadLocalMap的remove(ThreadLocal<?> key)

    关于根据ThreadLocal从ThreadLocalMap中remove Entry的源码后续再说。

    可以看到具体的操作全部委托到了ThreadLocalMap中来执行。

    3.5 ThreadLocalMap的创建

    基础属性

            /**
             * The initial capacity -- MUST be a power of two.
             */
            private static final int INITIAL_CAPACITY = 16;
    
            /**
             * The table, resized as necessary.
             * table.length MUST always be a power of two.
             */
            private Entry[] table;
    
            /**
             * The number of entries in the table.
             */
            private int size = 0;
    
            /**
             * threshold == table大小的2/3,
             * 1、当size >= threshold,遍历table并删除所有key为null的元素(rehash()),
             * 2、如果删除后size >= threshold*3/4 == 数组长度*1/2 时,需要对table进行扩容
             */
            private int threshold; // Default to 0
    
            private void setThreshold(int len) {
                threshold = len * 2 / 3;
            }
    

    静态内部类Entry

            static class Entry extends WeakReference<ThreadLocal<?>> {
                Object value;
                Entry(ThreadLocal<?> k, Object v) {
                    super(k);
                    value = v;
                }
            }
    

    其中value就是set的value,而k是当前的ThreadLocal对象,作为WeakReference的referent属性。

    构造器

            ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
                // 创建数组实例,默认长度16
                table = new Entry[INITIAL_CAPACITY];
                // 求余计算数组索引i
                int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
                // 第一次插入,没有hash冲突,直接放置
                table[i] = new Entry(firstKey, firstValue);
                // 数组实际元素长度 + 1
                size = 1;
                // 设置扩容的阈值为 16*2/3 = 10,当有10个元素时,发生扩容
                setThreshold(INITIAL_CAPACITY);
            }
    

    步骤:

    • 创建数组实例,默认长度16
    • 计算firstKey的直接索引,当len=2的n次方时,x&(len-1) <=> x%len,所以这里相当于直接求余,只是&运算效率更高
    • 第一次插入,没有hash冲突,直接创建Entry并放置就ok了
    • 然后数组中元素个数size+1
    • 设置rehash()的阈值

    索引操作

            /**
             * i:数组索引
             * len:数组长度
             * 如果 i+1<数组长度,则返回i+1, 如果 i+1>=数组长度,则直接取0(即开头table[0]),
             * 在实际使用中,实际上就是不断向后的循环取,到了末尾,再回到table[0],再向后循环取
             */
            private static int nextIndex(int i, int len) {
                return ((i + 1 < len) ? i + 1 : 0);
            }
    
            /**
             * 如果 i-1>=0,则返回i-1, 如果 i<1,则直接取数组长度-1(即末尾table[len-1]),
             * 在实际使用中,实际上就是不断向前的循环取,到了开头,再回到table[len-1],再向前循环取
             */
            private static int prevIndex(int i, int len) {
                return ((i - 1 >= 0) ? i - 1 : len - 1);
            }
    

    从上述的逻辑可知,我们可以将table[]看做是一个循环数组。

    3.6 向ThreadLocalMap中设置Entry

            /**
             * 设置 {key, value} 到当前线程的ThreadLocalMap中
             */
            private void set(ThreadLocal<?> key, Object value) {
                // 成员变量局部化,提高性能
                Entry[] tab = table;
                // 获取数组容量
                int len = tab.length;
                // 获取当前的threadlocal位于数组的下标,当len=2的n次方时,
                // key.threadLocalHashCode & (len - 1) <=> key.threadLocalHashCode % len
                int i = key.threadLocalHashCode & (len - 1);
    
                /**
                 * 先获取tab[i],判断如果不为null,则进行循环体内逻辑;
                 * 若没有return,则进行二次循环,tab[i+1],判断如果不为null,则进行循环体内逻辑;
                 * 当 i + 1 >= len 时,tab[0]回到头再进行循环。
                 *
                 * 这就是解决hash冲突的另一种方式:线性探测法。如果当前槽没有元素,直接插入元素;如果当前槽有元素,则向后寻找第一个为null的槽,放置该元素。
                 * 在这里,还添加了hash冲突时替换原有元素或者替换无效元素的逻辑。
                 */
                for (Entry e = tab[i];
                     e != null;
                     e = tab[i = nextIndex(i, len)]) {
                    // 获取当前的Entry的key,使用e.get(),是因为当前的key是一个WeakReference,使用get()获取key
                    ThreadLocal<?> k = e.get();
                    // 如果当前的key就是刚刚查询出来的Entry的key(即修改的是同一个Entry),则直接替换值
                    if (k == key) {
                        e.value = value;
                        return;
                    }
                    // 如果刚刚查询出来的Entry的key是null,则表明该Entry的key(ThreadLocal)已经被回收了
                    // 使用replaceStaleEntry替换掉已经无效的Entry
                    if (k == null) {
                        replaceStaleEntry(key, value, i);
                        return;
                    }
                }
    
                // 通过上述for循环解决了hash冲突之后,创建新的Entry,放置到当前的table[i]节点上
                tab[i] = new Entry(key, value);
                // table的size+1
                int sz = ++size;
                // 回收部分无效的Entry,如果成功,则当前的数组中的元素个数将小于阈值,则肯定不需要扩容,
                // 否则判断当前的数组中的元素个数是否达到了扩容阈值
                if (!cleanSomeSlots(i, sz) && sz >= threshold)
                    rehash();
            }
    

    说明:

    • 首先将table[]成员变量局部化(在后续的操作中如果会对table成员变量进行多次操作,局部化会提高性能)
    • 获取数组长度,计算直接索引
    • 然后进行hash冲突检测与处理(线性探测法)
      • 获取直接索引位置的Entry,如果不为null,表示发生了hash冲突,则先获取Entry中的key,如果该key与当前的ThreadLocal是同一个对象,则直接替换value,返回;
      • 如果Entry中的key==null,表示当前的Entry已经是一个无效的Entry了,执行replaceStaleEntry方法进行处理(如果从该Entry后的第一个元素开始在一个连续的Entry子数组内找到一个key与当前ThreadLocal相等的元素,则替换值,然后互换该Entry和当前的key==null的Entry位置;如果没找到,则将当前的key==null的Entry设置为新的将设置的Entry)返回;
      • 如果发生了hash冲突,但是直接索引位置上的Entry即不是当前的ThreadLocal的Entry,也不是一个无效的Entry,则需要向后循环遍历下一个Entry元素,再进行上述逻辑的处理。直到找到一个Entry为null的位置或者Entry失效的位置或者Entry就是当前的ThreadLocal的Entry的位置。
    • 假设没有发生hash冲突或者经过上述的for循环找到了一个Entry为null的位置的时候,在该位置创建设置新添加的Entry,数组元素+1;
    • 尝试回收部分无效的Entry,如果成功,则当前的数组中的元素个数将小于阈值,则肯定不需要rehash();如果失败,且数组元素个数 > threshold,执行rehash() -- 即执行部分资源清理与可能的扩容操作。

    replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot)

            /**
             * 对一段连续的Entry子数组进行操作
             * Replace a stale entry encountered during a set operation
             * with an entry for the specified key.
             */
            private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
                // 获取当前的table及其容量
                Entry[] tab = table;
                int len = tab.length;
                Entry e;
    
                // 设置需要被回收的槽索引(无效的Entry索引)
                int slotToExpunge = staleSlot;
                /**
                 * 向前遍历,直到找到第一个为null的槽
                 * 在遍历的过程中,如果找到了无效的Entry,则将slotToExpunge设置为当前的被回收的无效索引
                 */
                for (int i = prevIndex(staleSlot, len);
                     (e = tab[i]) != null;
                     i = prevIndex(i, len))
                    if (e.get() == null)
                        slotToExpunge = i;
    
                /**
                 * 向后遍历,直到找到第一个为null的槽或者找到与当前Entry key相等的槽
                 */
                for (int i = nextIndex(staleSlot, len);
                     (e = tab[i]) != null;
                     i = nextIndex(i, len)) {
                    ThreadLocal<?> k = e.get();
    
                    // The newly stale slot, or any other stale slot
                    // encountered above it, can then be sent to expungeStaleEntry
                    // to remove or rehash all of the other entries in run.
                    // 如果key相等,直接替换
                    if (k == key) {
                        // 替换值
                        e.value = value;
    
                        /**
                         * 互换 tab[i] 和 tab[staleSlot],将无效的tab[staleSlot]中的Entry后移;
                         * 将正确的 tab[i]中的Entry前移,方便下一次的查找(可以根据直接槽索引查找到了)
                         */
                        tab[i] = tab[staleSlot];
                        tab[staleSlot] = e;
    
                        // 如果待回收的槽索引就是当前的无效索引,则设置待回收的槽索引为i
                        if (slotToExpunge == staleSlot)
                            slotToExpunge = i;
                        cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                        return;
                    }
    
                    // 如果 "k==null && 待回收的槽索引就是当前的无效索引",则设置待回收的槽索引为i
                    if (k == null && slotToExpunge == staleSlot)
                        slotToExpunge = i;
                }
    
                // 置空原本的entry的value,将当前的Entry添加到这个槽上,原本的Entry等待GC
                tab[staleSlot].value = null;
                tab[staleSlot] = new Entry(key, value);
    
                // If there are any other stale entries in run, expunge them
                if (slotToExpunge != staleSlot)
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
            }
    

    步骤:

    • 首先从当前索引staleSlot的前一个元素开始向前遍历一个连续的Entry子数组,一直找到该连续子数组最靠前的那个无效索引,设置为slotToExpunge
    • 然后从staleSlot的 后一个元素开始 向后遍历一个连续的Entry子数组
    • 如果找到的Entry的key是当前的ThreadLocal,则直接替换value,之后将找到的Entry放置到table[staleSlot]处,也就是Entry本身的直接索引处,而Entry本身的真实索引的位置放置为原来的table[staleSlot];然后进行一次连续段的回收,之后进行一次log次数级别的回收,最后返回
    • 如果经过循环,没有找到key与ThreadLocal相等的Entry,则直接将要set的Entry放置到table[staleSlot]处,最后根据需要进行一次连续段的回收,之后在进行一次log次数级别的回收

    关于回收与扩容相关的操作,后续分析。

    3.7 从ThreadLocalMap中获取Entry

            private Entry getEntry(ThreadLocal<?> key) {
                // 计算所查找的Entry在table[]中的索引
                int i = key.threadLocalHashCode & (table.length - 1);
                Entry e = table[i];
                // 如果快速查找到的Entry!=null并且key也相等,则直接返回(也就是说在直接的hash槽找到了Entry)
                if (e != null && e.get() == key)
                    return e;
                //
                else
                    return getEntryAfterMiss(key, i, e);
            }
    
            private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
                Entry[] tab = table;
                int len = tab.length;
    
                /**
                 * 循环遍历,直到找到下一个不为null的Entry
                 */
                while (e != null) {
                    ThreadLocal<?> k = e.get();
                    // 如果key相等,表示找到了,直接返回
                    if (k == key)
                        return e;
                    // 如果key==null,则尝试回收或者整理"i到i之后的第一个Entry为null的索引(即返回值)之间"的Entry数据
                    if (k == null)
                        expungeStaleEntry(i);
                    // 继续向后遍历
                    else
                        i = nextIndex(i, len);
                    e = tab[i];
                }
                // 如果没找到,返回null
                return null;
            }
    

    步骤:

    • 计算当前的ThreadLocal的直接索引,获取直接索引位置的Entry
    • 如果该位置的Entry不为null且Entry的key与当前的ThreadLocal是同一个元素,则直接返回该Entry
    • 如果该位置的Entry为null,直接返回null(看getEntryAfterMiss逻辑)
    • 如果该位置的Entry不为null,但是Entry的key与当前的ThreadLocal不是同一个元素,则表明发生了hash冲突。此时,会不断的向后循环寻找,直到找到了要查找的Entry或者遍历到的Entry为null,如果找到了,返回Entry,如果没找到,返回null。

    注意:

    • 在没有根据直接索引找到Entry后的while循环中,如果检测到无效的Entry时,会使用expungeStaleEntry方法尝试回收或者整理"i到i之后的一段连续的不为null的索引(即返回值)之间"的Entry数据。
    • 对于get来讲,线性探测法是以Entry!=null为循环查找结束条件的,所以要注意remove时的操作。

    3.8 从ThreadLocalMap中删除Entry

            private void remove(ThreadLocal<?> key) {
                Entry[] tab = table;
                int len = tab.length;
                int i = key.threadLocalHashCode & (len - 1);
                for (Entry e = tab[i];
                     e != null;
                     e = tab[i = nextIndex(i, len)]) {
                    if (e.get() == key) {
                        // key - ThreadLocal置为null
                        e.clear();
                        // 尝试回收或者整理"i到i之后的一段连续的不为null的索引(即返回值)之间"的Entry数据
                        expungeStaleEntry(i);
                        return;
                    }
                }
            }
    

    步骤:

    • 计算key的直接索引,从直接索引的Entry开始,依旧在一段连续的不为null的Entry进行循环,如果找到的Entry的key与当前的key相同,则进行删除操作:
      • 首先将Entry的key置空(e.clear())
      • 尝试回收或者整理"i到i之后的一段连续的不为null的索引(即返回值)之间"的Entry数据

    注意:

    • 这里的expungeStaleEntry中的整理逻辑非常重要,直接影响get查询的查询链是否会断掉的问题。

    3.9 rehash()与resize()

            private void rehash() {
                // 回收table中所有无效的Entry,如果可以有效减小size到不满足下边的扩容条件,则下边的resize()不需要执行
                expungeStaleEntries();
    
                // 实际上扩容是发生在size>=threshold*3/4==length*(2/3)*(3/4)==lenth*1/2
                if (size >= threshold - threshold / 4)
                    resize();
            }
    
            /**
             * 扩容两倍数组,之后rehash
             */
            private void resize() {
                // 获取旧数组及其长度
                Entry[] oldTab = table;
                int oldLen = oldTab.length;
                // 新数组扩容2倍
                int newLen = oldLen * 2;
                Entry[] newTab = new Entry[newLen];
                int count = 0;
    
                for (int j = 0; j < oldLen; ++j) {
                    Entry e = oldTab[j];
                    if (e != null) {
                        ThreadLocal<?> k = e.get();
                        // 无效的Entry
                        if (k == null) {
                            e.value = null; // Help the GC
                        } else {
                            int h = k.threadLocalHashCode & (newLen - 1);
                            while (newTab[h] != null)
                                h = nextIndex(h, newLen);
                            newTab[h] = e;
                            count++;
                        }
                    }
                }
                // 设置新的阈值
                setThreshold(newLen);
                size = count;
                table = newTab;
            }
    

    rehash()仅用在ThreadLocalMap#set(ThreadLocal<?> key, Object value)的末尾部分。

    rehash()步骤:

    • 首先进行一次全表回收无效的Entry操作,如果可以有效减小数组中的元素个数size到数组长度的一半以下,则不需要进行扩容,如果达到一半及以上,则进行扩容操作。

    resize()步骤:

    • 创建新的Entry[]是旧的长度的两倍;之后遍历旧数组,之后根据新的数组的长度重新计算每个有效Entry的直接索引,根据线性探测法重新填充数组。
    • 最后设置新的Entry[]的阈值threshold、数组元素个数size,将新数组赋值给全局变量table

    四、回收机制

    ThreadLocal 回收机制 就是如下的四个函数以及WeakReference<ThreadLocal<?>>

    1、expungeStaleEntry(int staleSlot) 
    从staleSlot开始的连续Entry子数组中的无效Entry的清理和 整理:
    
    2、expungeStaleEntries()
    清理整个table中的无效Entry
    
    3、cleanSomeSlots(int i, int n)
    进行log2(n)次循环(如果正好遍历到无效的Entry,则n=table.length,调用expungeStaleEntry进行连续段的清理和整理);
    是一种介于完全不扫描和完全扫描(expungeStaleEntries)之间的一种扫描方式
    
    4、replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot)
    该函数已在set部分分析:从table[staleSlot]后的第一个元素开始在一个连续的Entry子数组内如果找到一个key与当前ThreadLocal相等的Entry,则替换值,然后互换该Entry和table[staleSlot](①),返回;如果没找到,则将table[staleSlot]直接设置为新的将设置的Entry(②)
    ①②处都会对该Entry所在的连续段,从连续段的第一个无效Entry(可能在当前的Entry之前)开始或者从该Entry的后一个元素开始(表示该Entry之前的元素全部有效)进行一次连续段的 回收和整理,之后在进行一次log级别的回收;
    

    4.1 expungeStaleEntry(int staleSlot)

            /**
             * 尝试回收或者整理"staleSlot到staleSlot之后的第一个Entry为null的索引(即返回值)之间"的Entry数据
             *
             * @return the index of the next null slot after staleSlot
             * (all between staleSlot and this slot will have been checked
             * for expunging).
             */
            private int expungeStaleEntry(int staleSlot) {
                Entry[] tab = table;
                int len = tab.length;
    
                // 回收table[staleSlot]处的值,value置为null,整体置为null
                tab[staleSlot].value = null;
                tab[staleSlot] = null;
                // size-1
                size--;
    
                // Rehash until we encounter null
                Entry e;
                int i;
                for (i = nextIndex(staleSlot, len);
                     (e = tab[i]) != null;
                     i = nextIndex(i, len)) {
                    ThreadLocal<?> k = e.get();
                    // 如果当前的table[i] Entry无用,则直接回收
                    if (k == null) {
                        e.value = null;
                        tab[i] = null;
                        size--;
                    } else {
                        /**
                         * 问题:
                         * 假设ThreadLocalMap的table长度为4的,其中table[1]、table[2]已经有值,而且key都不为null,
                         * 这时候set一个threadlocal1,算出来的索引为1,根据线性探测法,最后会设置到table[3];
                         * 然后,table[2]被remove,此时table[2]==null;
                         * 然后,get threadlocal1,此时算出索引还是1, 从table[1]向后查找,结果table[2]==null,则查找链就断了,结果返回null,
                         * 而没有办法查找到真正的table[3],这种情况threadlocal是怎么处理的呢?
                         *
                         * 答案就是下边的代码:会对remove元素的后边的元素循环遍历,直到遍历到非null为止。对每个元素做如下逻辑:
                         * 如果当前元素的k==null,则回收,如果不是无效元素,则计算其真实的直接索引,例如重新计算table[3]的索引,此处为1,与当前遍历的索引3不同,
                         * 所以将table[3]先置空,然后从table[1]开始向后寻找第一个为null的位置,重新放置table[3]的元素,此处就是table[2]=table[3]
                         *
                         * 简言之:remove一个Entry之后,会整理remove的Entry元素之后的一块儿连续的(中间没有null的entry)元素子数组,全部使用线性探测法重新计算,
                         * 这样remove掉的那一块儿就会被后续的Entry重新填充,查询链就不会断
                         */
                        // 计算真实的直接索引位置,因为可能k是因为hash冲突循环后移的
                        int h = k.threadLocalHashCode & (len - 1);
                        // 如果h和i,不相等,表示确实k是循环后移了的
                        if (h != i) {
                            // 首先将之前的Entry置为null
                            tab[i] = null;
    
                            // 一直向后循环找到第一个为null的索引,之后table[h] = e(将当前的Entry放置到table[h]新的hash位置上)
                            while (tab[h] != null)
                                h = nextIndex(h, len);
                            tab[h] = e;
                        }
                    }
                }
                return i;
            }
    

    步骤:

    • 将table[staleSlot]的value和Entry都置为null,元素个数size - 1
    • 遍历staleSlot之后的一段连续的Entry,如果Entry的key==null,将该Entry的value和Entry本身置空,如果key是有效的,将要进行 整理操作
      • 计算直接索引的位置,如果当前的Entry的真实索引与直接索引不相等,则表示该Entry之前是因为hash冲突被冲到后边的slot里的,此处会重新从直接索引位置开始向后找到第一个不为null的的位置,将当前的Entry元素放置到这里,这样会保证原本连续的Entry子数组,在从中删除一个Entry后,子数组中剩下的元素还可以组成一个连续的Entry数组,这保证了get操作的查询链不会断。

    注意:

    • remove一个元素后,会调用这里的 整理操作 进行连续Entry子数组的整理,不会使查询链断掉。要仔细的看注释中的那个例子。
    • 整理的过程相当于一次从staleSlot开始的连续段 内存碎片整理,将无效的Entry删除,有效的Entry按照原本的顺序聚集。

    4.2 expungeStaleEntries()

            /**
             * 回收table中所有的无效Entry.
             */
            private void expungeStaleEntries() {
                Entry[] tab = table;
                int len = tab.length;
                for (int j = 0; j < len; j++) {
                    Entry e = tab[j];
                    if (e != null && e.get() == null)
                        expungeStaleEntry(j);
                }
            }
    

    遍历table中所有的元素,发现无效的Entry后,就调用expungeStaleEntry(int staleSlot)进行连续子数组的资源清理与整理。该方法影响较大,仅仅用在rehash()中。

    4.3 cleanSomeSlots(int i, int n)

            /**
             * 对数级别的扫描,介于 "不扫描(fast but retains garbage)" 和 "完全扫描(find all garbage but would cause some insertions to take O(n) time)" 之间
             *
             * @param i a position known NOT to hold a stale entry. The
             *          scan starts at the element after i.
             * @param n scan control: {@code log2(n)} cells are scanned,
             *          unless a stale entry is found, in which case
             *          {@code log2(table.length)-1} additional cells are scanned.
             *          When called from insertions, this parameter is the number
             *          of elements, but when from replaceStaleEntry, it is the
             *          table length. (Note: all this could be changed to be either
             *          more or less aggressive by weighting n instead of just
             *          using straight log n. But this version is simple, fast, and
             *          seems to work well.)
             * @return true if any stale entries have been removed.
             */
            private boolean cleanSomeSlots(int i, int n) {
                boolean removed = false;
                Entry[] tab = table;
                int len = tab.length;
                do {
                    // 向后循环获取下一个索引,如果已经到了末尾,则返回table[0]
                    i = nextIndex(i, len);
                    Entry e = tab[i];
                    // 如果此Entry已经无效了,key-ThreadLocal已经被回收掉了,无法再通过该key获取Entry,如果不回收此Entry,则会造成内存泄露
                    if (e != null && e.get() == null) {
                        n = len;
                        removed = true;
                        // 尝试回收或者rehash"i到i之后的第一个Entry为null的索引(即返回值)之间"的Entry数据
                        i = expungeStaleEntry(i);
                    }
                } while ((n >>>= 1) != 0); // n / 2,假设n=16,当然set的时候n=size,则log2(16)=4,也就是说需要循环4次(也可以直接理解为16/2/2/2/2=1,第5次1/2==0则不再进行循环)
                return removed;
            }
    

    该方法只扫描一部分Entry,进行log2(n)次循环,如果恰好扫描到无效的Entry,则n=table.length,即进行log2(table.length)次循环。每个循环的步骤:

    • 获取下一个索引位置i及该位置上的Entry
    • 如果该Entry==null或者Entry依然有效,则进行下一次循环;
    • 如果该Entry!=null且无效,则先将n设置为长度(在set方法中n本来是)调用expungeStaleEntry(i)回收或者 整理 i之后的一段连续的Entry,之后再进行一下轮的回收和整理操作。

    4.4 弱引用WeakReference

    由于Entry继承了弱引用,他的key是referent,当发生gc时,如果该key已经不可达了,则直接回收(key==null),之后在如下操作时删除key==null的Entry。

    • get时调用expungeStaleEntry
    • set时调用replaceStaleEntry或者cleanSomeSlots或者expungeStaleEntries
    • remove时调用expungeStaleEntry

    五、总结

    5.1 set整个流程

    1. 首先将table[]成员变量局部化(在后续的操作中如果会对table成员变量进行多次操作,局部化会提高性能)
    2. 获取数组长度,计算直接索引
    3. 然后进行hash冲突检测与处理(线性探测法)
      • 获取直接索引位置上的Entry,如果不为null,表示发生了hash冲突,则先获取Entry中的key,如果该key与当前的ThreadLocal是同一个对象,则直接替换value,返回;
      • 如果Entry中的key==null,表示当前的Entry已经是一个无效的Entry了,执行replaceStaleEntry方法进行处理(replaceStaleEntry:如果从该Entry后的第一个元素开始在一个连续的Entry子数组内找到一个key与当前ThreadLocal相等的元素,则替换值,然后互换该Entry和当前的key==null的Entry位置;如果没找到,则将当前的key==null的Entry设置为新的将设置的Entry)返回;
      • 如果发生了hash冲突,但是直接索引位置上的Entry即不是当前的ThreadLocal的Entry,也不是一个无效的Entry,则需要向后循环遍历下一个Entry元素,再进行上述逻辑的处理。直到找到一个Entry为null的位置或者Entry失效的位置或者Entry就是当前的ThreadLocal的Entry的位置。
    4. 假设没有发生hash冲突或者经过上述的for循环找到了一个Entry为null的位置的时候,在该位置创建设置新添加的Entry,数组元素+1;
    5. 尝试log级别的回收部分无效的Entry,如果成功,则当前的数组中的元素个数将小于阈值,则肯定不需要rehash();如果失败,且数组元素个数 >= threshold,执行rehash()
      • 首先对整个table回收无效的Entry操作,如果可以有效减小数组中的元素个数size到数组长度的一半以下,则不需要进行扩容,如果size达到length的一半及以上,则进行扩容操作
      • 扩容:创建新的Entry[]是旧的长度的两倍;之后遍历旧数组,根据新的数组的长度重新计算每个有效Entry的直接索引,根据线性探测法重新填充数组。
        最后设置rehash()阈值、数组元素个数size,将新数组赋值给全局变量table。

    5.2 get整个流程

    1. 获取当前线程
    2. 获取当前线程的ThreadLocalMap属性
    3. 如果当前线程的ThreadLocalMap对象不为null,从ThreadLocalMap对象中获取key为当前的ThreadLocal的Entry
      • 计算当前的ThreadLocal的直接索引,获取直接索引位置的Entry
      • 如果该位置的Entry不为null且Entry的key与当前的ThreadLocal是同一个元素,则直接返回该Entry
      • 如果该位置的Entry为null,直接返回null(看getEntryAfterMiss逻辑)
      • 如果该位置的Entry不为null,但是Entry的key与当前的ThreadLocal不是同一个元素,则表明发生了hash冲突。此时,会不断的向后循环寻找,直到找到了要查找的Entry或者遍历到的Entry为null,如果找到了,返回Entry,如果没找到,返回null。
    4. 如果该Entry不为null,直接获取entry的value,返回即可
    5. 如果当前线程的ThreadLocalMap为null(还未createMap(Thread t, T firstValue))或者key为当前的ThreadLocal的Entry为null(没有调用map.set(this, value)),则执行设置初始化值的方法。

    5.3 各类回收的时机

    1. set或者setInitialValue
      • 如果从直接索引开始向后遍历的一段连续Entry段内发生hash冲突且冲突的Entry已经是无效的,此时会执行replaceStaleEntry,会执行回收和整理操作
      • 如果没有发生hash冲突 或者 发生了hash冲突但是在从直接索引开始的一段连续Entry段内没有找到与设置的key相等的Entry或者无效的Entry,如果直接索引所在的整个连续Entry段如果有无效Entry,则在set的尾部都会执行一次log级别的回收操作,如果回收不理想,需要进行rehash()
      • rehash()会进行一次全table的回收
    2. get
      • 在通过直接索引没有get到数据的时候,会循环遍历一段连续的Entry,如果遍历到的Entry是无效的,则进行一次连续Entry子数组的回收和整理
    3. remove
      • 当找到需要被删除的Entry时,进行一次连续Entry子数组的回收和整理
    4. gc时
      • 由于Entry继承了弱引用,他的key是referent,当发生gc时,如果该key已经无引用指向了,则直接回收(key==null了),之后再根据上述1,2,3的操作删除key==null的Entry。(这也是为什么ThreadLocal没有为Entry绑定ReferenceQueue的原因,因为Entry的删除已经可以发生在1,2,3中了;但是WeakHashMap不一样,WeakHashMap的Entry继承于WeakReference,其key是referent,当发生gc时,如果key不可达了,则回收key,之后将key对应的Entry放入queue,在后续对WeakHashMap的操作 - getTable()/size()/resize(int newCapacity)的时候进行Entry的回收)

    5.4 ThreadLocal与线程池

    线程池中的线程由于会被复用,所以线程池中的每一条线程在执行task结束后,要清理掉其ThreadLocalMap及其中的各个Entry,否则,当这条线程在下一次被复用的时候,其ThreadLocalMap信息还存储着上一次被使用的时的信息;另外,假设这条线程不再被使用,但是这个线程有可能不会被销毁(与线程池的类型和配置相关),那么其上的ThreadLocal将发生了资源泄露。所以netty的io线程池使用FastThreadLocalRunnable wrap了Runnable任务,当任务执行结束后,会做InternalThreadLocalMap和FastThreadLocal的清理操作。

    相关文章

      网友评论

        本文标题:Netty源码分析2 - ThreadLocal 源码解析

        本文链接:https://www.haomeiwen.com/subject/aylzmftx.html