Netty中的FastThreadLocal
版本:4.1.23
大家都应该接触过Jdk的ThreadLocal,它使用每个Thread中的ThreadLocalMap存储ThreadLocal,ThreadLocalMap内部使用ThreadLocalMap.Entry 数组存储每一个ThreadLocal,存储计算和HashMap类似,要计算key的索引位置=key.threadLocalHashCode&(len-1),中间还需要处理冲突,使用的是线性探测法(当前索引被占用时,使用下一个索引)。达到一定条件后,还需扩充数组长度,rehash,可谓效率不是太高。另外,Jdk的ThreadLocal,还需要使用者注意内存泄漏问题。作为高性能框架的Netty为了解决上面的两个问题重构了ThreadLocal,产生了FastThreadLocal。下面讲解它具体是如何解决刚才说的问题的。
1、与ThreadLocal内部使用类对比
不同对象 | Jdk | Netty | 备注 |
---|---|---|---|
线程 | Thread | FastThreadLocalThread:继承JDK的Thread | netty使用自己的DefaultThreadFactory |
map | ThreadLocalMap | InternalThreadLocalMap | map |
map内部数组 | ThreadLocalMap.Entry | UnpaddedInternalThreadLocalMap.indexedVariables | 存储threadLocal |
Runnable | Runnable | FastThreadLocalRunnable | 为了防止内存泄漏,netty的Runnable包装了Runnable |
ThreadLocal | ThreadLocal | FastThreadLocal |
Thread与FastThreadLocalThread
// Extends Thread and carries its own InternalThreadLocalMap field, used in place of the
// ThreadLocalMap that a plain JDK Thread relies on.
public class FastThreadLocalThread extends Thread {
// This will be set to true if we have a chance to wrap the Runnable.
private final boolean cleanupFastThreadLocals;
// Per-thread storage for FastThreadLocal values; presumably exposed through accessor
// methods in the omitted part of the class.
private InternalThreadLocalMap threadLocalMap;
//.... (remaining members omitted)
}
DefaultThreadFactory
// ThreadFactory whose threads are FastThreadLocalThread instances running a
// FastThreadLocalRunnable-wrapped task.
public class DefaultThreadFactory implements ThreadFactory {
//.... (fields and constructors omitted)
// Creates a thread for the given task, then aligns its daemon flag and priority with this
// factory's settings.
@Override
public Thread newThread(Runnable r) {
// Wrap the task in a FastThreadLocalRunnable before handing it to the thread.
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}
if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}
// Instantiates the FastThreadLocalThread; protected, so subclasses can override it.
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}
}
FastThreadLocalRunnable
/**
 * {@link Runnable} decorator: runs the wrapped task and afterwards removes all
 * {@code FastThreadLocal} variables of the executing thread.
 */
final class FastThreadLocalRunnable implements Runnable {
    private final Runnable task;

    private FastThreadLocalRunnable(Runnable runnable) {
        task = ObjectUtil.checkNotNull(runnable, "runnable");
    }

    /**
     * Runs the wrapped task; the cleanup happens in {@code finally}, so it is performed
     * whether the task returns normally or throws.
     */
    @Override
    public void run() {
        try {
            task.run();
        } finally {
            FastThreadLocal.removeAll();
        }
    }

