FastThreadLocal的引入背景和原理简介
既然jdk已经有ThreadLocal,为何netty还要自己造个FastThreadLocal?FastThreadLocal快在哪里?
这需要从jdk ThreadLocal的本身说起。如下图:
jdk ThreadLocal
在java线程中,每个线程都有一个ThreadLocalMap实例变量(如果不使用ThreadLocal,不会创建这个Map,一个线程第一次访问某个ThreadLocal变量时,才会创建)。该Map是使用线性探测的方式解决hash冲突的问题,如果没有找到空闲的slot,就不断往后尝试,直到找到一个空闲的位置,插入entry,这种方式在经常遇到hash冲突时,影响效率。
FastThreadLocal(下文简称ftl)直接使用数组避免了hash冲突的发生,具体做法是:每一个FastThreadLocal实例创建时,分配一个下标index;分配index使用AtomicInteger实现,每个FastThreadLocal都能获取到一个不重复的下标。当调用ftl.get()方法获取值时,直接从数组获取返回,如return array[index],如下图:
netty FastThreadLocal
FastThreadLocalThread
在Netty中,要使用 FastThreadLocal 实现线程本地变量需要将线程包装成 FastThreadLocalThread ,如果不是 FastThreadLocalThread ,会使用 slowThreadLocalMap的 ThreadLocal 来存储变量副本。
================io.netty.util.concurrent.DefaultThreadFactory =============
@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
// 一般daemon为false,意思是不设置为守护线程
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
// 优先级 默认为5
if (t.getPriority() != priority) {
t.setPriority(priority);
}
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
FastThreadLocalThread 继承自Thread类,有如下成员变量:
===============io.netty.util.concurrent.FastThreadLocalThread============
// 任务执行完,是否清除FastThreadLocal的标记
private final boolean cleanupFastThreadLocals;
// 类似于Thread类中ThreadLocalMap,为了实现FastThreadLocal
private InternalThreadLocalMap threadLocalMap;
InternalThreadLocalMap分析
ftl的实现,涉及到InternalThreadLocalMap。InternalThreadLocalMap 类的继承关系图如下:
在这里插入图片描述
UnpaddedInternalThreadLocalMap的主要属性
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
static final AtomicInteger nextIndex = new AtomicInteger();
Object[] indexedVariables;
数组 indexedVariables 就是用来存储 ftl 的 value 的,使用下标的方式直接访问。nextIndex在ftl实例创建时用来给每个ftl实例分配一个下标,slowThreadLocalMap在线程不是 FastThreadLocalThread 时使用到。
InternalThreadLocalMap 底层数据结构
InternalThreadLocalMap中指定了 indexedVariables 的初始化的值UNSET。
public static final Object UNSET = new Object();// 用于标识数组的槽位还未使用
private BitSet cleanerFlags;//标记indexedVariables 索引的值是否被
注意:
有些情况下,我们自己创建的线程并没有使用FastThreadLocalThread,为了适配这种情况,Netty通过Thread原生的ThreadLocal来保存InternalThreadLocalMap。关系图如下:
netty
即Thread中的属性threadLocalMap中key是上面定义的:ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap
,注意这是一个静态变量。
如果使用了FastThreadLocalThread,那么存储数据的结构直接使用Netty定义的InternalThreadLocalMap,通过Object[] indexedVariables保存key(FastThreadLocal)和value。nextIndex是一个自增的数组下表。JDK的ThreadLocal是通过Hash计算到下表,遇到冲突通过线性探测法解决。Netty确定了每一个key对应的value在数组中的下标,因此不会有冲突发生。
Netty
下面看一下它的构造方法
private InternalThreadLocalMap() {
super(newIndexedVariableTable());
}
private static Object[] newIndexedVariableTable() {
// 创建数据,填充Object对象
Object[] array = new Object[32];
Arrays.fill(array, UNSET);
return array;
}
FastThreadLocal
FastThreadLocal 的主要属性是 index,作为 InternalThreadLocalMap 的 indexedVariables 数组的下标,在创建时有 InternalThreadLocalMap的 nextIndex 原子类生成一个全局唯一的递增下标。
==============================FastThreadLocal================================
// 常量0,存放FastThreadLocal集合的下标
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
// 每一个FastThreadLocal对应的数组的下标
private final int index;
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
set方法
set方法的目的是以当前FastThreadLocal为key,设置value到当前FastThreadLocalThread类型线程的 InternalThreadLocalMap实例中。
==============================FastThreadLocal================================
public final void set(V value) {
// UNSET为空的Object对象,不允许为空
if (value != InternalThreadLocalMap.UNSET) {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
setKnownNotUnset(threadLocalMap, value);
} else {
remove();
}
}
InternalThreadLocalMap.get:
==============InternalThreadLocalMap====================================
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();//获取当前
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
判断当前线程的类型,如果是FastThreadLocalThread类型,则使用该类内部属性InternalThreadLocalMap,否则使用原生Thread对象中的ThreadLocalMap属性来,然后在此map中保存的key为JDK中的ThreadLocal对象,value为InternalThreadLocalMap(Netty中的类)。
fastGet:
获取InternalThreadLocalMa,如果不存在那么新创建一个:
==============InternalThreadLocalMap====================================
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
// 设置到FastThreadLocalThread对象内部的属性中
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
slowGet:
获取当前线程ThreadLocaMap中key为slowThreadLocalMap(定义声明在UnpaddedInternalThreadLocalMap的常量),value是InternalThreadLocalMap。
==============InternalThreadLocalMap====================================
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
setKnownNotUnset:
当 InternalThreadLocalMap.get() 返回了 一个 InternalThreadLocalMap,接下来就可以保存数据了。
=======================FastThreadLocal==================================
private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
// 设置value
if (threadLocalMap.setIndexedVariable(index, value)) {
//将key(FastThreadLoca)添加到数组的第一个元素(Set集合)中
addToVariablesToRemove(threadLocalMap, this);
return true;
}
return false;
}
setIndexedVariable:
======================InternalThreadLocalMap==============================
public boolean setIndexedVariable(int index, Object value) {
// indexedVariables是构造函数中创建的Object数组
Object[] lookup = indexedVariables;
// index是从FastThreadLcal中传入的值,初始为1(自增),因为0被静态变量variablesToRemoveIndex抢占了
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
expandIndexedVariableTableAndSet(index, value);
return true;
}
}
整体逻辑比较直观,如果index没有越界,那么进行覆盖,如果old value为 UNSET,那么返回true,否则返回false。如果越界则进行扩容并返回true。
注意: 只有一种情况会返回fasle,即某个元素已经被赋值过了。那么返回false直接影响的是addToVariablesToRemove方法不会执行。即key不会再次被添加到Set集合中。因为已经被添加过了。
expandIndexedVariableTableAndSet:
======================InternalThreadLocalMap===========================
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
int newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity ++;
Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
newArray[index] = value;
indexedVariables = newArray;
}
这段代码的作用就是按原来的容量扩容2倍。并且保证结果是2的幂次方。JDK中HashMap的扩容也类似。扩容完成之后填充UNSET,并且将保存value。至此保存value的操作已经完毕。
addToVariablesToRemove:
=========================InternalThreadLocalMap===========================
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
// 获取到下表为0的数组元素
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
// 如果未设置过或者为null
if (v == InternalThreadLocalMap.UNSET || v == null) {
// 创建一个IdentityHashMap的实现,IdentityHashMap和HashMap类似,只是key的相同判断方式以 ‘==’来决定
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
// 将这个 Set 放到这个 Map 数组的下标 0 处
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
// 如果拿到的不是 UNSET ,说明这是第二次操作了,因此可以强转为 Set
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
// 将 FastThreadLocal 放置到 Set 中
variablesToRemove.add(variable);
}
这个方法的目的是将一条线程的所有FastThreadLocal对象保存到一个 Set 中,静态方法 removeAll 就需要使用到这个 Set,可以快速的删除线程 Map 里的所有 FTL 对应的 Value。 如果不使用 Set,那么就需要遍历 InternalThreadLocalMap。
get方法
get方法极为简单,实现如下:
===========================FastThreadLocal==========================
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
return initialize(threadLocalMap);
}
首先获取当前线程的map,然后根据 FastThreadLocal的index 获取value,然后返回,如果是空对象,则通过 initialize 返回,initialize 方法会将返回值设置到 map 的槽位中,并放进 Set 中。
initialize
============================FastThreadLocal==========================
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
//1、获取初始值
v = initialValue();
} catch (Exception e) {
throw new RuntimeException(e);
}
// 2、设置value到InternalThreadLocalMap中
threadLocalMap.setIndexedVariables(index, v);
// 3、添加当前的FastThreadLocal到InternalThreadLocalMap的Set<FastThreadLocal<?>>中
addToVariablesToRemove(threadLocalMap, this);
return v;
}
//初始化参数:由子类复写
protected V initialValue() throws Exception {
return null;
}
ftl的资源回收机制
netty中ftl的两种回收机制回收机制:
-
自动:使用ftlt执行一个被FastThreadLocalRunnable wrap的Runnable任务,在任务执行完毕后会自动进行ftl的清理。
-
手动:ftl和InternalThreadLocalMap都提供了remove方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用ftl)手动进行调用,进行显示删除。
FastThreadLocalRunnable
final class FastThreadLocalRunnable implements Runnable {
private final Runnable runnable;
@Override
public void run() {
try {
runnable.run();
} finally {
FastThreadLocal.removeAll();
}
}
static Runnable wrap(Runnable runnable) {
return runnable instanceof FastThreadLocalRunnable
? runnable : new FastThreadLocalRunnable(runnable);
}
}
如果将线程执行的任务包装成 FastThreadLocalRunnable,那么在任务执行完后自动删除ftl的资源。
===============================FastThreadLocal===========================
public static void removeAll() {
// 获取到map
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}
try {
// 获取到Set<FastThreadLocal>集合
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
// 将Set转换为数组
FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
// 遍历数组,删除每一个FastThreadLocal对应的value
for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
tlv.remove(threadLocalMap);
}
}
} finally {
// 删除当前线程的InternalThreadLocalMap
InternalThreadLocalMap.remove();
}
}
public static void remove() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
// 将FastThreadLocalThread 内部的map置位null
((FastThreadLocalThread) thread).setThreadLocalMap(null);
} else {
// 将 ThreadLocal内部ThreadLocalMap 中的value置位null
slowThreadLocalMap.remove();
}
}
remove方法:
===============================FastThreadLocal==========================
private void remove() {
remove(InternalThreadLocalMap.getIfSet());
}
private void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}
// 从 InternalThreadLocalMap 中删除当前的FastThreadLocal对应的value并设UNSET
Object v = threadLocalMap.removeIndexedVariable(index);
// 从 InternalThreadLocalMap 中的Set<FastThreadLocal<?>>中删除当前的FastThreadLocal对象
removeFromVariablesToRemove(threadLocalMap, this);
// 如果删除的是有效值,则进行onRemove方法的回调
if (v != InternalThreadLocalMap.UNSET) {
try {
// 回调子类复写的onRemoved方法,默认为空实现
onRemoved((V) v);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
网友评论