美文网首页GitHub 中文社区
TransmittableThreadLocal 源码解析

TransmittableThreadLocal 源码解析

作者: _晓__ | 来源:发表于2018-09-30 11:13 被阅读0次

    1、介绍

    TransmittableThreadLocal(TTL) 是 Alibaba 开源的,用于解决在使用线程池等会池化复用线程的组件情况下,提供 ThreadLocal 值的传递功能,解决异步执行时上下文传递的问题。TransmittableThreadLocal 需要配合 TTL 提供的 TtlExecutors、TtlRunnable 和 TtlCallable使用,也可以使用 Java Agent 无侵入式实现线程池的传递。

    2、使用场景

    • 调用链中的 traceId 传递
    • 框架层封装的业务工具在业务线程池中传递
    • Slf4j 日志框架中 MDC 传递

    3、TTL 执行流程

    TTL 执行流程.png

    4、源码解析

    TransmittableThreadLocal 继承于 InheritableThreadLocal,并拥有了 InheritableThreadLocal 对子线程传递上下文的特性,只需解决线程池上下文传递问题。

    4.1、项目结构

    TTL 项目结构.png

    4.2、核心部分源码

    在 TransmittableThreadLocal 中,定义了一个全局静态变量 holder,用于存储使用 TransmittableThreadLocal set 的上下文。

    private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
                new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
                    protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
                        return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
                    }
    
                    protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
                        return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
                    }
                };
    
    • initialValue 方法会在 InheritableThreadLocal 创建时被调用,默认创建一个 WeakHashMap。
    • childValue 方法会在创建子线程时,Thread 调用 init 方法,会调用 ThreadLocal.createInheritedMap(parent.inheritableThreadLocals),createInheritedMap 中会创建 ThreadLocalMap,ThreadLocalMap 的构造方法中会调用 childValue 方法。
    public class Thread implements Runnable {
    
        public Thread() {
            init(null, null, "Thread-" + nextThreadNum(), 0);
        }
    
        private void init(ThreadGroup g, Runnable target, String name,
                          long stackSize, AccessControlContext acc,
                          boolean inheritThreadLocals) {
    
            Thread parent = currentThread();
            // TODO 忽略其他源码
            if (inheritThreadLocals && parent.inheritableThreadLocals != null)
                this.inheritableThreadLocals =
                    ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
    
            // TODO 忽略其他源码
        }
    }
    
    public class ThreadLocal<T> {
        static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
            return new ThreadLocalMap(parentMap);
        }
    
        private ThreadLocalMap(ThreadLocalMap parentMap) {
            // TODO 忽略部分代码
    
            for (int j = 0; j < len; j++) {
                if (e != null) {
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        // TODO 忽略部分代码
                    }
                }
            }
        }
    }
    

    可能有人有疑问,为什么使用 WeakHashMap。关于 WeakHashMap 不了解的,大家可以自行查询一下。这里只是阐述一下,为什么 TTL 会使用 WeakHashMap。

    • 使用 WeakHashMap 是 “继承” ThreadLocalMap.Entry 的“优良传统”,在没有其它强引用的情况下,下一次GC 时才会被垃圾回收,避免内存泄露。
    • 程序中可能会使用到多个 ThreadLocal,所以需要使用 Map 作为容器储存,使用 Map 还能快速 remove 当前 ThreadLocal。

    在使用线程池时,需要使用 TTL 提供的 TtlExecutors 包装,如:

    TtlExecutors.getTtlExecutor(Executors.newCachedThreadPool());
    

    让我们继续跟进 TtlExecutors.getTtlExecutor 方法中,探究下这个方法里面究竟做了什么?

    public final class TtlExecutors {
      public static Executor getTtlExecutor(Executor executor) {
            if (null == executor || executor instanceof ExecutorTtlWrapper) {
                return executor;
            }
            return new ExecutorTtlWrapper(executor);
        }
    }
    
    • 使用 ExecutorTtlWrapper 包装 Executor。

    使用 ExecutorTtlWrapper 包装有什么用呢?那么就继续看看 ExecutorTtlWrapper 里面的实现:

    class ExecutorTtlWrapper implements Executor {
        private final Executor executor;
    
        ExecutorTtlWrapper(Executor executor) {
            this.executor = executor;
        }
    
        public void execute(Runnable command) {
            executor.execute(TtlRunnable.get(command));
        }
    
        // TODO 忽略部分代码
    }
    
    • 重点是在执行 execute 方法的时候,使用 TtlRunnable 做了线程上下文的处理,再执行真正的 Runnable run 方法。

    现在重点介绍一下 TtlRunnable 里面做了什么处理:

    public final class TtlRunnable implements Runnable {
        private final AtomicReference<Object> capturedRef;
        private final Runnable runnable;
        private final boolean releaseTtlValueReferenceAfterRun;
    
        private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
            this.capturedRef = new AtomicReference<Object>(TransmittableThreadLocal.Transmitter.capture());
            // TODO 忽略部分代码
        }
    
        public void run() {
            Object captured = capturedRef.get();
            // TODO 忽略部分代码
    
            Object backup = TransmittableThreadLocal.Transmitter.replay(captured);
            try {
                runnable.run();
            } finally {
                TransmittableThreadLocal.Transmitter.restore(backup);
            }
        }
    
         public static TtlRunnable get(Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
            // TODO 忽略部分代码
            return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
        }
    }
    
    public static class Transmitter {
            
            public static Object capture() {
                Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
                for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
                    captured.put(threadLocal, threadLocal.copyValue());
                }
                return captured;
            }
    
            public static Object replay(Object captured) {
                Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
                Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
    
                for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                     iterator.hasNext(); ) {
                    Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                    TransmittableThreadLocal<?> threadLocal = next.getKey();
    
                    // backup
                    backup.put(threadLocal, threadLocal.get());
    
                    // clear the TTL values that is not in captured
                    // avoid the extra TTL values after replay when run task
                    if (!capturedMap.containsKey(threadLocal)) {
                        iterator.remove();
                        threadLocal.superRemove();
                    }
                }
    
                // set values to captured TTL
                setTtlValuesTo(capturedMap);
    
                // call beforeExecute callback
                doExecuteCallback(true);
    
                return backup;
            }
    
            public static void restore(Object backup) {
                Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
                // call afterExecute callback
                doExecuteCallback(false);
    
                for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                     iterator.hasNext(); ) {
                    Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                    TransmittableThreadLocal<?> threadLocal = next.getKey();
    
                    // clear the TTL values that is not in backup
                    // avoid the extra TTL values after restore
                    if (!backupMap.containsKey(threadLocal)) {
                        iterator.remove();
                        threadLocal.superRemove();
                    }
                }
    
                // restore TTL values
                setTtlValuesTo(backupMap);
            }
    
            private static void setTtlValuesTo(Map<TransmittableThreadLocal<?>, Object> ttlValues) {
                for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
                    TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
                    threadLocal.set(entry.getValue());
                }
            }
    }
    
    • TtlRunnable 是实现于 Runnable,所以线程池执行的是 TtlRunnable,但是在 TtlRunnable run 方法中会执行 Runnable run 方法。
    • 线程池执行时,执行了 ExecutorTtlWrapper execute 方法,execute 方法中调用了 TtlRunnable.get(command) ,get 方法中创建了一个 TtlRunnable 对象返回了。
    • TtlRunnable 构造方法中,调用了 TransmittableThreadLocal.Transmitter.capture() 获取当前线程中所有的上下文,并储存在 AtomicReference 中。
    • 当线程执行时,调用 TtlRunnable run 方法,TtlRunnable 会从 AtomicReference 中获取出调用线程中所有的上下文,并把上下文给 TransmittableThreadLocal.Transmitter.replay 方法把上下文复制到当前线程。并把上下文备份。
    • 当线程执行完,调用 TransmittableThreadLocal.Transmitter.restore 并把备份的上下文传入,恢复备份的上下文,把后面新增的上下文删除,并重新把上下文复制到当前线程。

    PS:

    • Log4j2 MDC 集成 TTL
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>log4j2-ttl-thread-context-map</artifactId>
        <version>1.2.0</version>
    </dependency>
    
    • Logback MDC 集成 TTL
    <dependency>
        <groupId>com.ofpay</groupId>
        <artifactId>logback-mdc-ttl</artifactId>
        <version>1.0.2</version>
    </dependency>
    

    相关文章

      网友评论

        本文标题:TransmittableThreadLocal 源码解析

        本文链接:https://www.haomeiwen.com/subject/jycboftx.html