    /**
     * Returns {@code runnable} unchanged when it is already a {@code FastThreadLocalRunnable},
     * otherwise a new wrapper around it.
     */
    static Runnable wrap(Runnable runnable) {
        if (runnable instanceof FastThreadLocalRunnable) {
            return runnable;
        }
        return new FastThreadLocalRunnable(runnable);
    }
}
UnpaddedInternalThreadLocalMap
// Holds the per-thread state: the indexed value array used by FastThreadLocal plus a set of
// dedicated fields for frequently used thread-locals.
class UnpaddedInternalThreadLocalMap {
// JDK ThreadLocal that holds the InternalThreadLocalMap of a thread (used by slowGet() for
// threads that are not FastThreadLocalThread).
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
// Source of the next free slot index (see nextVariableIndex()).
static final AtomicInteger nextIndex = new AtomicInteger();
/** Used by {@link FastThreadLocal} */
Object[] indexedVariables; // backing array that stores the thread-local values by index
// Core thread-locals
int futureListenerStackDepth;
int localChannelReaderStackDepth;
Map<Class<?>, Boolean> handlerSharableCache;
IntegerHolder counterHashCode;
ThreadLocalRandom random;
Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache;
Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache;
// String-related thread-locals
StringBuilder stringBuilder;
Map<Charset, CharsetEncoder> charsetEncoderCache;
Map<Charset, CharsetDecoder> charsetDecoderCache;
// ArrayList-related thread-locals
ArrayList<Object> arrayList;
// Stores the given array as the backing value table.
UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
this.indexedVariables = indexedVariables;
}
}
2、FastThreadLocal源代码
FastThreadLocal中的三个index
// Static slot index shared by every FastThreadLocal instance; the slot holds the set of
// variables that must be removed for the thread.
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
// Slot index of this variable's value.
private final int index;
// Slot index of the flag that records whether a cleaner has already been registered.
private final int cleanerFlagIndex;
// Each instance reserves two consecutive-by-allocation slots: one for the value, one for the flag.
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
cleanerFlagIndex = InternalThreadLocalMap.nextVariableIndex();
}
/**
 * Hands out the next slot index from the shared {@code nextIndex} counter.
 *
 * @return a non-negative index
 * @throws IllegalStateException if the counter has overflowed into negative values
 */
public static int nextVariableIndex() {
    final int allocated = nextIndex.getAndIncrement();
    if (allocated >= 0) {
        return allocated;
    }
    // Undo the increment so the counter does not keep drifting after an overflow.
    nextIndex.decrementAndGet();
    throw new IllegalStateException("too many thread-local indexed variables");
}
set()
设置v过程是最难的部分,包括创建InternalThreadLocalMap,放入remove Set,非FastThreadLocalThread的线程还需要放入待清除任务队列
/**
 * Set the value for the current thread.
 *
 * <p>Passing {@code InternalThreadLocalMap.UNSET} (the sentinel the backing array is filled
 * with) is treated as a request to remove the variable.
 */
public final void set(V value) {
    if (value == InternalThreadLocalMap.UNSET) {
        remove();
        return;
    }
    // Map of the calling thread.
    final InternalThreadLocalMap map = InternalThreadLocalMap.get();
    // Register a cleaner only when the store reported a newly created variable.
    if (setKnownNotUnset(map, value)) {
        registerCleaner(map);
    }
}
InternalThreadLocalMap.get()
/**
 * Returns the map of the calling thread: via {@code fastGet} for a
 * {@code FastThreadLocalThread}, via {@code slowGet} for any other thread.
 */
public static InternalThreadLocalMap get() {
    final Thread current = Thread.currentThread();
    return current instanceof FastThreadLocalThread
            ? fastGet((FastThreadLocalThread) current)
            : slowGet();
}
/**
 * Returns the map stored on the given thread, creating and attaching one if the thread has
 * none yet.
 */
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
    final InternalThreadLocalMap existing = thread.threadLocalMap();
    if (existing != null) {
        return existing;
    }
    final InternalThreadLocalMap created = new InternalThreadLocalMap();
    thread.setThreadLocalMap(created);
    return created;
}
/**
 * Returns the map kept in the JDK {@code ThreadLocal}
 * {@code UnpaddedInternalThreadLocalMap.slowThreadLocalMap}, creating and storing one on
 * first use.
 */
private static InternalThreadLocalMap slowGet() {
    final ThreadLocal<InternalThreadLocalMap> holder = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
    final InternalThreadLocalMap existing = holder.get();
    if (existing != null) {
        return existing;
    }
    final InternalThreadLocalMap created = new InternalThreadLocalMap();
    holder.set(created);
    return created;
}
InternalThreadLocalMap初始化
// Superclass constructor: stores the given array as the backing value table.
UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
this.indexedVariables = indexedVariables;
}
// Creates a map whose backing table is a fresh array from newIndexedVariableTable().
private InternalThreadLocalMap() {
super(newIndexedVariableTable()); // the superclass constructor receives the value array
}
/**
 * Builds the initial value table: 32 slots, each holding the {@code UNSET} sentinel.
 */
