netty新增了FastThreadLocal和FastThreadLocalThread等类来实现了一个FastThread框架,相较于java.lang.Thread和java.lang.ThreadLocal速度更快。
在看FastThread框架之前,先看一下java.lang.ThreadLocal的源码。
- 一、使用姿势
- 二、数据结构
- 三、源码分析
- 四、回收机制
- 总结
一、使用姿势
public class ThreadLocalTest {
/**
* 最佳实践一:使用private static
* {@code ThreadLocal} instances are typically private static fields in classes
* that wish to associate state with a thread (e.g., a user ID or Transaction ID)
*
* java8写法:
* private static final ThreadLocal<Integer> integerThreadLocal = ThreadLocal.withInitial(() -> 100);
*
* java8之前的写法:
* <pre>
* private static final ThreadLocal<Integer> integerThreadLocal = new ThreadLocal<Integer>() {
* @Override
* protected Integer initialValue() {
* return 100;
* }
* };
* </pre>
*/
private static final ThreadLocal<Integer> integerThreadLocal = new ThreadLocal<Integer>() {
/**
* 最佳实践二:重写initialValue()
*/
@Override
protected Integer initialValue() {
return 100;
}
};
@Test
public void testThreadLocal() {
System.out.println(integerThreadLocal.get());
/**
* 最佳实践三:用完如果可以删除,则手动删除
*/
integerThreadLocal.remove();
System.out.println(integerThreadLocal.get());
}
}
最佳实践
- 在类中定义ThreadLocal,用private static修饰;
- 根据源码分析,由于ThreadLocal对象本身会作为Entry的key去获取数据,所以最好也要用final去修饰。key通常都是不可变对象,否则当作为key的对象发生了变化之后,之前存储的数据将无法get,造成资源泄露;
- 使用匿名内部类实现initialValue(),在执行get()时,如果当前的Thread的ThreadLocalMap没有integerThreadLocal这个key的Entry(不管是第一次get还是remove之后的第一次get),则直接使用initialValue()进行初始化操作;
- 在使用完ThreadLocal后,及时手动remove()无用的ThreadLocal,防止资源泄露
二、数据结构
threadlocal.png说明
- 每一个Thread类都有一个属性:ThreadLocal.ThreadLocalMap threadLocals = null
- 每一个ThreadLocalMap都有一个Entry[]数组
- 每一个Entry继承于WeakReference,Entry的key(ThreadLocal实例)作为WeakReference的referent,传入的value作为value
java引用:
注意
- 从第三条说明点发现每一个ThreadLocal只能存储一个值,对于dubbo的RpcContext.LOCAL来讲,存储了一个RpcContext上下文信息;如果需要存储多个值,根据情况,将多个值合并为对象进行存储或者使用多个ThreadLocal实例进行存储
三、源码分析
3.1、ThreadLocal的创建
基础属性
/**
* 每一个ThreadLocal对象都有一个唯一的threadLocalHashCode。
* ThreadLocal对象会作为ThreadLocalMap的Entry的key
*/
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode = new AtomicInteger();
/**
* 魔数:与斐波那契数列和黄金分割点有关。用于散列均匀,减少hash冲突
*/
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
说明
- 每一个ThreadLocal对象都有一个唯一的threadLocalHashCode,该值用于计算Entry元素存储的直接索引。
- 直接索引:元素根据threadLocalHashCode&(table.len-1)算出来的值
- 真实索引:元素可能由于hash冲突,使用线性探测法,放置在不是直接索引的位置
- hash算法:每一个ThreadLocal的threadLocalHashCode都是前一个的threadLocalHashCode + 0x61c88647。这样的hash算法可以很大程度的避免hash冲突,散列很均匀。0x61c88647这个数是怎么算出来的,见:https://www.javaspecialists.eu/archive/Issue164.html
普通构造函数
public ThreadLocal() {
}
java8提供的构造函数
public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
return new SuppliedThreadLocal<>(supplier);
}
static final class SuppliedThreadLocal<T> extends ThreadLocal<T> {
private final Supplier<? extends T> supplier;
SuppliedThreadLocal(Supplier<? extends T> supplier) {
this.supplier = Objects.requireNonNull(supplier);
}
@Override
protected T initialValue() {
return supplier.get();
}
}
java8提供了Supplier(提供者) FunctionalInterface。基于该接口可以方便的替代掉示例中的使用匿名内部类来重写initialValue()的方法。需要注意的是,supplier函数不可为null。
@FunctionalInterface
public interface Supplier<T> {
/**
* Gets a result.
*/
T get();
}
3.2 数据的初始化与设置
/**
* 设置初始值
* 与set(T value)几乎相同,下边代码等价于
* <pre>
* private T setInitialValue() {
* T value = initialValue();
* set(value);
* return value;
* }
* </pre>
*/
private T setInitialValue() {
// 获取初始值(通常是调用使用方重写的initialValue(),否则使用{@link ThreadLocal#initialValue()返回null})
T value = initialValue();
// 获取当前线程
Thread t = Thread.currentThread();
// 获取当前线程的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 如果当前线程的ThreadLocalMap对象不为null,则直接将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
if (map != null)
map.set(this, value);
// 如果当前线程的ThreadLocalMap为null,则直接新建ThreadLocalMap对象,并将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
else
createMap(t, value);
// 返回初始值
return value;
}
protected T initialValue() {
return null;
}
/**
* 设置值
*/
public void set(T value) {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取当前线程的ThreadLocalMap对象
ThreadLocalMap map = getMap(t);
// 如果当前线程的ThreadLocalMap对象不为null,则直接将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
if (map != null)
map.set(this, value);
// 如果当前线程的ThreadLocalMap为null,则直接新建ThreadLocalMap对象,并将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
else
createMap(t, value);
}
/**
* 获取当前线程t的ThreadLocalMap
*/
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
/**
* 创建ThreadLocalMap,并将 {当前的ThreadLocal对象,firstValue} 设置到ThreadLocalMap对象中去
* this:当前的ThreadLocal对象
*/
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
setInitialValue()流程:
- 首先调用initialValue()获取初始值(通常是调用使用方重写的initialValue(),否则返回null)
- 然后获取当前线程
- 然后获取当前线程的ThreadLocalMap对象属性
- 如果当前线程的ThreadLocalMap对象不为null,则直接将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
- 如果当前线程的ThreadLocalMap为null,则直接新建ThreadLocalMap对象,并将{当前的ThreadLocal对象,value}设置到ThreadLocalMap对象中去
- 最后返回initialValue()获取到的初始值
setInitialValue()被使用的时机:ThreadLocal.get()。
关于ThreadLocalMap的创建和向ThreadLocalMap中设置参数的源码后续再说。
3.3 数据的获取
public T get() {
// 获取当前线程
Thread t = Thread.currentThread();
// 获取当前线程的ThreadLocalMap属性
ThreadLocalMap map = getMap(t);
if (map != null) {
// 从ThreadLocalMap属性中获取key为当前的ThreadLocal的Entry
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
// 如果Entry不为null,直接获取entry的value,返回即可
@SuppressWarnings("unchecked")
T result = (T) e.value;
return result;
}
}
// 如果当前线程的ThreadLocalMap为null或者key为当前的ThreadLocal的Entry为null
return setInitialValue();
}
get()流程:
- 获取当前线程
- 获取当前线程的ThreadLocalMap属性
- 如果当前线程的ThreadLocalMap对象不为null,从ThreadLocalMap对象中获取key为当前的ThreadLocal的Entry
- 如果该Entry不为null,直接获取entry的value,返回即可
- 如果当前线程的ThreadLocalMap为null(还未
createMap(Thread t, T firstValue)
)或者key为当前的ThreadLocal的Entry为null(没有调用map.set(this, value)
),则执行设置初始化值的方法。
关于根据ThreadLocal从ThreadLocalMap中获取Entry的源码后续再说。
3.4 数据的删除
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
remove()步骤:
- 获取当前的线程
- 获取当前线程的ThreadLocalMap
- 如果当前的ThreadLocalMap不为null,直接执行调用ThreadLocalMap的remove(ThreadLocal<?> key)
关于根据ThreadLocal从ThreadLocalMap中remove Entry的源码后续再说。
可以看到具体的操作全部委托到了ThreadLocalMap中来执行。
3.5 ThreadLocalMap的创建
基础属性
/**
* The initial capacity -- MUST be a power of two.
*/
private static final int INITIAL_CAPACITY = 16;
/**
* The table, resized as necessary.
* table.length MUST always be a power of two.
*/
private Entry[] table;
/**
* The number of entries in the table.
*/
private int size = 0;
/**
* threshold == table大小的2/3,
* 1、当size >= threshold,遍历table并删除所有key为null的元素(rehash()),
* 2、如果删除后size >= threshold*3/4 == 数组长度*1/2 时,需要对table进行扩容
*/
private int threshold; // Default to 0
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
静态内部类Entry
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
其中value就是set的value,而k是当前的ThreadLocal对象,作为WeakReference的referent属性。
构造器
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 创建数组实例,默认长度16
table = new Entry[INITIAL_CAPACITY];
// 求余计算数组索引i
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 第一次插入,没有hash冲突,直接放置
table[i] = new Entry(firstKey, firstValue);
// 数组实际元素长度 + 1
size = 1;
// 设置扩容的阈值为 16*2/3 = 10,当有10个元素时,发生扩容
setThreshold(INITIAL_CAPACITY);
}
步骤:
- 创建数组实例,默认长度16
- 计算firstKey的直接索引,当len=2的n次方时,x&(len-1) <=> x%len,所以这里相当于直接求余,只是&运算效率更高
- 第一次插入,没有hash冲突,直接创建Entry并放置就ok了
- 然后数组中元素个数size+1
- 设置rehash()的阈值
索引操作
/**
* i:数组索引
* len:数组长度
* 如果 i+1<数组长度,则返回i+1, 如果 i+1>=数组长度,则直接取0(即开头table[0]),
* 在实际使用中,实际上就是不断向后的循环取,到了末尾,再回到table[0],再向后循环取
*/
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
/**
* 如果 i-1>=0,则返回i-1, 如果 i<1,则直接取数组长度-1(即末尾table[len-1]),
* 在实际使用中,实际上就是不断向前的循环取,到了开头,再回到table[len-1],再向前循环取
*/
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}
从上述的逻辑可知,我们可以将table[]看做是一个循环数组。
3.6 向ThreadLocalMap中设置Entry
/**
* 设置 {key, value} 到当前线程的ThreadLocalMap中
*/
private void set(ThreadLocal<?> key, Object value) {
// 成员变量局部化,提高性能
Entry[] tab = table;
// 获取数组容量
int len = tab.length;
// 获取当前的threadlocal位于数组的下标,当len=2的n次方时,
// key.threadLocalHashCode & (len - 1) <=> key.threadLocalHashCode % len
int i = key.threadLocalHashCode & (len - 1);
/**
* 先获取tab[i],判断如果不为null,则进行循环体内逻辑;
* 若没有return,则进行二次循环,tab[i+1],判断如果不为null,则进行循环体内逻辑;
* 当 i + 1 >= len 时,tab[0]回到头再进行循环。
*
* 这就是解决hash冲突的另一种方式:线性探测法。如果当前槽没有元素,直接插入元素;如果当前槽有元素,则向后寻找第一个为null的槽,放置该元素。
* 在这里,还添加了hash冲突时替换原有元素或者替换无效元素的逻辑。
*/
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
// 获取当前的Entry的key,使用e.get(),是因为当前的key是一个WeakReference,使用get()获取key
ThreadLocal<?> k = e.get();
// 如果当前的key就是刚刚查询出来的Entry的key(即修改的是同一个Entry),则直接替换值
if (k == key) {
e.value = value;
return;
}
// 如果刚刚查询出来的Entry的key是null,则表明该Entry的key(ThreadLocal)已经被回收了
// 使用replaceStaleEntry替换掉已经无效的Entry
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
// 通过上述for循环解决了hash冲突之后,创建新的Entry,放置到当前的table[i]节点上
tab[i] = new Entry(key, value);
// table的size+1
int sz = ++size;
// 回收部分无效的Entry,如果成功,则当前的数组中的元素个数将小于阈值,则肯定不需要扩容,
// 否则判断当前的数组中的元素个数是否达到了扩容阈值
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
说明:
- 首先将table[]成员变量局部化(在后续的操作中如果会对table成员变量进行多次操作,局部化会提高性能)
- 获取数组长度,计算直接索引
- 然后进行hash冲突检测与处理(线性探测法)
- 获取直接索引位置的Entry,如果不为null,表示发生了hash冲突,则先获取Entry中的key,如果该key与当前的ThreadLocal是同一个对象,则直接替换value,返回;
- 如果Entry中的key==null,表示当前的Entry已经是一个无效的Entry了,执行replaceStaleEntry方法进行处理(如果从该Entry后的第一个元素开始在一个连续的Entry子数组内找到一个key与当前ThreadLocal相等的元素,则替换值,然后互换该Entry和当前的key==null的Entry位置;如果没找到,则将当前的key==null的Entry设置为新的将设置的Entry)返回;
- 如果发生了hash冲突,但是直接索引位置上的Entry即不是当前的ThreadLocal的Entry,也不是一个无效的Entry,则需要向后循环遍历下一个Entry元素,再进行上述逻辑的处理。直到找到一个Entry为null的位置或者Entry失效的位置或者Entry就是当前的ThreadLocal的Entry的位置。
- 假设没有发生hash冲突或者经过上述的for循环找到了一个Entry为null的位置的时候,在该位置创建设置新添加的Entry,数组元素+1;
- 尝试回收部分无效的Entry,如果成功,则当前的数组中的元素个数将小于阈值,则肯定不需要rehash();如果失败,且数组元素个数 > threshold,执行rehash() -- 即执行部分资源清理与可能的扩容操作。
replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot)
/**
* 对一段连续的Entry子数组进行操作
* Replace a stale entry encountered during a set operation
* with an entry for the specified key.
*/
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
// 获取当前的table及其容量
Entry[] tab = table;
int len = tab.length;
Entry e;
// 设置需要被回收的槽索引(无效的Entry索引)
int slotToExpunge = staleSlot;
/**
* 向前遍历,直到找到第一个为null的槽
* 在遍历的过程中,如果找到了无效的Entry,则将slotToExpunge设置为当前的被回收的无效索引
*/
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
/**
* 向后遍历,直到找到第一个为null的槽或者找到与当前Entry key相等的槽
*/
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
// 如果key相等,直接替换
if (k == key) {
// 替换值
e.value = value;
/**
* 互换 tab[i] 和 tab[staleSlot],将无效的tab[staleSlot]中的Entry后移;
* 将正确的 tab[i]中的Entry前移,方便下一次的查找(可以根据直接槽索引查找到了)
*/
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 如果待回收的槽索引就是当前的无效索引,则设置待回收的槽索引为i
if (slotToExpunge == staleSlot)
slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// 如果 "k==null && 待回收的槽索引就是当前的无效索引",则设置待回收的槽索引为i
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// 置空原本的entry的value,将当前的Entry添加到这个槽上,原本的Entry等待GC
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
步骤:
- 首先从当前索引staleSlot的前一个元素开始向前遍历一个连续的Entry子数组,一直找到该连续子数组最靠前的那个无效索引,设置为slotToExpunge
- 然后从staleSlot的 后一个元素开始 向后遍历一个连续的Entry子数组
- 如果找到的Entry的key是当前的ThreadLocal,则直接替换value,之后将找到的Entry放置到table[staleSlot]处,也就是Entry本身的直接索引处,而Entry本身的真实索引的位置放置为原来的table[staleSlot];然后进行一次连续段的回收,之后进行一次log次数级别的回收,最后返回
- 如果经过循环,没有找到key与ThreadLocal相等的Entry,则直接将要set的Entry放置到table[staleSlot]处,最后根据需要进行一次连续段的回收,之后在进行一次log次数级别的回收
关于回收与扩容相关的操作,后续分析。
3.7 从ThreadLocalMap中获取Entry
private Entry getEntry(ThreadLocal<?> key) {
// 计算所查找的Entry在table[]中的索引
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
// 如果快速查找到的Entry!=null并且key也相等,则直接返回(也就是说在直接的hash槽找到了Entry)
if (e != null && e.get() == key)
return e;
//
else
return getEntryAfterMiss(key, i, e);
}
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
/**
* 循环遍历,直到找到下一个不为null的Entry
*/
while (e != null) {
ThreadLocal<?> k = e.get();
// 如果key相等,表示找到了,直接返回
if (k == key)
return e;
// 如果key==null,则尝试回收或者整理"i到i之后的第一个Entry为null的索引(即返回值)之间"的Entry数据
if (k == null)
expungeStaleEntry(i);
// 继续向后遍历
else
i = nextIndex(i, len);
e = tab[i];
}
// 如果没找到,返回null
return null;
}
步骤:
- 计算当前的ThreadLocal的直接索引,获取直接索引位置的Entry
- 如果该位置的Entry不为null且Entry的key与当前的ThreadLocal是同一个元素,则直接返回该Entry
- 如果该位置的Entry为null,直接返回null(看getEntryAfterMiss逻辑)
- 如果该位置的Entry不为null,但是Entry的key与当前的ThreadLocal不是同一个元素,则表明发生了hash冲突。此时,会不断的向后循环寻找,直到找到了要查找的Entry或者遍历到的Entry为null,如果找到了,返回Entry,如果没找到,返回null。
注意:
- 在没有根据直接索引找到Entry后的while循环中,如果检测到无效的Entry时,会使用expungeStaleEntry方法尝试回收或者整理"i到i之后的一段连续的不为null的索引(即返回值)之间"的Entry数据。
- 对于get来讲,线性探测法是以Entry!=null为循环查找结束条件的,所以要注意remove时的操作。
3.8 从ThreadLocalMap中删除Entry
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len - 1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
// key - ThreadLocal置为null
e.clear();
// 尝试回收或者整理"i到i之后的一段连续的不为null的索引(即返回值)之间"的Entry数据
expungeStaleEntry(i);
return;
}
}
}
步骤:
- 计算key的直接索引,从直接索引的Entry开始,依旧在一段连续的不为null的Entry进行循环,如果找到的Entry的key与当前的key相同,则进行删除操作:
- 首先将Entry的key置空(e.clear())
- 尝试回收或者整理"i到i之后的一段连续的不为null的索引(即返回值)之间"的Entry数据
注意:
- 这里的expungeStaleEntry中的整理逻辑非常重要,直接影响get查询的查询链是否会断掉的问题。
3.9 rehash()与resize()
private void rehash() {
// 回收table中所有无效的Entry,如果可以有效减小size到不满足下边的扩容条件,则下边的resize()不需要执行
expungeStaleEntries();
// 实际上扩容是发生在size>=threshold*3/4==length*(2/3)*(3/4)==lenth*1/2
if (size >= threshold - threshold / 4)
resize();
}
/**
* 扩容两倍数组,之后rehash
*/
private void resize() {
// 获取旧数组及其长度
Entry[] oldTab = table;
int oldLen = oldTab.length;
// 新数组扩容2倍
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
// 无效的Entry
if (k == null) {
e.value = null; // Help the GC
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
// 设置新的阈值
setThreshold(newLen);
size = count;
table = newTab;
}
rehash()仅用在ThreadLocalMap#set(ThreadLocal<?> key, Object value)
的末尾部分。
rehash()步骤:
- 首先进行一次全表回收无效的Entry操作,如果可以有效减小数组中的元素个数size到数组长度的一半以下,则不需要进行扩容,如果达到一半及以上,则进行扩容操作。
resize()步骤:
- 创建新的Entry[]是旧的长度的两倍;之后遍历旧数组,之后根据新的数组的长度重新计算每个有效Entry的直接索引,根据线性探测法重新填充数组。
- 最后设置新的Entry[]的阈值threshold、数组元素个数size,将新数组赋值给全局变量table
四、回收机制
ThreadLocal 回收机制 就是如下的四个函数以及WeakReference<ThreadLocal<?>>
1、expungeStaleEntry(int staleSlot)
从staleSlot开始的连续Entry子数组中的无效Entry的清理和 整理:
2、expungeStaleEntries()
清理整个table中的无效Entry
3、cleanSomeSlots(int i, int n)
进行log2(n)次循环(如果正好遍历到无效的Entry,则n=table.length,调用expungeStaleEntry进行连续段的清理和整理);
是一种介于完全不扫描和完全扫描(expungeStaleEntries)之间的一种扫描方式
4、replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot)
该函数已在set部分分析:从table[staleSlot]后的第一个元素开始在一个连续的Entry子数组内如果找到一个key与当前ThreadLocal相等的Entry,则替换值,然后互换该Entry和table[staleSlot](①),返回;如果没找到,则将table[staleSlot]直接设置为新的将设置的Entry(②)
①②处都会对该Entry所在的连续段,从连续段的第一个无效Entry(可能在当前的Entry之前)开始或者从该Entry的后一个元素开始(表示该Entry之前的元素全部有效)进行一次连续段的 回收和整理,之后在进行一次log级别的回收;
4.1 expungeStaleEntry(int staleSlot)
/**
* 尝试回收或者整理"staleSlot到staleSlot之后的第一个Entry为null的索引(即返回值)之间"的Entry数据
*
* @return the index of the next null slot after staleSlot
* (all between staleSlot and this slot will have been checked
* for expunging).
*/
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// 回收table[staleSlot]处的值,value置为null,整体置为null
tab[staleSlot].value = null;
tab[staleSlot] = null;
// size-1
size--;
// Rehash until we encounter null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 如果当前的table[i] Entry无用,则直接回收
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
/**
* 问题:
* 假设ThreadLocalMap的table长度为4的,其中table[1]、table[2]已经有值,而且key都不为null,
* 这时候set一个threadlocal1,算出来的索引为1,根据线性探测法,最后会设置到table[3];
* 然后,table[2]被remove,此时table[2]==null;
* 然后,get threadlocal1,此时算出索引还是1, 从table[1]向后查找,结果table[2]==null,则查找链就断了,结果返回null,
* 而没有办法查找到真正的table[3],这种情况threadlocal是怎么处理的呢?
*
* 答案就是下边的代码:会对remove元素的后边的元素循环遍历,直到遍历到非null为止。对每个元素做如下逻辑:
* 如果当前元素的k==null,则回收,如果不是无效元素,则计算其真实的直接索引,例如重新计算table[3]的索引,此处为1,与当前遍历的索引3不同,
* 所以将table[3]先置空,然后从table[1]开始向后寻找第一个为null的位置,重新放置table[3]的元素,此处就是table[2]=table[3]
*
* 简言之:remove一个Entry之后,会整理remove的Entry元素之后的一块儿连续的(中间没有null的entry)元素子数组,全部使用线性探测法重新计算,
* 这样remove掉的那一块儿就会被后续的Entry重新填充,查询链就不会断
*/
// 计算真实的直接索引位置,因为可能k是因为hash冲突循环后移的
int h = k.threadLocalHashCode & (len - 1);
// 如果h和i,不相等,表示确实k是循环后移了的
if (h != i) {
// 首先将之前的Entry置为null
tab[i] = null;
// 一直向后循环找到第一个为null的索引,之后table[h] = e(将当前的Entry放置到table[h]新的hash位置上)
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
步骤:
- 将table[staleSlot]的value和Entry都置为null,元素个数size - 1
- 遍历staleSlot之后的一段连续的Entry,如果Entry的key==null,将该Entry的value和Entry本身置空,如果key是有效的,将要进行 整理操作
- 计算直接索引的位置,如果当前的Entry的真实索引与直接索引不相等,则表示该Entry之前是因为hash冲突被冲到后边的slot里的,此处会重新从直接索引位置开始向后找到第一个不为null的的位置,将当前的Entry元素放置到这里,这样会保证原本连续的Entry子数组,在从中删除一个Entry后,子数组中剩下的元素还可以组成一个连续的Entry数组,这保证了get操作的查询链不会断。
注意:
- remove一个元素后,会调用这里的 整理操作 进行连续Entry子数组的整理,不会使查询链断掉。要仔细的看注释中的那个例子。
- 整理的过程相当于一次从staleSlot开始的连续段 内存碎片整理,将无效的Entry删除,有效的Entry按照原本的顺序聚集。
4.2 expungeStaleEntries()
/**
* 回收table中所有的无效Entry.
*/
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}
遍历table中所有的元素,发现无效的Entry后,就调用expungeStaleEntry(int staleSlot)进行连续子数组的资源清理与整理。该方法影响较大,仅仅用在rehash()中。
4.3 cleanSomeSlots(int i, int n)
/**
* 对数级别的扫描,介于 "不扫描(fast but retains garbage)" 和 "完全扫描(find all garbage but would cause some insertions to take O(n) time)" 之间
*
* @param i a position known NOT to hold a stale entry. The
* scan starts at the element after i.
* @param n scan control: {@code log2(n)} cells are scanned,
* unless a stale entry is found, in which case
* {@code log2(table.length)-1} additional cells are scanned.
* When called from insertions, this parameter is the number
* of elements, but when from replaceStaleEntry, it is the
* table length. (Note: all this could be changed to be either
* more or less aggressive by weighting n instead of just
* using straight log n. But this version is simple, fast, and
* seems to work well.)
* @return true if any stale entries have been removed.
*/
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
// 向后循环获取下一个索引,如果已经到了末尾,则返回table[0]
i = nextIndex(i, len);
Entry e = tab[i];
// 如果此Entry已经无效了,key-ThreadLocal已经被回收掉了,无法再通过该key获取Entry,如果不回收此Entry,则会造成内存泄露
if (e != null && e.get() == null) {
n = len;
removed = true;
// 尝试回收或者rehash"i到i之后的第一个Entry为null的索引(即返回值)之间"的Entry数据
i = expungeStaleEntry(i);
}
} while ((n >>>= 1) != 0); // n / 2,假设n=16,当然set的时候n=size,则log2(16)=4,也就是说需要循环4次(也可以直接理解为16/2/2/2/2=1,第5次1/2==0则不再进行循环)
return removed;
}
该方法只扫描一部分Entry,进行log2(n)次循环,如果恰好扫描到无效的Entry,则n=table.length,即进行log2(table.length)次循环。每个循环的步骤:
- 获取下一个索引位置i及该位置上的Entry
- 如果该Entry==null或者Entry依然有效,则进行下一次循环;
- 如果该Entry!=null且无效,则先将n设置为长度(在set方法中n本来是)调用expungeStaleEntry(i)回收或者 整理 i之后的一段连续的Entry,之后再进行一下轮的回收和整理操作。
4.4 弱引用WeakReference
由于Entry继承了弱引用,他的key是referent,当发生gc时,如果该key已经不可达了,则直接回收(key==null),之后在如下操作时删除key==null的Entry。
- get时调用expungeStaleEntry
- set时调用replaceStaleEntry或者cleanSomeSlots或者expungeStaleEntries
- remove时调用expungeStaleEntry
五、总结
5.1 set整个流程
- 首先将table[]成员变量局部化(在后续的操作中如果会对table成员变量进行多次操作,局部化会提高性能)
- 获取数组长度,计算直接索引
- 然后进行hash冲突检测与处理(线性探测法)
- 获取直接索引位置上的Entry,如果不为null,表示发生了hash冲突,则先获取Entry中的key,如果该key与当前的ThreadLocal是同一个对象,则直接替换value,返回;
- 如果Entry中的key==null,表示当前的Entry已经是一个无效的Entry了,执行replaceStaleEntry方法进行处理(replaceStaleEntry:如果从该Entry后的第一个元素开始在一个连续的Entry子数组内找到一个key与当前ThreadLocal相等的元素,则替换值,然后互换该Entry和当前的key==null的Entry位置;如果没找到,则将当前的key==null的Entry设置为新的将设置的Entry)返回;
- 如果发生了hash冲突,但是直接索引位置上的Entry即不是当前的ThreadLocal的Entry,也不是一个无效的Entry,则需要向后循环遍历下一个Entry元素,再进行上述逻辑的处理。直到找到一个Entry为null的位置或者Entry失效的位置或者Entry就是当前的ThreadLocal的Entry的位置。
- 假设没有发生hash冲突或者经过上述的for循环找到了一个Entry为null的位置的时候,在该位置创建设置新添加的Entry,数组元素+1;
- 尝试log级别的回收部分无效的Entry,如果成功,则当前的数组中的元素个数将小于阈值,则肯定不需要rehash();如果失败,且数组元素个数 >= threshold,执行rehash()
- 首先对整个table回收无效的Entry操作,如果可以有效减小数组中的元素个数size到数组长度的一半以下,则不需要进行扩容,如果size达到length的一半及以上,则进行扩容操作
- 扩容:创建新的Entry[]是旧的长度的两倍;之后遍历旧数组,根据新的数组的长度重新计算每个有效Entry的直接索引,根据线性探测法重新填充数组。
最后设置rehash()阈值、数组元素个数size,将新数组赋值给全局变量table。
5.2 get整个流程
- 获取当前线程
- 获取当前线程的ThreadLocalMap属性
- 如果当前线程的ThreadLocalMap对象不为null,从ThreadLocalMap对象中获取key为当前的ThreadLocal的Entry
- 计算当前的ThreadLocal的直接索引,获取直接索引位置的Entry
- 如果该位置的Entry不为null且Entry的key与当前的ThreadLocal是同一个元素,则直接返回该Entry
- 如果该位置的Entry为null,直接返回null(看getEntryAfterMiss逻辑)
- 如果该位置的Entry不为null,但是Entry的key与当前的ThreadLocal不是同一个元素,则表明发生了hash冲突。此时,会不断的向后循环寻找,直到找到了要查找的Entry或者遍历到的Entry为null,如果找到了,返回Entry,如果没找到,返回null。
- 如果该Entry不为null,直接获取entry的value,返回即可
- 如果当前线程的ThreadLocalMap为null(还未createMap(Thread t, T firstValue))或者key为当前的ThreadLocal的Entry为null(没有调用map.set(this, value)),则执行设置初始化值的方法。
5.3 各类回收的时机
- set或者setInitialValue
- 如果从直接索引开始向后遍历的一段连续Entry段内发生hash冲突且冲突的Entry已经是无效的,此时会执行replaceStaleEntry,会执行回收和整理操作
- 如果没有发生hash冲突 或者 发生了hash冲突但是在从直接索引开始的一段连续Entry段内没有找到与设置的key相等的Entry或者无效的Entry,如果直接索引所在的整个连续Entry段如果有无效Entry,则在set的尾部都会执行一次log级别的回收操作,如果回收不理想,需要进行rehash()
- rehash()会进行一次全table的回收
- get
- 在通过直接索引没有get到数据的时候,会循环遍历一段连续的Entry,如果遍历到的Entry是无效的,则进行一次连续Entry子数组的回收和整理
- remove
- 当找到需要被删除的Entry时,进行一次连续Entry子数组的回收和整理
- gc时
- 由于Entry继承了弱引用,他的key是referent,当发生gc时,如果该key已经无引用指向了,则直接回收(key==null了),之后再根据上述1,2,3的操作删除key==null的Entry。(这也是为什么ThreadLocal没有为Entry绑定ReferenceQueue的原因,因为Entry的删除已经可以发生在1,2,3中了;但是WeakHashMap不一样,WeakHashMap的Entry继承于WeakReference,其key是referent,当发生gc时,如果key不可达了,则回收key,之后将key对应的Entry放入queue,在后续对WeakHashMap的操作 - getTable()/size()/resize(int newCapacity)的时候进行Entry的回收)
5.4 ThreadLocal与线程池
线程池中的线程由于会被复用,所以线程池中的每一条线程在执行task结束后,要清理掉其ThreadLocalMap及其中的各个Entry,否则,当这条线程在下一次被复用的时候,其ThreadLocalMap信息还存储着上一次被使用的时的信息;另外,假设这条线程不再被使用,但是这个线程有可能不会被销毁(与线程池的类型和配置相关),那么其上的ThreadLocal将发生了资源泄露。所以netty的io线程池使用FastThreadLocalRunnable wrap了Runnable任务,当任务执行结束后,会做InternalThreadLocalMap和FastThreadLocal的清理操作。
网友评论