上一篇我们分析InheritableThreadLocal可以解决父子线程之间的变量继承问题,但是对于线程池模式,是无能为力,所以ali开源了一个TransmittableThreadLocal来解决这个问题。
在TransmittableThreadLocal定义了一个static的变量holder,这个hold封装了当前Thread中的所有的inheritableThreadLocals,等于说没有增加一层代理解决不了的事情,如下
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
@Override
protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
//在这个变量中封装了当前线程的所有的InheritableThreadLocal放到map里面,且为Weak引用,防止内存泄漏
return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
}
@Override
protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
}
};
然后呢又定义了一个Transmitter对holder进行了进一步的代理封装,其内部封装了几个方法,我们一个一个的看
capture很简单,将此线程关联的所有的InheritableThreadLocal进行获取,也就是holder里面封装的信息
public static Object capture() {
Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
captured.put(threadLocal, threadLocal.copyValue());
}
return captured;
}
replay,重放的意思,上一步我们capture到所有的InheritableThreadLocal,我们可以把这个对象应用到当前线程,使得当前线程拥有capture封装的所有的InheritableThreadLocal的信息
public static Object replay(@Nonnull Object captured) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// backup 将此线程里面的InheritableThreadLocal信息先备份,典型的备忘录模式
backup.put(threadLocal, threadLocal.get());
// clear the TTL values that is not in captured
// avoid the extra TTL values after replay when run task
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// set values to captured TTL
setTtlValuesTo(capturedMap);
// call beforeExecute callback
doExecuteCallback(true);
return backup;
}
//将capturedMap封装的InheritableThreadLocal设置给当前的线程
private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
@SuppressWarnings("unchecked")
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
threadLocal.set(entry.getValue());
}
}
既然我们再之前backup了InheritableThreadLocal信息,那么线程之前完成之后,在还原现场,对应的method为restore。源码如下,很简单我们就不讲解了
public static void restore(@Nonnull Object backup) {
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
// call afterExecute callback
doExecuteCallback(false);
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// clear the TTL values that is not in backup
// avoid the extra TTL values after restore
if (!backupMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// restore TTL values
setTtlValuesTo(backupMap);
}
针对TransmittableThreadLocal的对象,ali也封装了对应了,TtlRunnable和TtlCallable对象,在new TtlRunnable的时候,也会把父线程的所有InheritableThreadLocal信息全部的传进去,
private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
当然要用到ali提供的ExecutorTtlWrapper线程池,将Executor包了一下,源码如下
class ExecutorTtlWrapper implements Executor, TtlEnhanced {
private final Executor executor;
ExecutorTtlWrapper(@Nonnull Executor executor) {
this.executor = executor;
}
//重点在这里,将Runnable封装成一个TtlRunnable,而TtlRunnable封装了父线程的InheritableThreadLocal信息。
@Override
public void execute(@Nonnull Runnable command) {
executor.execute(TtlRunnable.get(command));
}
@Nonnull
public Executor unwrap() {
return executor;
}
}
而TtlRunnable在run的前后对当前线程进行了replay 和 restore
如下
@Override
public void run() {
//得到主线程里面的所有的InheritableThreadLocal信息
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
//将主线程的InheritableThreadLocal信息应用到当前线程
Object backup = replay(captured);
try {
runnable.run();
} finally {
//还原当前线程的InheritableThreadLocal信息
restore(backup);
}
}
网友评论