美文网首页
TransmittableThreadLocal源码分析

TransmittableThreadLocal源码分析

作者: 捞月亮的阿汤哥 | 来源:发表于2020-09-07 18:14 被阅读0次

1. 类继承关系

类继承关系

2. 时序图

时序图直接用作者画的,非常详细


TransmittableThreadLocal-sequence-diagram.png

3. 流程说明

1. createTtl 这一步最重要的就是初始化了holder

private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
        new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
            @Override
            protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
                return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
            }
            @Override
            protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
                return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
            }
        };

holder的两个重写方法说明:

  • initialValue调用的时机是在holder.get()的时候如果当前线程的ThreadLocalMap为空,则调用这个方法初始化,见单测代码 TtlRunnableTest.test_ttlRunnable_inSameThread
  • childValue调用的时机是子线程初始化的时候 见单测代码 TtlRunnableTest.test_ttlRunnable_asyncWithNewThread
    holder可以说是ttl最核心的变量之一了,理解了这个变量状态的流转,就理解了这个框架。为啥使用WeakHashMap应该是为了内存回收♻️

2. setTtlValue 这一步主要做了两件事

(1)先在ttl的父类ThreadLocal设置该线程的值

@Override
public final void set(T value) {
​    // 先调用ThreadLocal的set方法
    super.set(value);
    if (null == value) { // may set null to remove value
        removeValue();
    } else {
        addValue();
    }
}
​
//ThreadLocal 的set方法
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}​​​

(2) 往holder中添加当前的ttl

private void addValue() {
    if (!holder.get().containsKey(this)) {
        holder.get().put(this, null); // WeakHashMap supports null value.
        // System.out.println(Thread.currentThread().getName()+" "+holder.get()); //可以加下这个输出下加深理解
    }
}

(3) 调用ThreadLocal的get方法

public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        //调用ttl的holder重写的initialValue方法
        return setInitialValue();
    }

所以有了如下的映射关系,通过当前thread获取获取holder中的keySet,通过遍历keySet获取ttl,通过ttl委托ThreadLocal维护变量的值
holder运行时的结构:

main {com.alibaba.Utils$newTtlInstanceAndPut$ttl$1@1fc2b765(parent-create-unmodified-in-child)=null}

3. createBizTaskRunnable

实现runnable接口, 自己的业务逻辑

4. createTtlRunnableWrapper(Runnable runnable)

这一步最重要的是把当前线程的threadLocal的值给拷贝到了capturedRef ,为啥用AtomicRef,需要原子更新操作TtlRunnable line47

4.1 TtlRunnable get(Runnable runnable)

//runnable 提交的业务runnable
//releaseTtlValueReferenceAfterRun 是否在value返回后释放,避免内存泄漏
//idempotent 是否需要幂等
public static TtlRunnable get(Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
        if (null == runnable) {
            return null;
        }

        if (runnable instanceof TtlRunnable) {
            if (idempotent) {
                // avoid redundant decoration, and ensure idempotency
                return (TtlRunnable) runnable;
            } else {
                throw new IllegalStateException("Already TtlRunnable!");
            }
        }
        return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
    }

4.2 创建新的TtlRunnable包装runnable

private TtlRunnable(Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        //该线程包含的所有ttl的变量
        //类型是 Map<TransmittableThreadLocal<?>, Object>
        this.capturedRef = new AtomicReference<Object>(capture());
        this.runnable = runnable;
        //是否需要在使用后释放
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
    }

4.3 获取当前线程所有的ttl的实例和对应的threadLocal的值

重点: capture的是父线程的ttl, 也就是创建TtlRunnable的线程,这个很重要很容易误解。

public static Object capture() {
            Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
            //holder.get().keySet()是当前线程使用的ttl
            //在set这一步 holder添加了ttl的关系,同时ttl中set了当前线程的value
            for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
                captured.put(threadLocal, threadLocal.copyValue());
            }
            return captured;
        }

5. submitTtlRunnableToThreadPool

提交任务给线程池

6. 线程池执行run方法

步骤6之后的逻辑已经和业务代码没有关系了

(1) TtlRunnable重写了Runnable的run方法

