美文网首页Java
ThreadLocal系列之——父子线程传递线程私有数据(四)

ThreadLocal系列之——父子线程传递线程私有数据(四)

作者: ZX_周雄 | 来源:发表于2020-09-13 10:23 被阅读0次

前情回顾

前文,介绍了ThreadLocal作者们(Josh Bloch and Doug Lea)为内存泄露做的努力,将内存泄露造成的影响降到了最低,且着重分享了软件设计上的一个Trade Off:如何权衡内存占用与CPU占用之间的关系,该折中思想与Redis的过期淘汰策略一致(知识的迁移)

本文,将会接着分享ThreadLocal的其他局限性,并给出相应的解决方案

局限性

局限性一:父线程无法通过ThreadLocal向子线程传递线程私有数据

首先,通过本系列第二篇文章的学习,我们可以肯定的是:ThreadLocal本意上就是线程私有的数据(从命名上也可以看出来),每个线程维护着自己的一份副本,线程与线程之间数据隔离。父线程若要向子线程传递线程私有数据,肯定是不能通过ThreadLocal自己来实现,否则也就不能叫ThreadLocal

那,我们会有父线程向子线程传递线程私有数据的需求吗?

答案是肯定的,通过本系列第一篇文章的学习,我们也知道不少场景中会用到ThreadLocal来传递数据,但请注意,从请求开始到请求结束使用的都是使用的同一个线程,并没有跨线程传递。假设请求进来的时候是由线程A处理,且将后续流程节点需要使用到的信息放到ThreadLocal里,那么在后续处理节点中,由于性能原因或者是其它原因,需要将一些任务异步执行,就要另外开启线程B(由于B线程是在A线程执行过程中实例化,因此我们称B为A的子线程),但此时线程B无法通过ThreadLocal获取线程A在请求开始处放进去的信息。因此问题就产生了:父线程A需要向子线程B传递线程私有数据,但是TheadLocal做不到

示例代码如下:

ThreadLocal<String> tl = new ThreadLocal<>();
tl.set("foo");

Thread b = new Thread(() -> {
    String str = tl.get();
    // 企图拿到父线程放进去的foo,然而失败了!
    System.out.println(str);
});

b.start();

// 子线程B启动后,父线程A休息3秒,确保子线程执行完毕
TimeUnit.SECONDS.sleep(3);

// 清理
tl.remove();

结果:子线程B打印null,示意图如下:

image
解决方案:使用InheritableThreadLocal

InheritableThreadLocal是ThreadLocal的子类,它就是用来解决父线程向子线程传递线程私有数据问题的。示例代码只改第一行ThreadLocal<String> tl = new ThreadLocal<>();改为ThreadLocal<String> tl = new InheritableThreadLocal<>();,其它不变,如下示:

ThreadLocal<String> tl = new InheritableThreadLocal<>();
tl.set("foo");

Thread b = new Thread(() -> {
    String str = tl.get();
    // 企图拿到父线程放进去的foo,成功了!因为tl是InheritableThreadLocal
    System.out.println(str);
});

b.start();

// 子线程B启动后,父线程A休息3秒,确保子线程执行完毕
TimeUnit.SECONDS.sleep(3);

// 清理
tl.remove();

结果:子线程B打印foo,示意图如下:

image

可以看到,使用InheritableThreadLocal后,子线程能够通过tl获取父线程中set的值,实现了父线程向子线程传递线程私有数据的能力

这背后的原理是什么呢?

首先,Thread类有两个ThreadLocalMap类型的成员变量:threadLocals、inheritableThreadLocals,ThreadLocal使用的是threadLocals,而InheritableThreadLocal使用的是inheritableThreadLocals

/* ThreadLocal values pertaining to this thread. This map is maintained
 * by the ThreadLocal class. */
ThreadLocal.ThreadLocalMap threadLocals = null;

/*
 * InheritableThreadLocal values pertaining to this thread. This map is
 * maintained by the InheritableThreadLocal class.
 */
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;

执行tl.set("foo");时,对应的逻辑如下:

// java.lang.ThreadLocal#set
public void set(T value) {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null)
        map.set(this, value);
    else
        createMap(t, value);
}

