1. 介绍
1.1 什么是 ThreadLocal
ThreadLocal
类顾名思义可以理解为线程本地变量。每个线程往这个 ThreadLocal
中读写是线程隔离,互相之间不会影响的。它提供了一种将可变数据通过每个线程有自己的独立副本从而实现线程封闭的机制。
但是 ThreadLocal
没有解决多线程访问共享数据的问题,因此被多线程共享的数据不能放入 threadLocal
需在代码中以 private static
来实例化 ThreadLocal
对象。
ThreadLocal
中存放的元素需要有很短的生命周期。最好能在线程结束之后,显示调用remove
方法释放元素。
1.2 实现思路
Thread
中有一个类型为ThreadLocal.ThreadLocalMap
的属性 threadLocals
。也就是说每一个线程都有自己的一个 ThreadLocalMap
用来存储自己的独立数据。
每个线程在往某个 ThreadLocal
里插值的时候,都会往自己的 ThreadLocalMap
里存,读也是以某个 ThreadLocal
作为引用,在自己的map里找对应的key,从而实现了线程隔离。
2. 主要属性
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
3. 主要方法
3.1 T get()
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
- 获取当前线程,并从线程中获取
ThreadLocalMap
。 - 如果
map
不为空且元素不为空, 则返回对象。 - 如果不存在则调用
setInitialValue
初始化map
。
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
获取线程中的 ThreadLocalMap
private T setInitialValue() {
T value = initialValue();
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
return value;
}
// 初始值 `value` 为 `null`,此处代码没有明白作者真正的含义
protected T initialValue() {
return null;
}
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
- 如果
map
不为空,则将当前对象为key
,null
为value
存入ThreadLocalMap
中。 - 如果
map
为空,则创建map
。
3.2 void set(T value)
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
map.set(this, value);
} else {
createMap(t, value);
}
}
- 从当前线程中获取
ThreadLocalMap
- 如果
map
不为空,则将当前对象为key
,null
为value
存入ThreadLocalMap
中。 - 如果
map
为空,则创建map
。
3.3 void remove()
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null) {
m.remove(this);
}
}
4. ThreadLocalMap
ThreadLocalMap提供了一种为ThreadLocal定制的高效实现,并且自带一种基于弱引用的垃圾清理机制。
虽然是 Map
,但是与 java.util.Map
没有任何关系。这个 Map
有自己的 key
和 value
。我们可以简单的认为 ThreadLocal
对象为 key
。是因为事实上 ThreadLocalMap
的 key
是存放着 ThreadLocal
的弱引用。
为什么要弱引用
因为如果这里使用普通的key-value形式来定义存储结构,实质上就会造成节点的生命周期与线程强绑定,只要线程没有销毁,那么节点在GC分析中一直处于可达状态,没办法被回收,而程序本身也无法判断是否可以清理节点。弱引用是Java中四档引用的第三档,比软引用更加弱一些,如果一个对象没有强引用链可达,那么一般活不过下一次GC。当某个ThreadLocal已经没有强引用可达,则随着它被垃圾回收,在ThreadLocalMap里对应的Entry的键值会失效,这为ThreadLocalMap本身的垃圾清理提供了便利。
4.1 存储结构
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
类 Entry
很显然是一个保存map键值对的实体,ThreadLocal<?>
为key, 要保存的线程局部变量的值为value。super(k)调用的 WeakReference
的构造函数,表示将ThreadLocal<?>
对象转换成弱引用对象,用做key。
4.2 主要属性
/**
* 初始容量,必须为2的幂
*/
private static final int INITIAL_CAPACITY = 16;
/**
* Entry表,大小必须为2的幂
*/
private Entry[] table;
/**
* 表里entry的个数
*/
private int size = 0;
/**
* 重新分配表大小的阈值,默认为0
*/
private int threshold;
ThreadLocalMap维护了一个Entry表或者说Entry数组,并且要求表的大小必须为2的幂,同时记录表里面entry的个数以及下一次需要扩容的阈值。
显然这里会产生一个问题,为什么必须是2的幂?
4.3 数据结构
/**
* 设置resize阈值以维持最坏2/3的装载因子
*/
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
/**
* 环形意义的下一个索引
*/
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
/**
* 环形意义的上一个索引
*/
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
ThreadLocal有两个方法用于得到上一个/下一个索引,注意这里实际上是环形意义下的上一个与下一个。
ThreadLocalMap.png4.4 构造函数
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
- 初始化
table
,默认容量为INITIAL_CAPACITY
。 - 用
firstKey
的threadLocalHashCode
与INITIAL_CAPACITY
取模得到哈希值。 - 初始化节点并设置
size
。 - 设置扩容阈值。
回想下 ThreadLocal
的主要属性:
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
// 十进制为1640531527。
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
HASH_INCREMENT
:生成hash code间隙,这个魔数可以让生成出来的值或者说 ThreadLocal
的ID较为均匀地分布在2的幂大小的数组中。
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1)
这个魔数的选取与斐波那契散列有关,通过理论与实践,当我们用 0x61c88647
作为魔数累加为每个 ThreadLocal
分配各自的ID也就是 threadLocalHashCode
再与2的幂取模,得到的结果分布很均匀。
ThreadLocalMap
使用的是线性探测法,均匀分布的好处在于很快就能探测到下一个临近的可用slot,从而保证效率。这样就能很好的回答了上面为什么必须是2的幂的问题了,为了性能。
而 & (INITIAL_CAPACITY - 1)
, 这是一个取模小技巧。对于2的幂作为模数取模,可以用 &(2^n-1)
来替代 % 2^n
,位运算比取模效率高很多。
(15 & 3) == (15 % 4)
(18 & 7) == (18 % 8)
因为对2^n取模,只要不是低n位对结果的贡献显然都是0,会影响结果的只能是低n位。
4.5 Entry getEntry(ThreadLocal<?> key)
获取元素
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
- 根据
key.threadLocalHashCode & (table.length - 1)
计算索引位置,即哈希值 - 获取
Entry
对象,如果不为空,且弱引用指向的ThreadLocal
对象是key
, 则返回Entry
对象 - 如果
Entry
对象为空或者弱引用对象不是key
或者弱引用对象已被回收,则需要调用getEntryAfterMiss
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key){
return e;
}
if (k == null){
expungeStaleEntry(i);
} else {
i = nextIndex(i, len);
}
e = tab[i];
}
return null;
}
- 如果
Entity
对象为空,则直接返回null
。表明用户没有存储对象。 - 当弱引用指向的对象是
key
,则返回。 - 如果由于弱引用对象已经被系统所回收,则调用
expungeStaleEntry
清理无效的entry
/**
* 清除无效弱引用的方法
*/
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null){
h = nextIndex(h, len);
}
tab[h] = e;
}
}
}
return i;
}
- 将当前
entry
的数据清空,并将此entry
与数组解除关系。 - 获取环形下标中的
entry
,直到entry
为空,判断弱引用指向是否为空。 - 如果为空,则此
entry
的数据清空,并将此entry
与数组解除关系。 - 如果不为空,则重新计算索引位置,如果位置已被使用,则根据环形下标获取
entry
为空时设置到此位置。
总结
- 如果
index
对应的slot
就是要读的threadLocal
,则直接返回结果 - 调用
getEntryAfterMiss
线性探测,过程中每碰到无效slot
,调用expungeStaleEntry
进行段清理;如果找到了key
,则返回结果entry
- 没有找到
key
,返回null
- 由于是弱引用类型,所以
table
数组中slot
就有了三种情况:有效(key未回收)、无效(key已回收)、空(entry==null).
4.6 void set(ThreadLocal<?> key, Object value)
设置元素
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold){
rehash();
}
}
- 计算元素索引值
i
。 - 从
i
位置开始环形向下循环,如果之前设置过,则进行覆盖。如果出现无效元素,则调用replaceStaleEntry
替换。 - 如果是个新元素,则添加到
table
之中。 - 如果最后清理掉一些元素的话,需要重新哈希。
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
int slotToExpunge = staleSlot;
//从无效位置 `staleSlot` 向前环形查找无效元素
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len)){
if (e.get() == null){
slotToExpunge = i;
}
}
//从无效位置 `staleSlot` 向后环形查找元素
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
//如果找到key,这种情况是如何出现的呢?
if (k == key) {
//替换对应的值与位置
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
//如果之前向前环形查找到了无效元素,则以这个位置作为清理的起点,
//否则以当前位置作为起点
if (slotToExpunge == staleSlot){
slotToExpunge = i;
}
// 从slotToExpunge开始做一次连续段的清理,再做一次启发式清理
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
if (k == null && slotToExpunge == staleSlot){
slotToExpunge = i;
}
}
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
/**
* 启发式地清理slot,
* i对应entry是非无效(指向的ThreadLocal没被回收,或者entry本身为空)
* n是用于控制控制扫描次数的
* 正常情况下如果log n次扫描没有发现无效slot,函数就结束了
* 但是如果发现了无效的slot,将n置为table的长度len,做一次连续段的清理
* 再从下一个空的slot开始继续扫描
*
* 这个函数有两处地方会被调用,一处是插入的时候可能会被调用,另外个是在替换无效slot的时候可能会被调用,
* 区别是前者传入的n为元素个数,后者为table的容量
*/
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
// i在任何情况下自己都不会是一个无效slot,所以从下一个开始判断
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
// 扩大扫描控制因子
n = len;
removed = true;
// 清理一个连续段
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
private void rehash() {
// 做一次全量清理
expungeStaleEntries();
/*
* 因为做了一次清理,所以size很可能会变小。
* ThreadLocalMap这里的实现是调低阈值来判断是否需要扩容,
* threshold默认为len*2/3,所以这里的threshold - threshold / 4 相当于len/2
*/
if (size >= threshold - threshold / 4) {
resize();
}
}
/**
* 扩容,因为需要保证table的容量len为2的幂,所以扩容即扩大2倍
*/
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null) {
h = nextIndex(h, newLen);
}
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab;
}
/*
* 做一次全量清理
*/
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null){
expungeStaleEntry(j);
}
}
}
总结
我们来回顾一下 ThreadLocal
的set方法可能会有的情况
-
探测过程中slot都不无效,并且顺利找到key所在的slot,直接替换即可
-
探测过程中发现有无效slot,调用replaceStaleEntry,效果是最终一定会把key和value放在这个slot,并且会尽可能清理无效slot
- 在replaceStaleEntry过程中,如果找到了key,则做一个swap把它放到那个无效slot中,value置为新值
- 在replaceStaleEntry过程中,没有找到key,直接在无效slot原地放entry
-
探测没有发现key,则在连续段末尾的后一个空位置放上entry,这也是线性探测法的一部分。放完后,做一次启发式清理,如果没清理出去key,并且当前table大小已经超过阈值了,则做一次rehash,rehash函数会调用一次全量清理slot方法也即expungeStaleEntries,如果完了之后table大小超过了threshold - threshold / 4,则进行扩容2倍
4.7 void remove(ThreadLocal<?> key)
删除元素
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}
网友评论