美文网首页Java
ThreadLocal系列之——父子线程传递线程私有数据(四)

ThreadLocal系列之——父子线程传递线程私有数据(四)

作者: ZX_周雄 | 来源:发表于2020-09-13 10:23 被阅读0次

    前情回顾

    前文,介绍了ThreadLocal作者们(Josh Bloch and Doug Lea)为内存泄露做的努力,将内存泄露造成的影响降到了最低,且着重分享了软件设计上的一个Trade Off:如何权衡内存占用与CPU占用之间的关系,该折中思想与Redis的过期淘汰策略一致(知识的迁移)

    本文,将会接着分享ThreadLocal的其他局限性,并给出相应的解决方案

    局限性

    局限性一:父线程无法通过ThreadLocal向子线程传递线程私有数据

    首先,通过本系列第二篇文章的学习,我们可以肯定的是:ThreadLocal本意上就是线程私有的数据(从命名上也可以看出来),每个线程维护着自己的一份副本,线程与线程之间数据隔离。父线程若要向子线程传递线程私有数据,肯定是不能通过ThreadLocal自己来实现,否则也就不能叫ThreadLocal

    那,我们会有父线程向子线程传递线程私有数据的需求吗?

    答案是肯定的,通过本系列第一篇文章的学习,我们也知道不少场景中会用到ThreadLocal来传递数据,但请注意,从请求开始到请求结束使用的都是使用的同一个线程,并没有跨线程传递。假设请求进来的时候是由线程A处理,且将后续流程节点需要使用到的信息放到ThreadLocal里,那么在后续处理节点中,由于性能原因或者是其它原因,需要将一些任务异步执行,就要另外开启线程B(由于B线程是在A线程执行过程中实例化,因此我们称B为A的子线程),但此时线程B无法通过ThreadLocal获取线程A在请求开始处放进去的信息。因此问题就产生了:父线程A需要向子线程B传递线程私有数据,但是TheadLocal做不到

    示例代码如下:

    ThreadLocal<String> tl = new ThreadLocal<>();
    tl.set("foo");
    
    Thread b = new Thread(() -> {
        String str = tl.get();
        // 企图拿到父线程放进去的foo,然而失败了!
        System.out.println(str);
    });
    
    b.start();
    
    // 子线程B启动后,父线程A休息3秒,确保子线程执行完毕
    TimeUnit.SECONDS.sleep(3);
    
    // 清理
    tl.remove();
    

    结果:子线程B打印null,示意图如下:

    image
    解决方案:使用InheritableThreadLocal

    InheritableThreadLocal是ThreadLocal的子类,它就是用来解决父线程向子线程传递线程私有数据问题的。示例代码只改第一行ThreadLocal<String> tl = new ThreadLocal<>();改为ThreadLocal<String> tl = new InheritableThreadLocal<>();,其它不变,如下示:

    ThreadLocal<String> tl = new InheritableThreadLocal<>();
    tl.set("foo");
    
    Thread b = new Thread(() -> {
        String str = tl.get();
        // 企图拿到父线程放进去的foo,成功了!因为tl是InheritableThreadLocal
        System.out.println(str);
    });
    
    b.start();
    
    // 子线程B启动后,父线程A休息3秒,确保子线程执行完毕
    TimeUnit.SECONDS.sleep(3);
    
    // 清理
    tl.remove();
    

    结果:子线程B打印foo,示意图如下:

    image

    可以看到,使用InheritableThreadLocal后,子线程能够通过tl获取父线程中set的值,实现了父线程向子线程传递线程私有数据的能力

    这背后的原理是什么呢?

    首先,Thread类有两个ThreadLocalMap类型的成员变量:threadLocals、inheritableThreadLocals,ThreadLocal使用的是threadLocals,而InheritableThreadLocal使用的是inheritableThreadLocals

    /* ThreadLocal values pertaining to this thread. This map is maintained
     * by the ThreadLocal class. */
    ThreadLocal.ThreadLocalMap threadLocals = null;
    
    /*
     * InheritableThreadLocal values pertaining to this thread. This map is
     * maintained by the InheritableThreadLocal class.
     */
    ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
    
    

    执行tl.set("foo");时,对应的逻辑如下:

    // java.lang.ThreadLocal#set
    public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
    

    由于是第一次执行,map为空,走createMap的逻辑,而不同的tl实现类(ThreadLocal、InheritableThreadLocal)有着不同的创建map逻辑

    image

    ThreadLocal的如下:

    void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
    }
    

    InheritableThreadLocal的如下:

    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
    

    很轻易能够看出区别:ThreadLocal是给当前线程的threadLocals变量赋值,而InheritableThreadLocal则是给当前线程的inheritableThreadLocals变量赋值

    其次,在创建线程实例,即new Thread()时,父线程使用自己的inheritableThreadLocals通过调用createInheritedMap方法来构造子线程的ThreadLocalMap并赋值给子线程的inheritableThreadLocals

    // java.lang.Thread#init
    
    Thread parent = currentThread();
    
    if (inheritThreadLocals && parent.inheritableThreadLocals != null)
        this.inheritableThreadLocals =
            ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
    
    
    static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
        return new ThreadLocalMap(parentMap);
    }
    
    private ThreadLocalMap(ThreadLocalMap parentMap) {
        // 父线程的Entry
        Entry[] parentTable = parentMap.table;
        int len = parentTable.length;
        setThreshold(len);
        table = new Entry[len];
    
        // 依次"拷贝"到子线程的Entry里
        for (int j = 0; j < len; j++) {
            Entry e = parentTable[j];
            if (e != null) {
                @SuppressWarnings("unchecked")
                ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                if (key != null) {
                    Object value = key.childValue(e.value); // 注意这一行代码
                    Entry c = new Entry(key, value);
                    int h = key.threadLocalHashCode & (len - 1);
                    while (table[h] != null)
                        h = nextIndex(h, len);
                    table[h] = c;
                    size++;
                }
            }
        }
    }
    
    

    注意上边标注的一行:Object value = key.childValue(e.value);,方法childValue是一个可重载方法,对于InheritableThreadLocal而言,直接返回了传进来的值:

    protected T childValue(T parentValue) {
        return parentValue;
    }
    
    

    这意味着是:浅拷贝,即默认情况下,父线程传递给子线程的线程私有数据是一份浅拷贝,若拷贝指向个不可变对象,这不是什么问题;若指向一个引用,那尤其需要注意,因为不再有线程封闭,父子线程中任意的变更均会影响到对方。我们说它是可重载方法,意味着若要实现"Deep Copy",需要自行继承InheritableThreadLocal类并重写childValue方法

    小结一下解决方案背后的原理:使用InheritableThreadLocal,会将线程私有数据存储在inheritableThreadLocals指向的ThreadLocalMap中;在构造子线程时,将当前线程inheritableThreadLocals里的数据(ThreadLocalMap)"拷贝"给子线程的ThreadLocalMap,子线程因此可以通过tl.get()取到数据,如此便实现了父线程向子线程传递线程私有数据

    image
    局限性二:父线程无法通过InheritableThreadLocal向池化的子线程(线程池)传递线程私有数据

    在日常开发过程中,由于构造与销毁子线程开销大,因此每次在业务代码中重新构造一个子线程的方式并不常见,更常见的方式是将线程池化(线程池),由线程池的调度策略决定线程们如何执行提交给池中的任务,避免了重复构造与销毁线程的开销。上面我们提到,InheritableThreadLocal可以解决父线程向子线程传递线程私有数据的问题,但一旦子线程池化之后,InheritableThreadLocal也将不再起作用

    看下面一段示例代码:

    ExecutorService executorService = Executors.newFixedThreadPool(1);
    ThreadLocal<String> tl = new InheritableThreadLocal<>();
    
    // ------------第一次调用 start -------------------
    tl.set("foo");
    executorService.execute(() -> {
        String str = tl.get();
        // 企图拿到父线程放进去的foo,成功了!
        System.out.println(str);
    });
    // 父线程A休息3秒,确保提交给线程池的任务执行完毕
    TimeUnit.SECONDS.sleep(3);
    // 清理
    tl.remove();
    // ------------第一次调用 end -------------------
    
    
    
    // ------------第二次调用 start -------------------
    tl.set("bar");
    executorService.execute(() -> {
        String str = tl.get();
        // 企图拿到父线程放进去的bar,失败了!
        System.out.println(str);
    });
    // 父线程A休息3秒,确保提交给线程池的任务执行完毕
    TimeUnit.SECONDS.sleep(3);
    // 清理
    tl.remove();
    // ------------第二次调用 end -------------------
    
    

    如上,仍使用InheritableThreadLocal,且构造了只有1个线程的线程池,然后模拟两次外部调用:第一次在父线程中将tl赋值为foo,然后子线程中获取,使用完毕之后remove;第二次在父线程中将tl赋值为bar,然后子线程中获取,使用完毕之后remove

    我们期望的结果是: foo bar

    然而实际的结果却是: foo foo

    我们得到了与期望不符的结果,原因很简单,在局限性一解决方案原理中已经阐述:子线程的ThreadLocalMap数据是在创建线程的那一刻从父线程中"拷贝"而来,此后再也没有促使其变化的地方,而子线程由于池化复用的缘故,ThreadLocalMap一直持有的是线程创建时刻的数据(即foo),此后无论进行多少次方法调用,在(池化)子线程中通过tl.get()取出来的永远是foo

    解决方案:使用transmittable-thread-local(alibaba)

    暂不提开源框架是如何解决这个问题的,我们先自己推导,进而得出与开源框架一致的结论,帮助大家理好地理解解决方案背后的原理

    一个基本认知:无论执行execute或者submit方法,向线程池提交的是任务(task)

    image image

    仔细推敲琢磨一下,我们需要的并不是创建线程的那一刻父线程的ThreadLocal值,而是提交任务时父线程的ThreadLocal值。或者换种表述方式,需要把任务提交给线程池时ThreadLocal值传递到任务执行时

    具体思路是:父线程把任务提交给线程池时一同附上此刻自己的ThreadLocalMap,包装在task里,待线程池中某个线程执行到该任务时,用task里的ThreadLocalMap赋盖当前线程ThreadLocalMap,这样就完成了父线程向池化的子线程传递线程私有数据的目标。为了避免数据污染,待任务执行完后,线程归还回线程池之前,还需要还原ThreadLocalMap,如下示:

    image

    上面的步骤一共有6步,其中2、4、6是线程池本身的提供的能力,不需要改动,只有1、3、5有所不同,我们逐一剖析

    第1步疑问:提交Task的时候如何将ThreadLocalMap一同提交上去?此处难点在于如何获取当前线程的ThreadLocalMap

    本系列第二篇文章中提到,ThreadLocalMap是线程私有变量,只会被ThreadLocal维护,对于外部类而言是不可见的,因此要操作ThreadLocalMap就得通过ThreadLocal(操作ThreadLocal本质上是在操作当前线程的ThreadLocalMap)

    首先自定义Task,用于包装维护父线程ThreadLocalMap

    public class MyTask implements Runnable {
            // key是ThreadLocal,value是对应父线程的线程私有数据
        private final Map<ThreadLocal<Object>, Object> threadLocalValues;
    
        public MyTask(ThreadLocal<Object>... threadLocals) {
            this.threadLocalValues = new HashMap<>(threadLocals.length);
            capture(threadLocals);
        }
    
        private void capture(ThreadLocal<Object>[] threadLocals) {
            for (ThreadLocal<Object> threadLocal : threadLocals) {
                threadLocalValues.put(threadLocal, threadLocal.get());
            }
        }
    
        @Override
        public void run() {
            // todo
        }
    }
    
    

    使用时:

    ThreadLocal<Object> tl1 = new ThreadLocal<>();
    tl1.set("111111");
    
    ThreadLocal<Object> tl2 = new ThreadLocal<>();
    tl2.set("222222");
    
    
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    // 将父线程的ThreadLocal传进去,就能取到相应的值
    executorService.execute(new MyTask(tl1, tl2));
    
    tl1.remove();
    tl2.remove();
    
    

    如此这般,提交到线程池的MyTask就包含了父线程的ThreadLoalMap数据。我们把"拷贝"父线程TheadLocalMap的行为称为capture(拍照),一个很生动形象的词:将父线程提交任务时刻的ThreadLocal值拍个快照并保存起来,后续使用

    第3步疑问:如何用父线程的ThreadLocalMap覆盖当前执行任务线程的ThreadLocalMap?

    我们可以想像到,代码正执行到MyTask#run()方法,在该方法内部,能感知的上下文环境是正执行该方法的线程,以及MyTask维护的threadLocalValues(快照),除此之外,它获取不了任何外界信息--> 这称之为线程封闭

    因此可以比较自然地推理出,是要用MyTask的threadLocalValues(快照)去覆盖当前线程的ThreadLocalMap。我们称这个动作为replay(重放)

    public class MyTask implements Runnable {
       // ...(省略)
    
        @Override
        public void run() {
            replay();
            // do biz
        }
    
        // 重放,用快照去覆盖当前线程的ThreadLocalMap
        private void replay() {
            for (Map.Entry<ThreadLocal<Object>, Object> entry : threadLocalValues.entrySet()) {
                ThreadLocal<Object> threadLocal = entry.getKey();
                threadLocal.set(entry.getValue());
            }
        }
    }
    
    

    第5步疑问:如何把当前线程的ThreadLocalMap还原?

    Task业务逻辑执行完之后,毫无疑问需要将ThreadLocalMap还原,否则可能产生数据污染的风险

    能够还原的前提是,用快照去覆盖当前线程的ThreadLocalMap之前,先将当前的ThreadLocal值保存起来,因此,修改代码如下:

    private Object replay() {
            // 保存当前的ThreadLocal值
        Map<ThreadLocal<Object>, Object> backup = new HashMap<>();
        for (ThreadLocal<Object> threadLocal : threadLocalValues.keySet()) {
            backup.put(threadLocal, threadLocal.get());
        }
    
        for (Map.Entry<ThreadLocal<Object>, Object> entry : threadLocalValues.entrySet()) {
            ThreadLocal<Object> threadLocal = entry.getKey();
            threadLocal.set(entry.getValue());
        }
    
        return backup;
    }
    
    

    业务代码执行完之后,进行restore(还原):

    public class MyTask implements Runnable {
        // ...(省略)
        
        @Override
        public void run() {
            Object backup = replay();
            try {
                // do biz
            } finally {
                restore(backup);
            }
        }
            
            // 还原
        private void restore(Object obj) {
            Map<ThreadLocal<Object>, Object> backup = (Map<ThreadLocal<Object>, Object>) obj;
    
            for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
                ThreadLocal<Object> threadLocal = entry.getKey();
                threadLocal.set(entry.getValue());
            }
        }
    }
    
    

    经过上面的1、3、5步分析,我们已经把所有关键问题都分析完毕,因此,做一个小实验看看结果,测试代码:

    ExecutorService executorService = Executors.newFixedThreadPool(1);
    ThreadLocal<Object> tl1 = new ThreadLocal<>();
    ThreadLocal<Object> tl2 = new ThreadLocal<>();
    
    
    // ------------第一次调用 start -------------------
    tl1.set("1111");
    tl2.set("2222");
    
    executorService.execute(new MyTask(tl1, tl2));
    
    tl1.remove();
    tl2.remove();
    // ------------第一次调用 end -------------------
    
    
    // ------------第二次调用 start -------------------
    tl1.set("3333");
    tl2.set("4444");
    
    executorService.execute(new MyTask(tl1, tl2));
    
    tl1.remove();
    tl2.remove();
    // ------------第二次调用 end -------------------
    
    
    public void run() {
        Object backup = replay();
        try {
            // do biz
            doBiz();
        } finally {
            restore(backup);
        }
    }
    
    private void doBiz() {
            // 打印父线程提交任务时的ThreadLocal值
        Set<ThreadLocal<Object>> threadLocals = threadLocalValues.keySet();
        for (ThreadLocal<Object> threadLocal : threadLocals) {
            System.out.println(threadLocal.get());
        }
    }
    
    

    测试代码调用了分为两次调用:第一次分别在父线程中给tl1、tl2赋值1111、2222,然后提交任务;第二次分别给tl1、tl2赋值为3333、4444;MyTask执行的逻辑是打印父线程提交任务时的ThreadLocal值

    预期输出: 1111 2222 3333 4444

    实际输出: 1111 2222 3333 4444

    image

    如此,证明该思路是可行的

    但是这样还不够,稍稍有些经验的朋友应该能感受到,上面的写法仅是达到了"可用"的程度,离"好用"还有一段距离:业务在向线程池提交任务的时候,需要每次都构建自定义的Task,并将ThreadLocal的引用传入,而且Task糅进了TheadLocal管理的逻辑,这样其实形成了"业务侵入性",没有做到与业务解耦,这样的代码是不可维护的

    TheadLocal管理的逻辑,业务代码不应该关心,因此为了与业务解耦,容易想到的一种解决方案是:代理。通过代理可以实现对被代理类的逻辑增强,并将通用的非业务逻辑与业务代码隔离开来(设计模式萦绕心头)

    提及代理,熟悉的同学对于套路了然于心:定义代理类,实现与被代理类相同的接口,并在内部维护被代理类的实例,之后就可以对被代理的方法实现额外逻辑,来增强被代理类,代码变更后如下:

    public class MyTask implements Runnable {
        private Runnable task;
        private final Map<ThreadLocal<Object>, Object> threadLocalValues;
    
        public MyTask(Runnable task, ThreadLocal<Object>... threadLocals) {
            this.task = task;
            this.threadLocalValues = new HashMap<>(threadLocals.length);
            capture(threadLocals);
        }
    
        // ...(省略)
    
        @Override
        public void run() {
            Object backup = replay(); // 增强的逻辑
            try {
                task.run(); // 执行业务代码
            } finally {
                restore(backup); // 增强的逻辑
            }
        }
    }
    
    

    使用方式:

    executorService.execute(new MyTask(() -> {
        // do biz
    }, tl1, tl2));
    
    

    这样提交任务时,业务开发只需要关心()->{// do biz} 业务逻辑,而不需要关心MyTask代理类如何实现的增强逻辑,做到了与业务代码的解耦

    但是,上面的代码书写起来仍然有些别扭:需要业务方主动传递tl1、tl2,说明业务方仍然需要有一定程度的参与,那能不能更彻底一些,连传递tl1、tl2的行为都省去呢?答案是肯定的

    如果要避免父线程"主动"传递ThreadLocal的行为,那么就必须要知道父线程往ThreadLocalMap放数据这件事,并且在事件发生的时候将ThreadLocal引用保存下来;同时,如果父线程调用ThreadLocal#remove方法清除数据,也需要将保存下来的ThreadLocal引用一同清除掉

    因此,需要自定义ThreadLocal类:

    public class MyThreadLocal<T> extends ThreadLocal<T> {
    
        private static MyThreadLocal<HashSet<MyThreadLocal<Object>>> holder =
                new MyThreadLocal<HashSet<MyThreadLocal<Object>>>() {
                    @Override
                    protected HashSet<MyThreadLocal<Object>> initialValue() {
                        return new HashSet<>();
                    }
                };
    
        // 重写set方法
        @Override
        public void set(T value) {
            super.set(value);
            addThisToHolder();
        }
            
            // 将ThreadLocal引用记录下来
        private void addThisToHolder() {
            if (!holder.get().contains(this)) {
                holder.get().add((MyThreadLocal<Object>) this);
            }
        }
        
        // 重写remove方法
        @Override
        public void remove() {
            super.remove();
            removeThisFromHolder();
        }
        
            // 将ThreadLocal引用记录移除
        private void removeThisFromHolder() {
            holder.get().remove(this);
        }
    }
    
    

    重写set方法,在ThreadLocal#set方法执行时将ThreadLocal引用记录下来,保存在类成员变量holder中;重写remove方法,在ThreadLocal#remove方法执行时将ThreadLocal引用一并移除

    接下来,父线程向线程池提交Task,不再传递ThreadLocal的引用,那又怎么完成capture的动作呢?(如果看官们忘记了,请往上翻,前文在执行capture方法时,入参是ThreadLocal引用数组)

    既然我们将ThreadLocal的引用保存在MyThreadLocal#holder这个静态变量中,那我们想办法暴露holder,不就可以得到capture需要的入参了吗?

    image

    这种思路诚然是可行的,但从面向对象设计角度而言却不是最优的:holder是MyThreadLocal的静态成员变量,维护的数据是ThreadLocal集合,它不应该将自身数据暴露出去,而是遵循高内聚的设计原则,提供数据操作的能力(方法),例如提供capture的能力,但操作本身需要维护在类内部。因此应该是MyThreadLocal提供capture的能力,然后由需求方(MyTask)进行调用。代码如下:

    public class MyThreadLocal<T> extends ThreadLocal<T> {
    
        // ...(省略)
    
        public static class DataTransmit {
    
            public static Map<ThreadLocal<Object>, Object> capture() {
                Map<ThreadLocal<Object>, Object> threadLocalValues = new HashMap<>();
                for (MyThreadLocal<Object> threadLocal : holder.get()) {
                    threadLocalValues.put(threadLocal, threadLocal.get());
                }
                return threadLocalValues;
            }
        }
    }
    
    

    此处,我在MyThreadLocal中定义了一个内部类DataTransmit,用于ThreadLocal的数据传输,与MyThreadLocal本身提供的能力相隔离(SRP原则)。然后,将原先定义于MyTask的capture方法搬到了DataTransmit类内,提供capture的能力。此时,MyTask构造函数代码如下:

    public MyTask(Runnable task) {
        this.task = task;
        threadLocalValues = MyThreadLocal.DataTransmit.capture();
    }
    
    

    我们将capture方法搬走之后,仍然还有replay、restore方法,仔细思考,它们都是对ThreadLocal的操作,放在MyThreadLocal.DataTransmit中更合适一些,使得内聚度更高

    最终,MyThreadLocal代码如下:

    public class MyThreadLocal<T> extends ThreadLocal<T> {
    
        private static MyThreadLocal<HashSet<MyThreadLocal<Object>>> holder =
                new MyThreadLocal<HashSet<MyThreadLocal<Object>>>() {
                    @Override
                    protected HashSet<MyThreadLocal<Object>> initialValue() {
                        return new HashSet<>();
                    }
                };
    
    
        @Override
        public void set(T value) {
            super.set(value);
            addThisToHolder();
        }
    
        private void addThisToHolder() {
            if (!holder.get().contains(this)) {
                holder.get().add((MyThreadLocal<Object>) this);
            }
        }
    
        @Override
        public void remove() {
            super.remove();
            removeThisFromHolder();
        }
    
        private void removeThisFromHolder() {
            holder.get().remove(this);
        }
    
        public static class DataTransmit {
    
            public static Map<ThreadLocal<Object>, Object> capture() {
                Map<ThreadLocal<Object>, Object> threadLocalValues = new HashMap<>();
                for (MyThreadLocal<Object> threadLocal : holder.get()) {
                    threadLocalValues.put(threadLocal, threadLocal.get());
                }
                return threadLocalValues;
            }
    
            // 重放,用快照去覆盖当前线程的ThreadLocalMap
            public static Object replay(Object obj) {
                Map<ThreadLocal<Object>, Object> threadLocalValues = (Map<ThreadLocal<Object>, Object>)obj;
    
                Map<ThreadLocal<Object>, Object> backup = new HashMap<>();
                for (ThreadLocal<Object> threadLocal : threadLocalValues.keySet()) {
                    backup.put(threadLocal, threadLocal.get());
                }
    
                for (Map.Entry<ThreadLocal<Object>, Object> entry : threadLocalValues.entrySet()) {
                    ThreadLocal<Object> threadLocal = entry.getKey();
                    threadLocal.set(entry.getValue());
                }
    
                return backup;
            }
    
            public static void restore(Object obj) {
                Map<ThreadLocal<Object>, Object> backup = (Map<ThreadLocal<Object>, Object>) obj;
    
                for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
                    ThreadLocal<Object> threadLocal = entry.getKey();
                    threadLocal.set(entry.getValue());
                }
            }
        }
    }
    
    

    MyTask经过精简后的代码如下:

    public class MyTask implements Runnable {
        private Runnable task;
        private final Map<ThreadLocal<Object>, Object> threadLocalValues;
    
        public MyTask(Runnable task) {
            this.task = task;
            threadLocalValues = MyThreadLocal.DataTransmit.capture();
        }
    
        @Override
        public void run() {
            Object backup = MyThreadLocal.DataTransmit.replay(threadLocalValues);
            try {
                task.run();
            } finally {
                MyThreadLocal.DataTransmit.restore(backup);
            }
        }
    }
    
    

    使用代码如下:

    ExecutorService executorService = Executors.newFixedThreadPool(1);
    ThreadLocal<Object> tl1 = new MyThreadLocal<>();
    ThreadLocal<Object> tl2 = new MyThreadLocal<>();
    
    
    // ------------第一次调用 start -------------------
    tl1.set("1111");
    tl2.set("2222");
    
    executorService.execute(new MyTask(() -> {
        // do biz
        System.out.println(tl1.get());
        System.out.println(tl2.get());
    }));
    
    tl1.remove();
    tl2.remove();
    // ------------第一次调用 end -------------------
    
    
    // ------------第二次调用 start -------------------
    tl1.set("3333");
    tl2.set("4444");
    
    executorService.execute(new MyTask(() -> {
        // do biz
        System.out.println(tl1.get());
        System.out.println(tl2.get());
    }));
    
    tl1.remove();
    tl2.remove();
    // ------------第二次调用 end -------------------
    
    

    这样,业务代码就简洁了

    如果还是觉得上述使用姿势有点麻烦:每次提交任务,都要构造一个MyTask,能不能连这一步都省去,变成跟规常的写法一致呢?

    executorService.execute(() -> {
        // do biz
        System.out.println(tl1.get());
        System.out.println(tl2.get());
    });
    
    

    答案是肯定的,首先需要明确的一点是,要增强就意味着要代理;接着稍稍转变一下思路:既然不能对Task进行代理,那么我们对线程池进行代理增强,是不是也可以达到同样的效果?

    代理套路相信大家很熟悉了:

    public class MyExecutorService implements ExecutorService {
        private ExecutorService executorService;
    
        public MyExecutorService(ExecutorService executorService) {
            this.executorService = executorService;
        }
    
        @Override
        public void execute(Runnable command) {
            executorService.execute(new MyTask(command)); // 在内部进行Task的代理
        }
        // ...(省略)
    }
    
    

    使用:

    ExecutorService executorService = new MyExecutorService(Executors.newFixedThreadPool(1));
    
    

    这样,业务代码又精简了一步:只需要对线程池进行代理一次即可,后续提交任务不需要手动构建MyTask

    最后,如果连对线程池的代理都感觉稍显麻烦,只想使用原生的姿势,那就要请出尚方宝剑:Java Agent,在应用启动之初对JDK代码进行修改,植入代理逻辑,如此业务代码不需要进行其它改动,就享有增强后的ExecutorService以及Task(敬请期待Java Agent 相关分享)

    至此,关于父线程向池化的子线程(线程池)传递线程私有数据的方式我们讨论完毕。而上面讨论的思路、推理,全是来自于transmittable-thread-local(TTL),其github地址是: https://github.com/alibaba/transmittable-thread-local

    有了上面的Demo版代码,再去看TTL的源码,相信会简单很多。笔者开始使用该框架时处于早期版本,而目前最新的版本是2.11.5,细节较之前使用的版本已经有较大的变化,但核心思想仍然没变,如果感觉目前版本阅读困难可以找最原始版本进行阅读

    注:上面DEMO版的代码切不可用于商业用途,所有代码仅仅是本文论述需要,主要目的是阐述一整套核心思想,其中有很多细节被省略了,而正所谓魔鬼藏在细节里,transmittable-thread-local正是一款合格的处理了细节的成熟产品,若想在生产环境请使,请用它!

    举几个TTL运用的典型场景:

    1. 分布式跟踪系统 或 全链路压测(即链路打标)
    2. 日志收集记录系统上下文
    3. SessionCache
    4. 应用容器或上层框架跨应用代码给下层SDK传递信息
    小结
    1. 父线程向池化的子线程(线程池)传递线程私有数据的三大步骤: capture、replay、restore
    2. 可以通过代理的方式对被代理类进行增强,代理套路:实现被代理同相同接口、内部维护被理代实例、编写增强逻辑
    3. 软件工程概念:高内聚,低耦合
    4. 请记住SRP原则:Single Resonsibility Principle
    5. Java Agent技术可以在不修改Java源码的情况下对源码进行增强(降维打击,直接动字节码)

    总结

    本文先是提出了ThreadLocal的局限性:父线程无法通过ThreadLocal向子线程传递线程私有数据,并分享了解决方案InheritableThreadLocal及其原理;接着又提出了InheritableThreadLocal的局限性:父线程无法通过InheritableThreadLocal向池化的子线程(线程池)传递线程私有数据,并给出了终级解决方案transmittable-thread-local(TTL)。本文给出的终级解决方案并不是直接拿着TTL源码进行原理剖析,而是通过一步步推导的方式,最终向TTL解决方案逼近的过程,相信通过这个过程,大家也能一步步看清TTL面临什么样的问题,以及它是用怎样的思路去解决这些问题的,这些思路才是我们最大的收获。请记住:知识的迁移很重要

    相关文章

      网友评论

        本文标题:ThreadLocal系列之——父子线程传递线程私有数据(四)

        本文链接:https://www.haomeiwen.com/subject/fsdnektx.html