由于是第一次执行,map为空,走createMap的逻辑,而不同的tl实现类(ThreadLocal、InheritableThreadLocal)有着不同的创建map逻辑

image

ThreadLocal的如下:

void createMap(Thread t, T firstValue) {
    t.threadLocals = new ThreadLocalMap(this, firstValue);
}

InheritableThreadLocal的如下:

void createMap(Thread t, T firstValue) {
    t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}

很轻易能够看出区别:ThreadLocal是给当前线程的threadLocals变量赋值,而InheritableThreadLocal则是给当前线程的inheritableThreadLocals变量赋值

其次,在创建线程实例,即new Thread()时,父线程使用自己的inheritableThreadLocals通过调用createInheritedMap方法来构造子线程的ThreadLocalMap并赋值给子线程的inheritableThreadLocals

// java.lang.Thread#init

Thread parent = currentThread();

if (inheritThreadLocals && parent.inheritableThreadLocals != null)
    this.inheritableThreadLocals =
        ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
    return new ThreadLocalMap(parentMap);
}
private ThreadLocalMap(ThreadLocalMap parentMap) {
    // 父线程的Entry
    Entry[] parentTable = parentMap.table;
    int len = parentTable.length;
    setThreshold(len);
    table = new Entry[len];

    // 依次"拷贝"到子线程的Entry里
    for (int j = 0; j < len; j++) {
        Entry e = parentTable[j];
        if (e != null) {
            @SuppressWarnings("unchecked")
            ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
            if (key != null) {
                Object value = key.childValue(e.value); // 注意这一行代码
                Entry c = new Entry(key, value);
                int h = key.threadLocalHashCode & (len - 1);
                while (table[h] != null)
                    h = nextIndex(h, len);
                table[h] = c;
                size++;
            }
        }
    }
}

注意上边标注的一行:Object value = key.childValue(e.value);,方法childValue是一个可重载方法,对于InheritableThreadLocal而言,直接返回了传进来的值:

protected T childValue(T parentValue) {
    return parentValue;
}

这意味着是:浅拷贝,即默认情况下,父线程传递给子线程的线程私有数据是一份浅拷贝,若拷贝指向个不可变对象,这不是什么问题;若指向一个引用,那尤其需要注意,因为不再有线程封闭,父子线程中任意的变更均会影响到对方。我们说它是可重载方法,意味着若要实现"Deep Copy",需要自行继承InheritableThreadLocal类并重写childValue方法

小结一下解决方案背后的原理:使用InheritableThreadLocal,会将线程私有数据存储在inheritableThreadLocals指向的ThreadLocalMap中;在构造子线程时,将当前线程inheritableThreadLocals里的数据(ThreadLocalMap)"拷贝"给子线程的ThreadLocalMap,子线程因此可以通过tl.get()取到数据,如此便实现了父线程向子线程传递线程私有数据

image
局限性二:父线程无法通过InheritableThreadLocal向池化的子线程(线程池)传递线程私有数据

在日常开发过程中,由于构造与销毁子线程开销大,因此每次在业务代码中重新构造一个子线程的方式并不常见,更常见的方式是将线程池化(线程池),由线程池的调度策略决定线程们如何执行提交给池中的任务,避免了重复构造与销毁线程的开销。上面我们提到,InheritableThreadLocal可以解决父线程向子线程传递线程私有数据的问题,但一旦子线程池化之后,InheritableThreadLocal也将不再起作用

看下面一段示例代码:

ExecutorService executorService = Executors.newFixedThreadPool(1);
ThreadLocal<String> tl = new InheritableThreadLocal<>();

// ------------第一次调用 start -------------------
tl.set("foo");
executorService.execute(() -> {
    String str = tl.get();
    // 企图拿到父线程放进去的foo,成功了!
    System.out.println(str);
});
// 父线程A休息3秒,确保提交给线程池的任务执行完毕
TimeUnit.SECONDS.sleep(3);
// 清理
tl.remove();
// ------------第一次调用 end -------------------



// ------------第二次调用 start -------------------
tl.set("bar");
executorService.execute(() -> {
    String str = tl.get();
    // 企图拿到父线程放进去的bar,失败了!
    System.out.println(str);
});
// 父线程A休息3秒,确保提交给线程池的任务执行完毕
TimeUnit.SECONDS.sleep(3);
// 清理
tl.remove();
// ------------第二次调用 end -------------------