private static Object[] newIndexedVariableTable() {
    final Object[] table = new Object[32];
    for (int slot = 0; slot < table.length; slot++) {
        table[slot] = UNSET;
    }
    return table;
}
setKnownNotUnset
/**
 * Stores {@code value} in this variable's slot and, when the store reports a newly created
 * variable, also records this instance in the map's to-remove set.
 *
 * @return {@code true} if a new thread-local variable was created by this call
 */
private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    final boolean created = threadLocalMap.setIndexedVariable(index, value);
    if (created) {
        addToVariablesToRemove(threadLocalMap, this);
    }
    return created;
}
/**
 * Adds {@code variable} to the set kept in the {@code variablesToRemoveIndex} slot of the
 * given map, creating that set first if the slot is still empty.
 */
@SuppressWarnings("unchecked")
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    final Object slot = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    final Set<FastThreadLocal<?>> tracked;
    if (slot != null && slot != InternalThreadLocalMap.UNSET) {
        tracked = (Set<FastThreadLocal<?>>) slot;
    } else {
        // Identity-based set: the same variable is never recorded twice.
        tracked = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, tracked);
    }
    tracked.add(variable);
}
//threadLocalMap//
/**
 * Writes {@code value} into slot {@code index}, growing the table when the index lies
 * beyond its current length.
 *
 * @return {@code true} if and only if a new thread-local variable has been created
 */
public boolean setIndexedVariable(int index, Object value) {
    final Object[] table = indexedVariables;
    if (index >= table.length) {
        // Slot does not exist yet: enlarge the table and store the value there.
        expandIndexedVariableTableAndSet(index, value);
        return true;
    }
    final Object previous = table[index];
    table[index] = value;
    // false only when an existing value was overwritten
    return previous == UNSET;
}
/**
 * Registers, at most once per map, an {@code ObjectCleaner} task for the current thread that
 * calls {@code remove(threadLocalMap)}. Nothing is registered when
 * {@code FastThreadLocalThread.willCleanupFastThreadLocals} reports that the thread cleans
 * up by itself.
 */
private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
    final Thread current = Thread.currentThread();
    if (FastThreadLocalThread.willCleanupFastThreadLocals(current)) {
        return;
    }
    // The flag slot is no longer UNSET once a cleaner has been registered for this map.
    if (threadLocalMap.indexedVariable(cleanerFlagIndex) != InternalThreadLocalMap.UNSET) {
        return;
    }
    // removeIndexedVariable(cleanerFlagIndex) isn't necessary because the finally cleanup is tied to the lifetime
    // of the thread, and this Object will be discarded if the associated thread is GCed.
    threadLocalMap.setIndexedVariable(cleanerFlagIndex, Boolean.TRUE);

    // We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released
    // and FastThreadLocal.onRemoval(...) will be called.
    final Runnable cleanup = new Runnable() {
        @Override
        public void run() {
            remove(threadLocalMap);
            // It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once
            // the Thread is collected by GC. In this case the ThreadLocal will be gone away already.
        }
    };
    ObjectCleaner.register(current, cleanup);
}
ObjectCleaner
//这个是防止内存泄漏的核心代码,和FastThreadLocal绑定的线程当被回收时,执行该类中的任务来清除map中的数据
package io.netty.util.internal;
import io.netty.util.concurrent.FastThreadLocalThread;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import static io.netty.util.internal.SystemPropertyUtil.getInt;
import static java.lang.Math.max;
/**
 * Allows a way to register some {@link Runnable} that will executed once there are no references to an {@link Object}
 * anymore.
 */
