美文网首页
TransmittableThreadLocal源码解析

TransmittableThreadLocal源码解析

作者: 牧羊人刘俏 | 来源:发表于2021-01-11 16:13 被阅读0次

上一篇我们分析InheritableThreadLocal可以解决父子线程之间的变量继承问题,但是对于线程池模式,是无能为力,所以ali开源了一个TransmittableThreadLocal来解决这个问题。
在TransmittableThreadLocal定义了一个static的变量holder,这个hold封装了当前Thread中的所有的inheritableThreadLocals,等于说没有增加一层代理解决不了的事情,如下

 private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
            new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
                @Override
                protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
                    //在这个变量中封装了当前线程的所有的InheritableThreadLocal放到map里面,且为Weak引用,防止内存泄漏
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
                }

                @Override
                protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
                }
            };

然后呢又定义了一个Transmitter对holder进行了进一步的代理封装,其内部封装了几个方法,我们一个一个的看
capture很简单,将此线程关联的所有的InheritableThreadLocal进行获取,也就是holder里面封装的信息

 public static Object capture() {
            Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
            for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
                captured.put(threadLocal, threadLocal.copyValue());
            }
            return captured;
        }

replay,重放的意思,上一步我们capture到所有的InheritableThreadLocal,我们可以把这个对象应用到当前线程,使得当前线程拥有capture封装的所有的InheritableThreadLocal的信息

 public static Object replay(@Nonnull Object captured) {
            @SuppressWarnings("unchecked")
            Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
            Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();

            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();

                // backup 将此线程里面的InheritableThreadLocal信息先备份,典型的备忘录模式
                backup.put(threadLocal, threadLocal.get());

                // clear the TTL values that is not in captured
                // avoid the extra TTL values after replay when run task
                if (!capturedMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }

            // set values to captured TTL
            setTtlValuesTo(capturedMap);

            // call beforeExecute callback
            doExecuteCallback(true);

            return backup;
        }
//将capturedMap封装的InheritableThreadLocal设置给当前的线程
 private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
            for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
                @SuppressWarnings("unchecked")
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
                threadLocal.set(entry.getValue());
            }
        }

既然我们再之前backup了InheritableThreadLocal信息,那么线程之前完成之后,在还原现场,对应的method为restore。源码如下,很简单我们就不讲解了

public static void restore(@Nonnull Object backup) {
            @SuppressWarnings("unchecked")
            Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
            // call afterExecute callback
            doExecuteCallback(false);

            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();

                // clear the TTL values that is not in backup
                // avoid the extra TTL values after restore
                if (!backupMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }

            // restore TTL values
            setTtlValuesTo(backupMap);
        }

针对TransmittableThreadLocal的对象,ali也封装了对应了,TtlRunnable和TtlCallable对象,在new TtlRunnable的时候,也会把父线程的所有InheritableThreadLocal信息全部的传进去,

    private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        this.capturedRef = new AtomicReference<Object>(capture());
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
    }

当然要用到ali提供的ExecutorTtlWrapper线程池,将Executor包了一下,源码如下

class ExecutorTtlWrapper implements Executor, TtlEnhanced {
    private final Executor executor;

    ExecutorTtlWrapper(@Nonnull Executor executor) {
        this.executor = executor;
    }
   //重点在这里,将Runnable封装成一个TtlRunnable,而TtlRunnable封装了父线程的InheritableThreadLocal信息。
    @Override
    public void execute(@Nonnull Runnable command) {
        executor.execute(TtlRunnable.get(command));
    }

    @Nonnull
    public Executor unwrap() {
        return executor;
    }
}

而TtlRunnable在run的前后对当前线程进行了replay 和 restore
如下

@Override
    public void run() {
    //得到主线程里面的所有的InheritableThreadLocal信息
        Object captured = capturedRef.get();
        if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after run!");
        }
        //将主线程的InheritableThreadLocal信息应用到当前线程
        Object backup = replay(captured);
        try {
            runnable.run();
        } finally {
           //还原当前线程的InheritableThreadLocal信息
            restore(backup);
        }
    }

相关文章

网友评论

      本文标题:TransmittableThreadLocal源码解析

      本文链接:https://www.haomeiwen.com/subject/thydaktx.html