如上,仍使用InheritableThreadLocal,且构造了只有1个线程的线程池,然后模拟两次外部调用:第一次在父线程中将tl赋值为foo,然后子线程中获取,使用完毕之后remove;第二次在父线程中将tl赋值为bar,然后子线程中获取,使用完毕之后remove

我们期望的结果是: foo bar

然而实际的结果却是: foo foo

我们得到了与期望不符的结果,原因很简单,在局限性一解决方案原理中已经阐述:子线程的ThreadLocalMap数据是在创建线程的那一刻从父线程中"拷贝"而来,此后再也没有促使其变化的地方,而子线程由于池化复用的缘故,ThreadLocalMap一直持有的是线程创建时刻的数据(即foo),此后无论进行多少次方法调用,在(池化)子线程中通过tl.get()取出来的永远是foo

解决方案:使用transmittable-thread-local(alibaba)

暂不提开源框架是如何解决这个问题的,我们先自己推导,进而得出与开源框架一致的结论,帮助大家理好地理解解决方案背后的原理

一个基本认知:无论执行execute或者submit方法,向线程池提交的是任务(task)

image image

仔细推敲琢磨一下,我们需要的并不是创建线程的那一刻父线程的ThreadLocal值,而是提交任务时父线程的ThreadLocal值。或者换种表述方式,需要把任务提交给线程池时ThreadLocal值传递到任务执行时

具体思路是:父线程把任务提交给线程池时一同附上此刻自己的ThreadLocalMap,包装在task里,待线程池中某个线程执行到该任务时,用task里的ThreadLocalMap赋盖当前线程ThreadLocalMap,这样就完成了父线程向池化的子线程传递线程私有数据的目标。为了避免数据污染,待任务执行完后,线程归还回线程池之前,还需要还原ThreadLocalMap,如下示:

image

上面的步骤一共有6步,其中2、4、6是线程池本身的提供的能力,不需要改动,只有1、3、5有所不同,我们逐一剖析

第1步疑问:提交Task的时候如何将ThreadLocalMap一同提交上去?此处难点在于如何获取当前线程的ThreadLocalMap

本系列第二篇文章中提到,ThreadLocalMap是线程私有变量,只会被ThreadLocal维护,对于外部类而言是不可见的,因此要操作ThreadLocalMap就得通过ThreadLocal(操作ThreadLocal本质上是在操作当前线程的ThreadLocalMap)

首先自定义Task,用于包装维护父线程ThreadLocalMap

public class MyTask implements Runnable {
        // key是ThreadLocal,value是对应父线程的线程私有数据
    private final Map<ThreadLocal<Object>, Object> threadLocalValues;

    public MyTask(ThreadLocal<Object>... threadLocals) {
        this.threadLocalValues = new HashMap<>(threadLocals.length);
        capture(threadLocals);
    }

    private void capture(ThreadLocal<Object>[] threadLocals) {
        for (ThreadLocal<Object> threadLocal : threadLocals) {
            threadLocalValues.put(threadLocal, threadLocal.get());
        }
    }

    @Override
    public void run() {
        // todo
    }
}

使用时:

ThreadLocal<Object> tl1 = new ThreadLocal<>();
tl1.set("111111");

ThreadLocal<Object> tl2 = new ThreadLocal<>();
tl2.set("222222");


ExecutorService executorService = Executors.newFixedThreadPool(1);
// 将父线程的ThreadLocal传进去,就能取到相应的值
executorService.execute(new MyTask(tl1, tl2));

tl1.remove();
tl2.remove();

如此这般,提交到线程池的MyTask就包含了父线程的ThreadLoalMap数据。我们把"拷贝"父线程TheadLocalMap的行为称为capture(拍照),一个很生动形象的词:将父线程提交任务时刻的ThreadLocal值拍个快照并保存起来,后续使用

第3步疑问:如何用父线程的ThreadLocalMap覆盖当前执行任务线程的ThreadLocalMap?

我们可以想像到,代码正执行到MyTask#run()方法,在该方法内部,能感知的上下文环境是正执行该方法的线程,以及MyTask维护的threadLocalValues(快照),除此之外,它获取不了任何外界信息--> 这称之为线程封闭