public final class ObjectCleaner {
// Poll timeout for the reference queue, read from a system property (default 10000) and never below 500.
private static final int REFERENCE_QUEUE_POLL_TIMEOUT_MS =
max(500, getInt("io.netty.util.internal.ObjectCleaner.refQueuePollTimeout", 10000));
// Package-private for testing
static final String CLEANER_THREAD_NAME = ObjectCleaner.class.getSimpleName() + "Thread";
// This will hold a reference to the AutomaticCleanerReference which will be removed once we called cleanup()
private static final Set<AutomaticCleanerReference> LIVE_SET = new ConcurrentSet<AutomaticCleanerReference>();
private static final ReferenceQueue<Object> REFERENCE_QUEUE = new ReferenceQueue<Object>();
// true while a cleaner thread is considered to be running; flipped with compareAndSet.
private static final AtomicBoolean CLEANER_RUNNING = new AtomicBoolean(false);
// Body of the cleaner thread: drains REFERENCE_QUEUE and runs each reference's cleanup task.
private static final Runnable CLEANER_TASK = new Runnable() {
@Override
public void run() {
boolean interrupted = false;
for (;;) {
// Keep on processing as long as the LIVE_SET is not empty and once it becomes empty
// See if we can let this thread complete.
while (!LIVE_SET.isEmpty()) {
final AutomaticCleanerReference reference;
try {
reference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS); // waits up to the timeout for an enqueued reference; null is handled below
} catch (InterruptedException ex) {
// Just consume and move on
interrupted = true;
continue;
}
if (reference != null) {
try {
reference.cleanup(); // run the cleanup task registered for this reference
} catch (Throwable ignored) {
// ignore exceptions, and don't log in case the logger throws an exception, blocks, or has
// other unexpected side effects.
}
LIVE_SET.remove(reference);
}
}
CLEANER_RUNNING.set(false);
// Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct
// behavior in multi-threaded environments.
if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
// There was nothing added after we set STARTED to false or some other cleanup Thread
// was started already so its safe to let this Thread complete now.
break;
}
}
if (interrupted) {
// As we caught the InterruptedException above we should mark the Thread as interrupted.
Thread.currentThread().interrupt();
}
}
};
/**
 * Register the given {@link Object} for which the {@link Runnable} will be executed once there are no references
 * to the object anymore.
 *
 * This should only be used if there are no other ways to execute some cleanup once the Object is not reachable
 * anymore because it is not a cheap way to handle the cleanup.
 */
// Wraps the object and its cleanup task in an AutomaticCleanerReference, adds it to LIVE_SET,
// and starts a cleaner thread if none is marked as running.
public static void register(Object object, Runnable cleanupTask) {
// AutomaticCleanerReference extends WeakReference
AutomaticCleanerReference reference = new AutomaticCleanerReference(object,
ObjectUtil.checkNotNull(cleanupTask, "cleanupTask"));
// Its important to add the reference to the LIVE_SET before we access CLEANER_RUNNING to ensure correct
// behavior in multi-threaded environments.
LIVE_SET.add(reference);
// Check if there is already a cleaner running.
if (CLEANER_RUNNING.compareAndSet(false, true)) {
// Only the caller that wins this CAS creates and starts the thread that runs CLEANER_TASK.
final Thread cleanupThread = new FastThreadLocalThread(CLEANER_TASK);
cleanupThread.setPriority(Thread.MIN_PRIORITY); // lowest priority
// Set to null to ensure we not create classloader leaks by holding a strong reference to the inherited
// classloader.
// See:
// - https://github.com/netty/netty/issues/7290
// - https://bugs.openjdk.java.net/browse/JDK-7008595
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
cleanupThread.setContextClassLoader(null);
return null;
}
});
cleanupThread.setName(CLEANER_THREAD_NAME);
// Mark this as a daemon thread to ensure that we the JVM can exit if this is the only thread that is
// running.
cleanupThread.setDaemon(true);
cleanupThread.start();
}
}
// Number of references currently held in LIVE_SET.
public static int getLiveSetCount() {
return LIVE_SET.size();
}
private ObjectCleaner() {
// Only contains a static method.
}
// Weak reference to the registered object, paired with the task to run for it.
private static final class AutomaticCleanerReference extends WeakReference<Object> {
private final Runnable cleanupTask;
// The reference is created against REFERENCE_QUEUE, the queue that CLEANER_TASK polls;
// the cleaner thread runs cleanup() on every reference it takes from that queue.
AutomaticCleanerReference(Object referent, Runnable cleanupTask) {
super(referent, REFERENCE_QUEUE);
this.cleanupTask = cleanupTask;
}
// Runs the registered cleanup task.
void cleanup() {
cleanupTask.run();
}
// Always returns null, so the referent is never handed out through this reference.
@Override
public Thread get() {
return null;
}
@Override
public void clear() { // also drops this reference from LIVE_SET
LIVE_SET.remove(this);
super.clear();
}
}
}
remove
清除执行动作
public final void remove() {
remove(InternalThreadLocalMap.getIfSet());取到当前线程的InternalThreadLocalMap
}
/**
 * Sets the value to uninitialized for the specified thread local map;
 * a proceeding call to get() will trigger a call to initialValue().
 * The specified thread local map must be for the current thread.
 *
 * <p>A {@code null} map is ignored. {@code onRemoval} is invoked only when the slot actually
 * held a value.
 */
