关于ThreadLocal以及InheritedThreadLocal基本原理的介绍已经非常多,但是感觉threadlocal设计的精髓还是在于ThreadLocalMap,所以这里详细分析一下ThreadLocalMap的实现。
成员变量
整体的成员变量和HashMap差不多,主要不同就是扩容阈值是2/3而非0.75,Entry的key是WeakReference<ThreadLocal<?>>。
//初始容量16
private static final int INITIAL_CAPACITY = 16;
//Entry数组
private Entry[] table;
//容量
private int size = 0;
//扩容阈值
private int threshold; // Default to 0
//扩容阈值固定为Entry数组长度的2/3,比HashMap要低,因为ThreadLocalMap使用开放定址-线性探测法处理hash冲突,Entry占用更快
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
初始化
//首次调用ThreadLocal的set方法时,ThreadLocalMap会懒加载,存入一个元素,初始容量置1
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
//Thread初始化,InheritedThreadLocal不为空时,会调用该构造函数创建子线程InheritedThreadLocalMap
private ThreadLocalMap(ThreadLocalMap parentMap) {
Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);
table = new Entry[len];
for (int j = 0; j < len; j++) {
Entry e = parentTable[j];
if (e != null) {
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
if (key != null) {
Object value = key.childValue(e.value);
Entry c = new Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;
size++;
}
}
}
}
查询、插入、删除
和HashMap的实现有很大不同,hashkey冲突时采用线性地址法而非链地址法,并且在所有可能的地方都做了过期Entry的回收工作,并对因为hash冲突而偏移的Entry进行整理。
//这里分两种情况处理,一种是e不为空且key相等,直接返回结果,另一种调用getEntryAfterMiss
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
//如果在hash槽位没有找到该节点,说名不存在或者冲突后偏移了,因此需要向后顺序搜索,知道出现空槽位为止(空槽为说明后面不可能再存在了)。同时还会顺便做Entry的清理工作
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
//从hashcode位置开始寻找空位并调用replaceStaleEntry插入,或者遇到相同key则更新并直接return
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
//nextIndex向后唤醒搜索数组
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
//如果key为null,就用新key、value覆盖,同时清理旧数据
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
//如果执行到这里,说明i位置是空的,并且需要直接插入
tab[i] = new Entry(key, value);
int sz = ++size;
//进行清理,如果没有清理出空间,就判断是否需要扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
//由于GC会批量回收无效引用,所以set()的循环中发现一个过期槽位,就意味着这个key前面也可能出现了新的过期槽位,所以向前搜索并记录可能存在的可用槽位。这样可以增加空槽位的利用率,从而避免频繁触发rehash。
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;
//向后搜索,直到null Entry或者有重复key
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
//如果发现重复key,就和过期槽位做替换,从而维持hashtable的顺序性(每个entry离散列位置尽可能更近)
if (k == key) {
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 从i位置或者slotToExpunge位置进行批量清理
if (slotToExpunge == staleSlot)
slotToExpunge = i;
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// 如果既没有向前搜索到过期槽位,也没有向后搜索到重复key
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// 如果没有找到重复key,就直接替换过期键
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// 如果有其它过期键,就进行批量清理操作
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
//移除操作,计算hashcode,然后从该位置向后遍历直到遇到null槽位,逐个比较key值是否匹配,如果匹配就调用Entry的clear方法,并从散列位置清理旧数据
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
e.clear();
expungeStaleEntry(i);
return;
}
}
}
值得注意的是这里调用了key.threadLocalHashCode获取了ThreadLocal对象的hashcode,而其hashcode是通过静态AtomicInteger以HASH_INCREMENT步长递增生成的的。
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode =
new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
实际上这里使用的是斐波那契散列法,这是一种乘数散列法,只不过这个乘数比较特殊,是32位整型上限2^32-1乘以黄金分割比例0.618....的值2654435769,用有符号整型表示就是-1640531527,去掉符号后16进制表示为0x61c88647。使用0x61c88647为步长进行散列,结果会更均匀。
清理
清理过程是基于热度衰减的方式,从指定位置向后每搜索一位热度减半,发现过期键则重置热度,直到热度归0结束清理。Redis扫描回收过期键时也与此比较类似,不过是按比例进行采样回收,直到采样中过期键比例小于某个阈值。
//以i为起点向后搜索log2(n)位,如果发现过期Entry则置n为len。换个说法就是先做小范围扫描,如果发现过期Entry就做大范围扫描,直到大范围搜索也找不到过期Entry为止
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
//回收staleSlot槽位
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
//对后续位置rehash,直到遇到null槽位
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
//如果可以清理,就清理
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
//计算hashSlot位置
int h = k.threadLocalHashCode & (len - 1);
//如果entry没有在hashSlot位置,则从hashSlot开始搜索空槽位并插入(相当于整理,避免冲突后entry偏离原位置过远)
if (h != i) {
tab[i] = null;
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
return i;
}
扩容
和HashMap逻辑基本一致,扩容一倍并重新计算Hashslot
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
//Entry过期,直接清理
if (k == null) {
e.value = null;
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
setThreshold(newLen);
size = count;
table = newTab;
}
网友评论