因此可以比较自然地推理出,是要用MyTask的threadLocalValues(快照)去覆盖当前线程的ThreadLocalMap。我们称这个动作为replay(重放)

public class MyTask implements Runnable {
   // ...(省略)

    @Override
    public void run() {
        replay();
        // do biz
    }

    // 重放,用快照去覆盖当前线程的ThreadLocalMap
    private void replay() {
        for (Map.Entry<ThreadLocal<Object>, Object> entry : threadLocalValues.entrySet()) {
            ThreadLocal<Object> threadLocal = entry.getKey();
            threadLocal.set(entry.getValue());
        }
    }
}

第5步疑问:如何把当前线程的ThreadLocalMap还原?

Task业务逻辑执行完之后,毫无疑问需要将ThreadLocalMap还原,否则可能产生数据污染的风险

能够还原的前提是,用快照去覆盖当前线程的ThreadLocalMap之前,先将当前的ThreadLocal值保存起来,因此,修改代码如下:

private Object replay() {
        // 保存当前的ThreadLocal值
    Map<ThreadLocal<Object>, Object> backup = new HashMap<>();
    for (ThreadLocal<Object> threadLocal : threadLocalValues.keySet()) {
        backup.put(threadLocal, threadLocal.get());
    }

    for (Map.Entry<ThreadLocal<Object>, Object> entry : threadLocalValues.entrySet()) {
        ThreadLocal<Object> threadLocal = entry.getKey();
        threadLocal.set(entry.getValue());
    }

    return backup;
}

业务代码执行完之后,进行restore(还原):

public class MyTask implements Runnable {
    // ...(省略)
    
    @Override
    public void run() {
        Object backup = replay();
        try {
            // do biz
        } finally {
            restore(backup);
        }
    }
        
        // 还原
    private void restore(Object obj) {
        Map<ThreadLocal<Object>, Object> backup = (Map<ThreadLocal<Object>, Object>) obj;

        for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
            ThreadLocal<Object> threadLocal = entry.getKey();
            threadLocal.set(entry.getValue());
        }
    }
}

经过上面的1、3、5步分析,我们已经把所有关键问题都分析完毕,因此,做一个小实验看看结果,测试代码:

ExecutorService executorService = Executors.newFixedThreadPool(1);
ThreadLocal<Object> tl1 = new ThreadLocal<>();
ThreadLocal<Object> tl2 = new ThreadLocal<>();


// ------------第一次调用 start -------------------
tl1.set("1111");
tl2.set("2222");

executorService.execute(new MyTask(tl1, tl2));

tl1.remove();
tl2.remove();
// ------------第一次调用 end -------------------


// ------------第二次调用 start -------------------
tl1.set("3333");
tl2.set("4444");

executorService.execute(new MyTask(tl1, tl2));

tl1.remove();
tl2.remove();
// ------------第二次调用 end -------------------

public void run() {
    Object backup = replay();
    try {
        // do biz
        doBiz();
    } finally {
        restore(backup);
    }
}

private void doBiz() {
        // 打印父线程提交任务时的ThreadLocal值
    Set<ThreadLocal<Object>> threadLocals = threadLocalValues.keySet();
    for (ThreadLocal<Object> threadLocal : threadLocals) {
        System.out.println(threadLocal.get());
    }
}

测试代码调用了分为两次调用:第一次分别在父线程中给tl1、tl2赋值1111、2222,然后提交任务;第二次分别给tl1、tl2赋值为3333、4444;MyTask执行的逻辑是打印父线程提交任务时的ThreadLocal值

预期输出: 1111 2222 3333 4444

实际输出: 1111 2222 3333 4444

image

如此,证明该思路是可行的

但是这样还不够,稍稍有些经验的朋友应该能感受到,上面的写法仅是达到了"可用"的程度,离"好用"还有一段距离:业务在向线程池提交任务的时候,需要每次都构建自定义的Task,并将ThreadLocal的引用传入,而且Task糅进了TheadLocal管理的逻辑,这样其实形成了"业务侵入性",没有做到与业务解耦,这样的代码是不可维护的