@Override
public void run() {
    //获取任务提交时,即包装runnable时的ttl
    Object captured = capturedRef.get();
    if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
        throw new IllegalStateException("TTL value reference is released after run!");
    }
    //执行前 获取backup
    Object backup = replay(captured);
    try {
        //执行run方法
        runnable.run();
    } finally {
        //通过backup恢复当前线程的ttl,避免run方法中修改了ttl
        //避免线程复用造成父线程的ttl丢失
        restore(backup);
    }
}

(2) replay(captured)方法

  • 声明两个局部变量 capturedMap和backup
  • 遍历holder.get().entrySet().iterator()
    • 将threadLocal的值放到backup变量
    • 如果capturedMap不包含holder中该线程的threadLocal的key,将holder中多余的threadLocal key remove, 将其父类的threadLocal变量移除,避免线程运行时获取ttl的干扰
public static Object replay(Object captured) {
            @SuppressWarnings("unchecked")
            Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
            Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();

            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();

                // backup
                backup.put(threadLocal, threadLocal.get());

                // clear the TTL values that is not in captured
                // avoid the extra TTL values after replay when run task
                if (!capturedMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }

            // set values to captured TTL
            setTtlValuesTo(capturedMap);

            // call beforeExecute callback
            doExecuteCallback(true);

            return backup;
        }

将capturedMap的值拷贝到当前线程的threadLocal,并更新holder,这一步的精髓就是通过这个Map<TransmittableThreadLocal<?>, Object>类型的值将ttl从父线程往子线程中塞数据。类似的用法其实还有打破java双亲委托往threadContext中塞数据,典型应用就是jdbc的驱动的实现

private static void setTtlValuesTo(Map<TransmittableThreadLocal<?>, Object> ttlValues) {
            for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
                @SuppressWarnings("unchecked")
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
                threadLocal.set(entry.getValue());
            }
        }

doExecuteCallback 自定义扩展逻辑,可以做前置拦截

(3) 执行run方法

(4) 执行后 restore(backup)

将当前线程的ttl恢复到任务提交时

  • doExecuteCallback(false) 后置处理方法,用户可以自定义
  • 遍历holder.get().entrySet().iterator()
    • 如果backupMap不包含holder中该线程的threadLocal的key,将holder中多余的threadLocal key remove
  • setTtlValuesTo(backupMap)

4. 重点

  • 理解TransmittableThreadLocal的holder变量的流转

    • holder是一个InheritableThreadLocal静态变量, InheritableThreadLocal可以让threadLocal的值在子线程init的时候从父线程传递到子线程
    • holder其实维护了两层关系
      • 当前线程本地变量和ttl的映射关系
      • 任务提交时holder和所有提交时的ttl的映射关系
  • 状态流转图


    transmittable holder.png
  • 框架需要实现的目的是啥
    • 把 任务提交给线程池时的ThreadLocal值传递到 任务执行时

5. 学习的地方

  • 核心代码非常少, 王垠在博文《如何阅读别人的代码》说到的"造就我今天的编程能力和洞察力的,不是几百万行的大型项目,而是小到几行,几十行之短的练习。不要小看了这些短小的代码,它们就是编程最精髓的东西。反反复复琢磨这些短小的代码,不断改进和提炼里面的结构,磨砺自己的思维"。 个人觉得ttl框架精髓主要是holder变量的设计和维护,使用capturedRef实现父子(线程池)的ttl值传递,深刻理解ThreadLocal和InheritableThreadLocal的继承关系,以及Thread和ThreadLocalMap的关系。
  • 代码的注释非常的详细,比如方法的描述,参数的具体的意义,功能点是哪个版本之后才有的,当然作者的markdown的文档写的不错很生动
  • 用到了很多设计模式,比如模版方法模式,装饰器模式,委托模式等
  • 用到了类似AOP的概念 可以实现自己的前置和后置逻辑
  • 通过中间变量进行上层和下层的传递
  • 测试用例写的非常详尽,遇到不理解的可以通过单测来加深理解,算是我看过的非常不错的注释详尽的单测代码了👍

相关文章

网友评论

      本文标题:TransmittableThreadLocal源码分析

      本文链接:https://www.haomeiwen.com/subject/lhklektx.html