@SuppressWarnings("unchecked")
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }
    // Reset the value slot and drop this variable from the to-remove set.
    final Object previous = threadLocalMap.removeIndexedVariable(index);
    removeFromVariablesToRemove(threadLocalMap, this);
    if (previous == InternalThreadLocalMap.UNSET) {
        return;
    }
    try {
        // Removal hook; presumably a no-op unless a subclass overrides it.
        onRemoval((V) previous);
    } catch (Exception e) {
        PlatformDependent.throwException(e);
    }
}
/**
 * Drops {@code variable} from the set kept in the {@code variablesToRemoveIndex} slot of the
 * given map; does nothing when that slot holds no set.
 */
private static void removeFromVariablesToRemove(
        InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    final Object slot = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    if (slot != null && slot != InternalThreadLocalMap.UNSET) {
        @SuppressWarnings("unchecked")
        final Set<FastThreadLocal<?>> tracked = (Set<FastThreadLocal<?>>) slot;
        tracked.remove(variable);
    }
}
/**
 * Method of {@code InternalThreadLocalMap}: resets slot {@code index} to {@code UNSET}.
 *
 * @return the value the slot held before, or {@code UNSET} when the index lies outside the table
 */
public Object removeIndexedVariable(int index) {
    final Object[] table = indexedVariables;
    if (index >= table.length) {
        return UNSET;
    }
    final Object previous = table[index];
    table[index] = UNSET;
    return previous;
}
FastThreadLocal.removeAll()
/**
 * Removes every {@code FastThreadLocal} recorded for the current thread and then discards
 * the thread's map. Called from {@code FastThreadLocalRunnable.run()} after the wrapped task
 * finishes.
 */
public static void removeAll() {
    final InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap == null) {
        return;
    }
    try {
        final Object slot = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        if (slot == null || slot == InternalThreadLocalMap.UNSET) {
            // Nothing recorded; the finally block below still discards the map.
            return;
        }
        @SuppressWarnings("unchecked")
        final Set<FastThreadLocal<?>> tracked = (Set<FastThreadLocal<?>>) slot;
        // Iterate over a snapshot: remove(...) modifies the tracked set.
        final FastThreadLocal<?>[] snapshot = tracked.toArray(new FastThreadLocal[tracked.size()]);
        for (int i = 0; i < snapshot.length; i++) {
            snapshot[i].remove(threadLocalMap);
        }
    } finally {
        InternalThreadLocalMap.remove();
    }
}
get
/**
 * Returns this variable's value for the current thread. When the slot is still
 * {@code UNSET}, the value comes from {@code initialize(...)} and a cleaner is registered
 * for the map.
 */
@SuppressWarnings("unchecked")
public final V get() {
    final InternalThreadLocalMap map = InternalThreadLocalMap.get();
    final Object current = map.indexedVariable(index);
    if (current == InternalThreadLocalMap.UNSET) {
        final V initial = initialize(map);
        registerCleaner(map);
        return initial;
    }
    return (V) current;
}
总结:
1.从代码来看,Netty内部使用了FastThreadLocal关联的一些自定义类,线程,threadLocalMap,runnable等。
2.为防止内存泄露,FastThreadLocal针对Netty内部自己的线程和用户自定义线程在清除map数据有不同的处理方法
3.底层和Jdk一样使用数组来存储threadLocal的值,但netty直接使用fastThreadLocal的索引来直接定位在数组的位置,高效,但也应清楚,每一个threadLocal都使用了数组两个空间(index,cleanerFlagIndex),所有的threadlocal都使用了variablesToRemoveIndex来存储要清除的threadlocal。相比JDK的ThreadLocal,是以空间换时间。
4.使用非FastThreadLocalThread时,底层也是封装了JDK的ThreadLocal来存储,如2所述,不管哪类线程,都有对应的防止内存泄露方法。
网友评论