TheadLocal管理的逻辑,业务代码不应该关心,因此为了与业务解耦,容易想到的一种解决方案是:代理。通过代理可以实现对被代理类的逻辑增强,并将通用的非业务逻辑与业务代码隔离开来(设计模式萦绕心头)

提及代理,熟悉的同学对于套路了然于心:定义代理类,实现与被代理类相同的接口,并在内部维护被代理类的实例,之后就可以对被代理的方法实现额外逻辑,来增强被代理类,代码变更后如下:

public class MyTask implements Runnable {
    private Runnable task;
    private final Map<ThreadLocal<Object>, Object> threadLocalValues;

    public MyTask(Runnable task, ThreadLocal<Object>... threadLocals) {
        this.task = task;
        this.threadLocalValues = new HashMap<>(threadLocals.length);
        capture(threadLocals);
    }

    // ...(省略)

    @Override
    public void run() {
        Object backup = replay(); // 增强的逻辑
        try {
            task.run(); // 执行业务代码
        } finally {
            restore(backup); // 增强的逻辑
        }
    }
}

使用方式:

executorService.execute(new MyTask(() -> {
    // do biz
}, tl1, tl2));

这样提交任务时,业务开发只需要关心()->{// do biz} 业务逻辑,而不需要关心MyTask代理类如何实现的增强逻辑,做到了与业务代码的解耦

但是,上面的代码书写起来仍然有些别扭:需要业务方主动传递tl1、tl2,说明业务方仍然需要有一定程度的参与,那能不能更彻底一些,连传递tl1、tl2的行为都省去呢?答案是肯定的

如果要避免父线程"主动"传递ThreadLocal的行为,那么就必须要知道父线程往ThreadLocalMap放数据这件事,并且在事件发生的时候将ThreadLocal引用保存下来;同时,如果父线程调用ThreadLocal#remove方法清除数据,也需要将保存下来的ThreadLocal引用一同清除掉

因此,需要自定义ThreadLocal类:

public class MyThreadLocal<T> extends ThreadLocal<T> {

    private static MyThreadLocal<HashSet<MyThreadLocal<Object>>> holder =
            new MyThreadLocal<HashSet<MyThreadLocal<Object>>>() {
                @Override
                protected HashSet<MyThreadLocal<Object>> initialValue() {
                    return new HashSet<>();
                }
            };

    // 重写set方法
    @Override
    public void set(T value) {
        super.set(value);
        addThisToHolder();
    }
        
        // 将ThreadLocal引用记录下来
    private void addThisToHolder() {
        if (!holder.get().contains(this)) {
            holder.get().add((MyThreadLocal<Object>) this);
        }
    }
    
    // 重写remove方法
    @Override
    public void remove() {
        super.remove();
        removeThisFromHolder();
    }
    
        // 将ThreadLocal引用记录移除
    private void removeThisFromHolder() {
        holder.get().remove(this);
    }
}

重写set方法,在ThreadLocal#set方法执行时将ThreadLocal引用记录下来,保存在类成员变量holder中;重写remove方法,在ThreadLocal#remove方法执行时将ThreadLocal引用一并移除

接下来,父线程向线程池提交Task,不再传递ThreadLocal的引用,那又怎么完成capture的动作呢?(如果看官们忘记了,请往上翻,前文在执行capture方法时,入参是ThreadLocal引用数组)

既然我们将ThreadLocal的引用保存在MyThreadLocal#holder这个静态变量中,那我们想办法暴露holder,不就可以得到capture需要的入参了吗?

image

这种思路诚然是可行的,但从面向对象设计角度而言却不是最优的:holder是MyThreadLocal的静态成员变量,维护的数据是ThreadLocal集合,它不应该将自身数据暴露出去,而是遵循高内聚的设计原则,提供数据操作的能力(方法),例如提供capture的能力,但操作本身需要维护在类内部。因此应该是MyThreadLocal提供capture的能力,然后由需求方(MyTask)进行调用。代码如下:

public class MyThreadLocal<T> extends ThreadLocal<T> {

    // ...(省略)

    public static class DataTransmit {

        public static Map<ThreadLocal<Object>, Object> capture() {
            Map<ThreadLocal<Object>, Object> threadLocalValues = new HashMap<>();
            for (MyThreadLocal<Object> threadLocal : holder.get()) {
                threadLocalValues.put(threadLocal, threadLocal.get());
            }
            return threadLocalValues;
        }
    }
}

此处,我在MyThreadLocal中定义了一个内部类DataTransmit,用于ThreadLocal的数据传输,与MyThreadLocal本身提供的能力相隔离(SRP原则)。然后,将原先定义于MyTask的capture方法搬到了DataTransmit类内,提供capture的能力。此时,MyTask构造函数代码如下:

public MyTask(Runnable task) {
    this.task = task;
    threadLocalValues = MyThreadLocal.DataTransmit.capture();
}

我们将capture方法搬走之后,仍然还有replay、restore方法,仔细思考,它们都是对ThreadLocal的操作,放在MyThreadLocal.DataTransmit中更合适一些,使得内聚度更高

最终,MyThreadLocal代码如下:

public class MyThreadLocal<T> extends ThreadLocal<T> {

    private static MyThreadLocal<HashSet<MyThreadLocal<Object>>> holder =
            new MyThreadLocal<HashSet<MyThreadLocal<Object>>>() {
                @Override
                protected HashSet<MyThreadLocal<Object>> initialValue() {
                    return new HashSet<>();
                }
            };


    @Override
    public void set(T value) {
        super.set(value);
        addThisToHolder();
    }

    private void addThisToHolder() {
        if (!holder.get().contains(this)) {
            holder.get().add((MyThreadLocal<Object>) this);
        }
    }

    @Override
    public void remove() {
        super.remove();
        removeThisFromHolder();
    }

    private void removeThisFromHolder() {
        holder.get().remove(this);
    }

    public static class DataTransmit {

        public static Map<ThreadLocal<Object>, Object> capture() {
            Map<ThreadLocal<Object>, Object> threadLocalValues = new HashMap<>();
            for (MyThreadLocal<Object> threadLocal : holder.get()) {
                threadLocalValues.put(threadLocal, threadLocal.get());
            }
            return threadLocalValues;
        }

        // 重放,用快照去覆盖当前线程的ThreadLocalMap
        public static Object replay(Object obj) {
            Map<ThreadLocal<Object>, Object> threadLocalValues = (Map<ThreadLocal<Object>, Object>)obj;

            Map<ThreadLocal<Object>, Object> backup = new HashMap<>();
            for (ThreadLocal<Object> threadLocal : threadLocalValues.keySet()) {
                backup.put(threadLocal, threadLocal.get());
            }

            for (Map.Entry<ThreadLocal<Object>, Object> entry : threadLocalValues.entrySet()) {
                ThreadLocal<Object> threadLocal = entry.getKey();
                threadLocal.set(entry.getValue());
            }

            return backup;
        }

        public static void restore(Object obj) {
            Map<ThreadLocal<Object>, Object> backup = (Map<ThreadLocal<Object>, Object>) obj;

            for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
                ThreadLocal<Object> threadLocal = entry.getKey();
                threadLocal.set(entry.getValue());
            }
        }
    }
}

