ThreadLocal的作用是提供一个线程的局部变量,比如context、session。是直接把某个对象在各自线程中实例化一份,每个线程都有属于自己的该对象。ThreadLocal实例通常来说都是private static类型
ThreadLocal首先不是解决线程安全问题。ThreadLocal内部保存的局部变量在多线程之间不能共享,也就是必须new出来。如果ThreadLocal内部保存的变量本身就是一个多线程共享的对象,那么还是会有线程安全的问题,此时用同步锁Synchronized来处理
Synchronized处理线程间的数据共享,ThreadLocal处理线程间的数据独占
使用姿势
public class DateUtils {
private static ThreadLocal<SimpleDateFormat> local = new ThreadLocal<SimpleDateFormat>() {
@Override
protected SimpleDateFormat initialValue() {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}
};
public static String format(Date date) {
return local.get().format(date);
}
}
SimpleDateFormat是线程不安全的,每次使用时必须new。用ThreadLocal来实现线程缓存,每个线程都有一个独享对象,避免了频繁创建对象,也避免了多线程的竞争
FastDateFormat是apache commons包的实现
private final ConcurrentMap<MultipartKey, F> cInstanceCache
= new ConcurrentHashMap<>(7);
public F getInstance(final String pattern, TimeZone timeZone, Locale locale) {
Validate.notNull(pattern, "pattern must not be null");
if (timeZone == null) {
timeZone = TimeZone.getDefault();
}
if (locale == null) {
locale = Locale.getDefault();
}
final MultipartKey key = new MultipartKey(pattern, timeZone, locale);
F format = cInstanceCache.get(key);
if (format == null) {
format = createInstance(pattern, timeZone, locale);
final F previousValue= cInstanceCache.putIfAbsent(key, format);
if (previousValue != null) {
// another thread snuck in and did the same work
// we should return the instance that is in ConcurrentMap
format= previousValue;
}
}
return format;
}
FastDateFormat通过ConcurrentHashMap,避免所有相关操作线程都存放SimpleDateFormat对象,内存浪费。Map的key是MultipartKey对象(必须实现equals、hashcode方法),value是SimpleDateFormat
序列化框架Kryo对象也是线程不安全的。通常使用ThreadLocal或者KryoPool实现操作安全
get、set、remove实现原理
先看ThreadLocal.get()方法实现
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
return value;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
// 初始化Entry[] table数组,长度16
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
// 设置resize值: 16*2/3
setThreshold(INITIAL_CAPACITY);
}
每个Thread线程都有一个ThreadLocalMap数据结构,该Map的key是用户new的ThreadLocal对象,value是需要线程存储的object值
在ThreadLocalMap中,初始化一个大小16的Entry数组,Entry对象用来保存每一个key-value键值对,只不过这里的key永远都是ThreadLocal对象。通过ThreadLocal对象的set方法,结果把ThreadLocal对象自己当做key,放进了ThreadLocalMap中
ThreadLocal-Structure为什么key不是Thread ID,或者所有Thread共享一个全局的ConcurrentHashMap?
既然每个Thread都有自己的局部变量存储空间,那Map肯定是属于Thread类的成员变量,全局Map反而会引入并发控制。至于Map的key,若是Thread id,那一个Thread就只能存储一个value值。试想,从Controller->Service->Dao,每个类都有ThreadLocal成员变量的话,同一个线程经历不同阶段,当然是以类的ThreadLocal对象为key,才能同一个线程,经历不同类时,获取到当前类设置的线程存储值
分析ThreadLocalMap.Entry e = map.getEntry(this);
// Entry类继承了WeakReference<ThreadLocal<?>>,即每个Entry对象都有一个ThreadLocal的弱引用
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
key.threadLocalHashCode & (table.length - 1),hash取模的实现。hashcode & (size-1) 比 hashcode % size实现更高效。
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
每个ThreadLocal对象都有一个hash值threadLocalHashCode,每初始化一个ThreadLocal对象,hash值就增加一个固定大小0x61c88647。nextHashCode是静态的AtomicInteger,所有ThreadLocal对象共享getAndAdd(HASH_INCREMENT)
魔数0x61c88647,斐波那契散列
public class ThreadLocalDemo {
private static final int size = 16;
private static final int threshold = size - 1;
private static final AtomicInteger atomic = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
public static void main(String[] args) {
List<Integer> hashCode = Lists.newArrayList();
List<Integer> fiboHash = Lists.newArrayList();
List<Integer> murmurHash = Lists.newArrayList();
List<Integer> consistentHash = Lists.newArrayList();
for (int i = 0; i < size; i++) {
Object a = new Object();
hashCode.add(a.hashCode() & threshold);
fiboHash.add(atomic.getAndAdd(HASH_INCREMENT) & threshold);
murmurHash.add(Hashing.murmur3_32().hashInt(i).asInt() & threshold);
consistentHash.add(Hashing.consistentHash(i, size) & threshold);
}
System.out.println(StringUtils.join(hashCode, ", "));
System.out.println(StringUtils.join(fiboHash, ", "));
System.out.println(StringUtils.join(murmurHash, ", "));
System.out.println(StringUtils.join(consistentHash, ", "));
}
}
结果为
11, 5, 6, 3, 13, 4, 12, 10, 5, 7, 0, 2, 13, 4, 6, 7
0, 7, 14, 5, 12, 3, 10, 1, 8, 15, 6, 13, 4, 11, 2, 9
14, 10, 15, 1, 7, 14, 14, 1, 1, 3, 13, 10, 2, 1, 11, 6
0, 6, 15, 8, 12, 10, 9, 13, 4, 14, 14, 12, 10, 0, 13, 14
可以看出斐波散列分布很均匀,没有冲突。其他hashcode、murmurHash、consistentHash都会有或多或少的冲突。不过斐波散列需要AtomicInteger共享变量
那是否可以直接用AtomicInteger递增取模,而不用递增0x61c88647以及Hash取模?
主要原因:当插入新的Entry且出现Entry冲突,而进行线性探测时,后续的Entry坑也极大可能被占了(因为之前是连续存储),使得线性探测性能差。而斐波散列的nextIndex()很大可能是有坑且可以插入的。Netty的FastThreadLocal是AtomicInteger递增的
进入getEntryAfterMiss(key, i, e)方法
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
// 当hash取模的散列值所对应的entry有值但不是当前的ThreadLocal对象时,将对象顺延存放到下一个index,不断检测,直到entry有坑可以存放
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
// 当Entry的key,也就是ThreadLocal对象为空,清理value值
// value = null; entry = null;
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
// 开放定址法-线性探测
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
Hash散列冲突时,可以分离链表法、开放定址法
分离链表法:使用链表解决冲突,将散列值相同的元素都保存到一个链表中。当查询的时候,首先找到元素所在的链表,然后遍历链表查找对应的元素
modulo开放定址法:当散列到的数组slot被占用时,就会尝试在数组中寻找其他空slot。探测数组空slot的方式有很多,这里介绍一种最简单的 –- 线性探测法。线性探测法就是从冲突的数组slot开始,依次往后探测空slot,如果到数组尾部,再从头开始探测(环形查找)
nextIndex分析ThreadLocal.set()方法实现
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
private void set(ThreadLocal<?> key, Object value) {
// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.
Entry[] tab = table;
int len = tab.length;
// 根据ThreadLocal的散列值,查找对应元素在数组中的位置
int i = key.threadLocalHashCode & (len-1);
// 使用线性探测法查找元素
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// ThreadLocal存在时,直接覆盖之前的值
if (k == key) {
e.value = value;
return;
}
// key为null,但是值不为null,说明之前的ThreadLocal对象已经被回收了,当前数组中的Entry是一个陈旧(stale)的元素
if (k == null) {
// 用新元素代替陈旧的元素,并cleanSomeSlots()
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
// 如果没有可清理的陈旧Entry并且数组中的元素大于阈值,则进行rehash
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
分析ThreadLocal.remove()方法实现
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
// 引用设置为null
e.clear();
// 清理陈旧的 Entry
expungeStaleEntry(i);
return;
}
}
}
内存泄漏问题
Entry本身是一个只接受ThreadLocal对象的弱引用类,也就是说Entry存储的key是弱引用,但是它所存储的value是强引用。弱引用不用担心,当ThreadLocal在没有外部强引用时,下次GC就会被回收,那么持有强引用的value会不会引起内存泄漏?
ThreadLocalMap本身对帮助垃圾回收做了很多处理,每次调用get、set方法的时候都会对无效元素进行清理,表内空间不足的时候更是会进行一次彻底的rehash。其中调用expungeStaleEntry()方法,都会在key已经失效时,对Entry的value以及Entry本身置null,释放这些强引用
但是有些情况仍然会造成内存泄漏,那就是使用线程池。因为线程会永远存在线程池中,线程无法结束后回收,线程中的value引用会一直得不到释放,导致内存泄漏
所以最稳妥的方法,还是在使用完ThreadLocal后及时调用remove方法。该方法会查找以当前ThreadLocal对象为key的Entry,及时清理这个元素的key、value和Entry本身
父子线程值传递--InheritableThreadLocal
private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();
public static void main(String[] args) throws InterruptedException {
threadLocal.set("hello world");
System.out.println("main: " + threadLocal.get());
Thread thread = new Thread(() -> {
System.out.println("thread: " + threadLocal.get());
threadLocal.set("hi");
System.out.println("thread: " + threadLocal.get());
});
thread.start();
thread.join();
System.out.println("main: " + threadLocal.get());
}
打印结果:
main: hello world
thread: null
thread: hi
main: hello world
因为ThreadLocal的值跟Thread绑定,子线程get时获取值为null,父子线程之间set的值互不影响
private static final ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();
修改ThreadLocal为InheritableThreadLocal后
main: hello world
thread: hello world
thread: hi
main: hello world
子线程获取到父线程set的值,而子线程自定义的值不会影响父线程。子继承父线程值,而不会影响父线程值
InheritableThreadLocal实现:
其实Thread类有两个ThreadLocalMap变量。一个用于保存ThreadLocal对象和其value值;一个用于保存InheritableThreadLocal对象和其value值
/* ThreadLocal values pertaining to this thread. This map is maintained
* by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;
/*
* InheritableThreadLocal values pertaining to this thread. This map is
* maintained by the InheritableThreadLocal class.
*/
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
当new Thread()时,调用Thread的init方法。ThreadLocal.createInheritedMap()创建一个新的ThreadLocalMap,并copy父Thread的Map数据
Thread parent = currentThread();
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
注意:当使用线程池时,多个任务之间会因为线程共用,而导致ThreadLocal对象get、set操作相互污染
可以参考阿里的实现方案: TransmittableThreadLocal
扩展ThreadLocal
Netty的FastThreadLocal<V>
private final int index;
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
return index;
}
static final AtomicInteger nextIndex = new AtomicInteger();
public static final Object UNSET = new Object();
Object[] array = new Object[32];
Arrays.fill(array, UNSET);
每个FastThreadLocal对象都有一个index,该index是全局自增的AtomicInteger.getAndIncrement()。UnpaddedInternalThreadLocalMap维护一个初始32长度的Object[]数组,数组存放value值
set()时,根据FastThreadLocal的index,将value插入到Object[index]下,因为index是全局自增的,不会出现slot槽有值的情况;get()时,根据index,直接数组Object[index]读取value即可;remove()时,设置Object[index] = new Object();
FastThreadLocal相对于ThreadLocal,不需要hash,不需要线性探测(O(1)->O(n)),不需要在数组中存放ThreadLocal对象本身。缺点是有多少FastThreadLocal对象,就得至少多长数组,也无法利用回收后的数组槽(nextIndex自增导致)
Spring的NamedThreadLocal<T>
public class NamedThreadLocal<T> extends ThreadLocal<T> {
private final String name;
/**
* Create a new NamedThreadLocal with the given name.
* @param name a descriptive name for this ThreadLocal
*/
public NamedThreadLocal(String name) {
Assert.hasText(name, "Name must not be empty");
this.name = name;
}
@Override
public String toString() {
return this.name;
}
}
网友评论