美文网首页JUC并发相关
20. 并发终结之ThreadLocal

20. 并发终结之ThreadLocal

作者: 涣涣虚心0215 | 来源:发表于2020-09-21 09:46 被阅读0次

    我们提到过如何保证共享变量的线程安全性,比如可以用synchronized内部锁,也可以使用Lock,volatile或者原子变量。
    除此之外ThreadLocal也是保证共享变量线程安全的一个手段,它将共享变量变成线程私有(保证了对非线程安全对象访问的线程安全,又避免了锁的开销)。

    final static ThreadLocal<T> VAR = new ThreadLocal()

    ThreadLocal通常会被作为某个类的静态字段使用,因为如果声明成实例变量,那么每创建该类的一个实例都会导致新的ThreadLocal实例被创建,即使没有导致错误,也会导致重复创建对象带来的浪费;static意思就是这个类的所有实例共有这个VAR变量。

    ThreadLocal实现机制:

    Java里面每个Thread都维护了一个ThreadLocalMap对象threadLocals,ThreadLocalMap里面包含了若干Entry(private Entry[] table....一个key-value键值对),因此可以说每个线程都拥有若干这样的条目,相应的线程就被称为这些条目的属主线程
    Entry的key是一个ThreadLocal实例,Value是一个线程特有对象,即Entry的作用就是建立一个ThreadLocal实例与线程特有对象之间的对应关系,但是由于Entry对ThreadLocal实例的引用(Key的引用)是一个弱引用(WeakReference),因此它不会阻止被引用的ThreadLocal实例被垃圾回收,如果ThreadLocal引用置为null,那么GC的时候ThreadLocal实例被垃圾回收,那么其所在Entry的Key会被设置成null,此时Entry就是无效条目。另一方面,由于Entry对Value的引用是强引用,如果Entry本身有对它的可达强引用,那么Entry也会阻止其引用的Value被垃圾回收。
    此时,当ThreadLocalMap有新的ThreadLocal线程和Value对象关系被创建(相当于有新的Entry对象被添加到ThreadLocalMap)的时候,ThreadLocalMap会将无效条目清理,即使得对Value对象的强引用被清除,Value对象得以被垃圾回收;但是也有缺点,如果线程在相当长时间(或者一直)处于非运行状态,那么该线程的ThreadLocalMap可能就没有任何变化,相应的ThreadLocalMap中无效条目也不会被清理,这就可能导致这些线程的Entry所引用的Value对象无法被线程回收。导致伪内存泄漏

    ThreadLocal
    例如Value实线部分,即线程对象持有线程特有对象的强引用,可能会导致伪内存泄漏,那么只要打破这种关系,即通过调用ThreadLocal.remove()将线程特有对象从其所属的Entry中剥离,即可使线程特有对象(ThreadLocal包含的对象)和线程局部变量(ThreadLocal对象)都被垃圾回收。
    关于WeakReference

    弱引用, 当一个对象仅仅被weak reference指向, 而没有任何其他强引用指向的时候,只能生存到下一次垃圾回收之前,GC发生时,不管内存够不够,都会被回收。
    下面代码里a=null并不能使对象A在GC的时候被垃圾回收,因为b也是强引用并持有对象A的引用。
    如果用WeakReference来引用a,这时候如果a=null,那么GC的时候堆内存的对象A就会被回收;如果a==null不写,那么对象A在内存还是有强引用指向的,就不会被垃圾回收。

    A a = new A();
    B b = new B(a);
    a = null; 
    ------------------------------
    A a = new A();
    WeakReference<A> reference = new WeakReference(a);
    System.out.println(reference.get());
    System.gc();
    System.out.println(reference.get());
    结果:
    com.refinitiv.applayer.ftp.entity.A@5910e440
    com.refinitiv.applayer.ftp.entity.A@5910e440
    ------------------------------
    A a = new A();
    WeakReference<A> reference = new WeakReference(a);
    a = null;
    System.out.println(reference.get());
    System.gc();
    System.out.println(reference.get());
    结果:
    com.refinitiv.applayer.ftp.entity.A@5910e440
    null
    
    
    ThreadLocal在JVM中(简易版)

    Thread实例里面都有ThreadLocalMap变量,在线程里面调用ThreadLocal.set()或者get()方法,会创建当前线程所属的ThreadLocalMap实例,ThreadLocalMap内部主要是Entry类型数组Entry[] table,即会创建一个Entry来关联当前ThreadLocal实例(WeakReference类型的 )以及共享变量Value。
    当threadLocal引用变量置为null(指向内存ThreadLocal的引用变量被回收),没有强引用指向堆内存ThreadLocal对象,那么发生GC,WeakReference类型的变量会被回收,那么entry的key=null,但是thread对于value的强引用还存在,导致entry对象也一直留在堆内存,如果不主动调用remove(),如果thread挂起或者别的原因不能正常结束,由于ThreadLocalMap 的生命周期跟 Thread 一样长,那么堆内存一直得不到释放,会造成内存泄漏。

    JVM-ThreadLocal
    源码分析

    Thread类里面定义了一个ThreadLocal.ThreadLocalMap成员变量,这个变量是在ThreadLocal类里面进行维护的。

        /* ThreadLocal values pertaining to this thread. This map is maintained
         * by the ThreadLocal class. */
        ThreadLocal.ThreadLocalMap threadLocals = null;
    

    ThreadLocal主要的方法有set(), get(), setInitialValue()以及remove()方法。
    因为WeakReference的存活时间只能到下一次GC,GC就会被回收,所以set()和get()方法都会调用expungeStaleEntry()来清除无效entry,避免内存泄漏。

    public T get() {
        //拿到当前Thread的ThreadLocalMap实例
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            //拿到ThreadMap之后就根据当前ThreadLocal实例获得Entry实例
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
       //如果Entry不存在就调用setInitialValue来设置初始值,initialValue一般在在声明ThreadLocal变量的时候提供,如果没有提供,则为null。
        return setInitialValue();
    }
    /**
     * 根据提供的initialValue方法来设置初始值,这里面会创建当前Thread内部的ThreadLocalMap
     */
    private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
    }
    
    /**
     * 设置当前线程的私有变量
     */
    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
    
    /**
     * 移除当前Thread关联的私有变量
     * 拿到ThreadLocalMap里的Entry对象,清除WeakReference,然后调用expungeStaleEntry清除无效entry
     */
     public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
     }
    

    ThreadLocalMap是ThreadLocal的静态内部类,ThreadLocalMap内部又定义了一个Entry类型的静态内部类。

    static class ThreadLocalMap {
        //Entry静态内部类,继承WeakReference,Entry内部定义了一个ThreadLocal类型的WeakReference弱引用,以及强引用value
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;
    
            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }
        //Entry类型的数组,ThreadLocalMap内部维护的Entry数组
        private Entry[] table;
        //懒加载模式创建ThreadLocalMap对象
        ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }
        //根据ThreadLocal实例获得Entry对象
        private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
            Entry e = table[i];
            //如果找到Entry e并且e的Reference就是key,就return
            //如果e不为null,但是key不一样就调用getEntryAfterMiss
            if (e != null && e.get() == key)
                return e;
            else
                return getEntryAfterMiss(key, i, e);
        }
        //getEntryAfterMiss主要做的就是Entry的WeakReference如果被垃圾回收,而变成null,
        //就可以提前通过expungeStaleEntry来回收已经无效的entry,防止内存泄漏。
        private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
            Entry[] tab = table;
            int len = tab.length;
    
            while (e != null) {
                ThreadLocal<?> k = e.get();
                if (k == key)
                    return e;
                if (k == null)
                   //清楚stale无效的entry
                    expungeStaleEntry(i);
                else
                    i = nextIndex(i, len);
                e = tab[i];
            }
            return null;
        }
        //为ThreadLocal实例和Value创建关联
        private void set(ThreadLocal<?> key, Object value) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            //根据ThreadLocal拿到Entr
            //如果Entry存在,且key相等则直接更新value
            //   如果key已经被垃圾回收则首先会清理expunge无效的Entry,并创建新的Entry来替换。
            //如果Entry不存在,则直接创建Entry
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                ThreadLocal<?> k = e.get();
                if (k == key) {
                    e.value = value;
                    return;
                }
                if (k == null) {
                    replaceStaleEntry(key, value, i);
                    return;
                }
            }
            tab[i] = new Entry(key, value);
            int sz = ++size;
            if (!cleanSomeSlots(i, sz) && sz >= threshold)
                rehash();
        }
        /**
         * 根据ThreadLocal实例移除Entry
         */
        private void remove(ThreadLocal<?> key) {
            Entry[] tab = table;
            int len = tab.length;
            int i = key.threadLocalHashCode & (len-1);
            for (Entry e = tab[i];
                 e != null;
                 e = tab[i = nextIndex(i, len)]) {
                if (e.get() == key) {
                    e.clear();
                    expungeStaleEntry(i);
                    return;
                }
            }
        }
        /**
         * 在set的时候如果entry无效了,需要重新创建entry替换无效entry
         */
        private void replaceStaleEntry(ThreadLocal<?> key, Object value,
                                       int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
            Entry e;
            int slotToExpunge = staleSlot;
            for (int i = prevIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = prevIndex(i, len))
                if (e.get() == null)
                    slotToExpunge = i;
            for (int i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();
                if (k == key) {
                    e.value = value;
                    tab[i] = tab[staleSlot];
                    tab[staleSlot] = e;
    
                    // Start expunge at preceding stale entry if it exists
                    if (slotToExpunge == staleSlot)
                        slotToExpunge = i;
                    cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
                    return;
                }
                if (k == null && slotToExpunge == staleSlot)
                    slotToExpunge = i;
            }
    
            // If key not found, put new entry in stale slot
            tab[staleSlot].value = null;
            tab[staleSlot] = new Entry(key, value);
    
            // If there are any other stale entries in run, expunge them
            if (slotToExpunge != staleSlot)
                cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
        }
    
        /**
         * 主要清除无效Entry的方法
         */
        private int expungeStaleEntry(int staleSlot) {
            Entry[] tab = table;
            int len = tab.length;
    
            // expunge entry at staleSlot
            tab[staleSlot].value = null;
            tab[staleSlot] = null;
            size--;
            // Rehash until we encounter null
            Entry e;
            int i;
            for (i = nextIndex(staleSlot, len);
                 (e = tab[i]) != null;
                 i = nextIndex(i, len)) {
                ThreadLocal<?> k = e.get();
                if (k == null) {
                    e.value = null;
                    tab[i] = null;
                    size--;
                } else {
                    int h = k.threadLocalHashCode & (len - 1);
                    if (h != i) {
                        tab[i] = null;
                        // Unlike Knuth 6.4 Algorithm R, we must scan until
                        // null because multiple entries could have been stale.
                        while (tab[h] != null)
                            h = nextIndex(h, len);
                        tab[h] = e;
                    }
                }
            }
            return i;
        }
        /**
         * 清除固定位置的无效Entry
         */
        private boolean cleanSomeSlots(int i, int n) {
            boolean removed = false;
            Entry[] tab = table;
            int len = tab.length;
            do {
                i = nextIndex(i, len);
                Entry e = tab[i];
                if (e != null && e.get() == null) {
                    n = len;
                    removed = true;
                    i = expungeStaleEntry(i);
                }
            } while ( (n >>>= 1) != 0);
            return removed;
        }
    
        /**
         * 在set的时候需要扩张entry数组,会调用expungeStaleEntries清除无效Entry
         */
        private void rehash() {
            expungeStaleEntries();
    
            // Use lower threshold for doubling to avoid hysteresis
            if (size >= threshold - threshold / 4)
                resize();
        }
    
        /**
         * rehash的时候会遍历table,清除stale的无效Entry
         */
        private void expungeStaleEntries() {
            Entry[] tab = table;
            int len = tab.length;
            for (int j = 0; j < len; j++) {
                Entry e = tab[j];
                if (e != null && e.get() == null)
                    expungeStaleEntry(j);
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:20. 并发终结之ThreadLocal

        本文链接:https://www.haomeiwen.com/subject/kyowhktx.html