MyTask经过精简后的代码如下:

public class MyTask implements Runnable {
    private Runnable task;
    private final Map<ThreadLocal<Object>, Object> threadLocalValues;

    public MyTask(Runnable task) {
        this.task = task;
        threadLocalValues = MyThreadLocal.DataTransmit.capture();
    }

    @Override
    public void run() {
        Object backup = MyThreadLocal.DataTransmit.replay(threadLocalValues);
        try {
            task.run();
        } finally {
            MyThreadLocal.DataTransmit.restore(backup);
        }
    }
}

使用代码如下:

ExecutorService executorService = Executors.newFixedThreadPool(1);
ThreadLocal<Object> tl1 = new MyThreadLocal<>();
ThreadLocal<Object> tl2 = new MyThreadLocal<>();


// ------------第一次调用 start -------------------
tl1.set("1111");
tl2.set("2222");

executorService.execute(new MyTask(() -> {
    // do biz
    System.out.println(tl1.get());
    System.out.println(tl2.get());
}));

tl1.remove();
tl2.remove();
// ------------第一次调用 end -------------------


// ------------第二次调用 start -------------------
tl1.set("3333");
tl2.set("4444");

executorService.execute(new MyTask(() -> {
    // do biz
    System.out.println(tl1.get());
    System.out.println(tl2.get());
}));

