美文网首页exam
彻底理解ThreadLocal

彻底理解ThreadLocal

作者: 炮灰向前冲啦 | 来源:发表于2018-03-13 16:27 被阅读0次

    ThreadLocal的作用是提供一个线程的局部变量,比如context、session。是直接把某个对象在各自线程中实例化一份,每个线程都有属于自己的该对象。ThreadLocal实例通常来说都是private static类型

    ThreadLocal首先不是解决线程安全问题。ThreadLocal内部保存的局部变量在多线程之间不能共享,也就是必须new出来。如果ThreadLocal内部保存的变量本身就是一个多线程共享的对象,那么还是会有线程安全的问题,此时用同步锁Synchronized来处理

    Synchronized处理线程间的数据共享,ThreadLocal处理线程间的数据独占

    使用姿势

    public class DateUtils {
        private static ThreadLocal<SimpleDateFormat> local = new ThreadLocal<SimpleDateFormat>() {
            @Override
            protected SimpleDateFormat initialValue() {
                return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            }
        };
    
        public static String format(Date date) {
            return local.get().format(date);
        }
    }
    

    SimpleDateFormat是线程不安全的,每次使用时必须new。用ThreadLocal来实现线程缓存,每个线程都有一个独享对象,避免了频繁创建对象,也避免了多线程的竞争

    FastDateFormat是apache commons包的实现

    private final ConcurrentMap<MultipartKey, F> cInstanceCache
            = new ConcurrentHashMap<>(7);
            
    public F getInstance(final String pattern, TimeZone timeZone, Locale locale) {
        Validate.notNull(pattern, "pattern must not be null");
        if (timeZone == null) {
            timeZone = TimeZone.getDefault();
        }
        if (locale == null) {
            locale = Locale.getDefault();
        }
        final MultipartKey key = new MultipartKey(pattern, timeZone, locale);
        F format = cInstanceCache.get(key);
        if (format == null) {
            format = createInstance(pattern, timeZone, locale);
            final F previousValue= cInstanceCache.putIfAbsent(key, format);
            if (previousValue != null) {
                // another thread snuck in and did the same work
                // we should return the instance that is in ConcurrentMap
                format= previousValue;
            }
        }
        return format;
    }
    

    FastDateFormat通过ConcurrentHashMap,避免所有相关操作线程都存放SimpleDateFormat对象,内存浪费。Map的key是MultipartKey对象(必须实现equals、hashcode方法),value是SimpleDateFormat

    序列化框架Kryo对象也是线程不安全的。通常使用ThreadLocal或者KryoPool实现操作安全

    get、set、remove实现原理

    先看ThreadLocal.get()方法实现

    public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }
    
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }
    
    private T setInitialValue() {
        T value = initialValue();
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
        return value;
    }
    
    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
    
    // 初始化Entry[] table数组,长度16
    ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
        table = new Entry[INITIAL_CAPACITY];
        int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
        table[i] = new Entry(firstKey, firstValue);
        size = 1;
        
        // 设置resize值: 16*2/3
        setThreshold(INITIAL_CAPACITY);
    }
    

    每个Thread线程都有一个ThreadLocalMap数据结构,该Map的key是用户new的ThreadLocal对象,value是需要线程存储的object值

    在ThreadLocalMap中,初始化一个大小16的Entry数组,Entry对象用来保存每一个key-value键值对,只不过这里的key永远都是ThreadLocal对象。通过ThreadLocal对象的set方法,结果把ThreadLocal对象自己当做key,放进了ThreadLocalMap中

    ThreadLocal-Structure

    为什么key不是Thread ID,或者所有Thread共享一个全局的ConcurrentHashMap?

    既然每个Thread都有自己的局部变量存储空间,那Map肯定是属于Thread类的成员变量,全局Map反而会引入并发控制。至于Map的key,若是Thread id,那一个Thread就只能存储一个value值。试想,从Controller->Service->Dao,每个类都有ThreadLocal成员变量的话,同一个线程经历不同阶段,当然是以类的ThreadLocal对象为key,才能同一个线程,经历不同类时,获取到当前类设置的线程存储值

    分析ThreadLocalMap.Entry e = map.getEntry(this);

    // Entry类继承了WeakReference<ThreadLocal<?>>,即每个Entry对象都有一个ThreadLocal的弱引用
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;
    
        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }
    
    private Entry getEntry(ThreadLocal<?> key) {
        int i = key.threadLocalHashCode & (table.length - 1);
        Entry e = table[i];
        if (e != null && e.get() == key)
            return e;
        else
            return getEntryAfterMiss(key, i, e);
    }
    

    key.threadLocalHashCode & (table.length - 1),hash取模的实现。hashcode & (size-1) 比 hashcode % size实现更高效。

    private final int threadLocalHashCode = nextHashCode();
    
    private static AtomicInteger nextHashCode = new AtomicInteger();
    
    private static final int HASH_INCREMENT = 0x61c88647;
    
    private static int nextHashCode() {
        return nextHashCode.getAndAdd(HASH_INCREMENT);
    }
    

    每个ThreadLocal对象都有一个hash值threadLocalHashCode,每初始化一个ThreadLocal对象,hash值就增加一个固定大小0x61c88647。nextHashCode是静态的AtomicInteger,所有ThreadLocal对象共享getAndAdd(HASH_INCREMENT)

    魔数0x61c88647,斐波那契散列

    public class ThreadLocalDemo {
    
        private static final int size = 16;
        private static final int threshold = size - 1;
    
        private static final AtomicInteger atomic = new AtomicInteger();
        private static final int HASH_INCREMENT = 0x61c88647;
    
        public static void main(String[] args) {
            List<Integer> hashCode = Lists.newArrayList();
            List<Integer> fiboHash = Lists.newArrayList();
            List<Integer> murmurHash = Lists.newArrayList();
            List<Integer> consistentHash = Lists.newArrayList();
    
            for (int i = 0; i < size; i++) {
                Object a = new Object();
                hashCode.add(a.hashCode() & threshold);
                fiboHash.add(atomic.getAndAdd(HASH_INCREMENT) & threshold);
                murmurHash.add(Hashing.murmur3_32().hashInt(i).asInt() & threshold);
                consistentHash.add(Hashing.consistentHash(i, size) & threshold);
            }
    
            System.out.println(StringUtils.join(hashCode, ", "));
            System.out.println(StringUtils.join(fiboHash, ", "));
            System.out.println(StringUtils.join(murmurHash, ", "));
            System.out.println(StringUtils.join(consistentHash, ", "));
        }
    }
    

    结果为

    11, 5, 6, 3, 13, 4, 12, 10, 5, 7, 0, 2, 13, 4, 6, 7
    0, 7, 14, 5, 12, 3, 10, 1, 8, 15, 6, 13, 4, 11, 2, 9
    14, 10, 15, 1, 7, 14, 14, 1, 1, 3, 13, 10, 2, 1, 11, 6
    0, 6, 15, 8, 12, 10, 9, 13, 4, 14, 14, 12, 10, 0, 13, 14

    可以看出斐波散列分布很均匀,没有冲突。其他hashcode、murmurHash、consistentHash都会有或多或少的冲突。不过斐波散列需要AtomicInteger共享变量

    那是否可以直接用AtomicInteger递增取模,而不用递增0x61c88647以及Hash取模?

    主要原因:当插入新的Entry且出现Entry冲突,而进行线性探测时,后续的Entry坑也极大可能被占了(因为之前是连续存储),使得线性探测性能差。而斐波散列的nextIndex()很大可能是有坑且可以插入的。Netty的FastThreadLocal是AtomicInteger递增的

    进入getEntryAfterMiss(key, i, e)方法

    private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
        Entry[] tab = table;
        int len = tab.length;
    
        // 当hash取模的散列值所对应的entry有值但不是当前的ThreadLocal对象时,将对象顺延存放到下一个index,不断检测,直到entry有坑可以存放
        while (e != null) {
            ThreadLocal<?> k = e.get();
            if (k == key)
                return e;
            if (k == null)
                // 当Entry的key,也就是ThreadLocal对象为空,清理value值
                // value = null; entry = null;
                expungeStaleEntry(i);
            else
                i = nextIndex(i, len);
            e = tab[i];
        }
        return null;
    }
    
    // 开放定址法-线性探测
    private static int nextIndex(int i, int len) {
        return ((i + 1 < len) ? i + 1 : 0);
    }
    

    Hash散列冲突时,可以分离链表法开放定址法

    分离链表法:使用链表解决冲突,将散列值相同的元素都保存到一个链表中。当查询的时候,首先找到元素所在的链表,然后遍历链表查找对应的元素

    modulo

    开放定址法:当散列到的数组slot被占用时,就会尝试在数组中寻找其他空slot。探测数组空slot的方式有很多,这里介绍一种最简单的 –- 线性探测法。线性探测法就是从冲突的数组slot开始,依次往后探测空slot,如果到数组尾部,再从头开始探测(环形查找)

    nextIndex

    分析ThreadLocal.set()方法实现

    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
    
    private void set(ThreadLocal<?> key, Object value) {
        // We don't use a fast path as with get() because it is at
        // least as common to use set() to create new entries as
        // it is to replace existing ones, in which case, a fast
        // path would fail more often than not.
        Entry[] tab = table;
        int len = tab.length;
        // 根据ThreadLocal的散列值,查找对应元素在数组中的位置
        int i = key.threadLocalHashCode & (len-1);
    
        // 使用线性探测法查找元素
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            ThreadLocal<?> k = e.get();
            
            // ThreadLocal存在时,直接覆盖之前的值
            if (k == key) {
                e.value = value;
                return;
            }
            
            // key为null,但是值不为null,说明之前的ThreadLocal对象已经被回收了,当前数组中的Entry是一个陈旧(stale)的元素
            if (k == null) {
                // 用新元素代替陈旧的元素,并cleanSomeSlots()
                replaceStaleEntry(key, value, i);
                return;
            }
        }
    
        tab[i] = new Entry(key, value);
        int sz = ++size;
        
        // 如果没有可清理的陈旧Entry并且数组中的元素大于阈值,则进行rehash
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            rehash();
    }
    

    分析ThreadLocal.remove()方法实现

     public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
     }
     
     private void remove(ThreadLocal<?> key) {
        Entry[] tab = table;
        int len = tab.length;
        int i = key.threadLocalHashCode & (len-1);
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            if (e.get() == key) {
                // 引用设置为null
                e.clear();
                // 清理陈旧的 Entry
                expungeStaleEntry(i);
                return;
            }
        }
    }
    

    内存泄漏问题

    Entry本身是一个只接受ThreadLocal对象的弱引用类,也就是说Entry存储的key是弱引用,但是它所存储的value是强引用。弱引用不用担心,当ThreadLocal在没有外部强引用时,下次GC就会被回收,那么持有强引用的value会不会引起内存泄漏?

    ThreadLocalMap本身对帮助垃圾回收做了很多处理,每次调用get、set方法的时候都会对无效元素进行清理,表内空间不足的时候更是会进行一次彻底的rehash。其中调用expungeStaleEntry()方法,都会在key已经失效时,对Entry的value以及Entry本身置null,释放这些强引用

    但是有些情况仍然会造成内存泄漏,那就是使用线程池。因为线程会永远存在线程池中,线程无法结束后回收,线程中的value引用会一直得不到释放,导致内存泄漏

    所以最稳妥的方法,还是在使用完ThreadLocal后及时调用remove方法。该方法会查找以当前ThreadLocal对象为key的Entry,及时清理这个元素的key、value和Entry本身

    父子线程值传递--InheritableThreadLocal

    private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();
    
    public static void main(String[] args) throws InterruptedException {
        threadLocal.set("hello world");
        System.out.println("main: " + threadLocal.get());
    
        Thread thread = new Thread(() -> {
            System.out.println("thread: " + threadLocal.get());
            threadLocal.set("hi");
            System.out.println("thread: " + threadLocal.get());
        });
    
        thread.start();
        thread.join();
    
        System.out.println("main: " + threadLocal.get());
    }
    

    打印结果:

    main: hello world
    thread: null
    thread: hi
    main: hello world
    

    因为ThreadLocal的值跟Thread绑定,子线程get时获取值为null,父子线程之间set的值互不影响

    private static final ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
    

    修改ThreadLocal为InheritableThreadLocal后

    main: hello world
    thread: hello world
    thread: hi
    main: hello world
    

    子线程获取到父线程set的值,而子线程自定义的值不会影响父线程。子继承父线程值,而不会影响父线程值

    InheritableThreadLocal实现:

    其实Thread类有两个ThreadLocalMap变量。一个用于保存ThreadLocal对象和其value值;一个用于保存InheritableThreadLocal对象和其value值

    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;
    
    /*
     * InheritableThreadLocal values pertaining to this thread. This map is
     * maintained by the InheritableThreadLocal class.
     */
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    

    当new Thread()时,调用Thread的init方法。ThreadLocal.createInheritedMap()创建一个新的ThreadLocalMap,并copy父Thread的Map数据

    Thread parent = currentThread();
    
    if (inheritThreadLocals && parent.inheritableThreadLocals != null)
        this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
    

    注意:当使用线程池时,多个任务之间会因为线程共用,而导致ThreadLocal对象get、set操作相互污染

    可以参考阿里的实现方案: TransmittableThreadLocal

    扩展ThreadLocal

    Netty的FastThreadLocal<V>

    private final int index;
    public FastThreadLocal() {
        index = InternalThreadLocalMap.nextVariableIndex();
    }
    public static int nextVariableIndex() {
        int index = nextIndex.getAndIncrement();
        return index;
    }
    static final AtomicInteger nextIndex = new AtomicInteger();
    
    public static final Object UNSET = new Object();
    Object[] array = new Object[32];
    Arrays.fill(array, UNSET);
    

    每个FastThreadLocal对象都有一个index,该index是全局自增的AtomicInteger.getAndIncrement()。UnpaddedInternalThreadLocalMap维护一个初始32长度的Object[]数组,数组存放value值

    set()时,根据FastThreadLocal的index,将value插入到Object[index]下,因为index是全局自增的,不会出现slot槽有值的情况;get()时,根据index,直接数组Object[index]读取value即可;remove()时,设置Object[index] = new Object();

    FastThreadLocal相对于ThreadLocal,不需要hash,不需要线性探测(O(1)->O(n)),不需要在数组中存放ThreadLocal对象本身。缺点是有多少FastThreadLocal对象,就得至少多长数组,也无法利用回收后的数组槽(nextIndex自增导致)

    Spring的NamedThreadLocal<T>

    public class NamedThreadLocal<T> extends ThreadLocal<T> {
        private final String name;
    
        /**
         * Create a new NamedThreadLocal with the given name.
         * @param name a descriptive name for this ThreadLocal
         */
        public NamedThreadLocal(String name) {
            Assert.hasText(name, "Name must not be empty");
            this.name = name;
        }
    
        @Override
        public String toString() {
            return this.name;
        }
    }
    

    引申

    ThreadLocal源码详细解析

    Java弱引用

    理解Java-Reference

    TransmittableThreadLocal

    FastThreadLocal测试

    相关文章

      网友评论

        本文标题:彻底理解ThreadLocal

        本文链接:https://www.haomeiwen.com/subject/dhizfftx.html