tl1.remove();
tl2.remove();
// ------------第二次调用 end -------------------

这样,业务代码就简洁了

如果还是觉得上述使用姿势有点麻烦:每次提交任务,都要构造一个MyTask,能不能连这一步都省去,变成跟规常的写法一致呢?

executorService.execute(() -> {
    // do biz
    System.out.println(tl1.get());
    System.out.println(tl2.get());
});

答案是肯定的,首先需要明确的一点是,要增强就意味着要代理;接着稍稍转变一下思路:既然不能对Task进行代理,那么我们对线程池进行代理增强,是不是也可以达到同样的效果?

代理套路相信大家很熟悉了:

public class MyExecutorService implements ExecutorService {
    private ExecutorService executorService;

    public MyExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    @Override
    public void execute(Runnable command) {
        executorService.execute(new MyTask(command)); // 在内部进行Task的代理
    }
    // ...(省略)
}

使用:

ExecutorService executorService = new MyExecutorService(Executors.newFixedThreadPool(1));

这样,业务代码又精简了一步:只需要对线程池进行代理一次即可,后续提交任务不需要手动构建MyTask

最后,如果连对线程池的代理都感觉稍显麻烦,只想使用原生的姿势,那就要请出尚方宝剑:Java Agent,在应用启动之初对JDK代码进行修改,植入代理逻辑,如此业务代码不需要进行其它改动,就享有增强后的ExecutorService以及Task(敬请期待Java Agent 相关分享)

至此,关于父线程向池化的子线程(线程池)传递线程私有数据的方式我们讨论完毕。而上面讨论的思路、推理,全是来自于transmittable-thread-local(TTL),其github地址是: https://github.com/alibaba/transmittable-thread-local

有了上面的Demo版代码,再去看TTL的源码,相信会简单很多。笔者开始使用该框架时处于早期版本,而目前最新的版本是2.11.5,细节较之前使用的版本已经有较大的变化,但核心思想仍然没变,如果感觉目前版本阅读困难可以找最原始版本进行阅读

注:上面DEMO版的代码切不可用于商业用途,所有代码仅仅是本文论述需要,主要目的是阐述一整套核心思想,其中有很多细节被省略了,而正所谓魔鬼藏在细节里,transmittable-thread-local正是一款合格的处理了细节的成熟产品,若想在生产环境请使,请用它!

举几个TTL运用的典型场景:

  1. 分布式跟踪系统 或 全链路压测(即链路打标)
  2. 日志收集记录系统上下文
  3. SessionCache
  4. 应用容器或上层框架跨应用代码给下层SDK传递信息
小结
  1. 父线程向池化的子线程(线程池)传递线程私有数据的三大步骤: capture、replay、restore
  2. 可以通过代理的方式对被代理类进行增强,代理套路:实现被代理同相同接口、内部维护被理代实例、编写增强逻辑
  3. 软件工程概念:高内聚,低耦合
  4. 请记住SRP原则:Single Resonsibility Principle
  5. Java Agent技术可以在不修改Java源码的情况下对源码进行增强(降维打击,直接动字节码)

总结

本文先是提出了ThreadLocal的局限性:父线程无法通过ThreadLocal向子线程传递线程私有数据,并分享了解决方案InheritableThreadLocal及其原理;接着又提出了InheritableThreadLocal的局限性:父线程无法通过InheritableThreadLocal向池化的子线程(线程池)传递线程私有数据,并给出了终级解决方案transmittable-thread-local(TTL)。本文给出的终级解决方案并不是直接拿着TTL源码进行原理剖析,而是通过一步步推导的方式,最终向TTL解决方案逼近的过程,相信通过这个过程,大家也能一步步看清TTL面临什么样的问题,以及它是用怎样的思路去解决这些问题的,这些思路才是我们最大的收获。请记住:知识的迁移很重要

相关文章

网友评论

    本文标题:ThreadLocal系列之——父子线程传递线程私有数据(四)

    本文链接:https://www.haomeiwen.com/subject/fsdnektx.html