美文网首页
TransmittableThreadLocal源码解析

TransmittableThreadLocal源码解析

作者: 牧羊人刘俏 | 来源:发表于2021-01-11 16:13 被阅读0次

    上一篇我们分析InheritableThreadLocal可以解决父子线程之间的变量继承问题,但是对于线程池模式,是无能为力,所以ali开源了一个TransmittableThreadLocal来解决这个问题。
    在TransmittableThreadLocal定义了一个static的变量holder,这个hold封装了当前Thread中的所有的inheritableThreadLocals,等于说没有增加一层代理解决不了的事情,如下

     private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
                new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
                    @Override
                    protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
                        //在这个变量中封装了当前线程的所有的InheritableThreadLocal放到map里面,且为Weak引用,防止内存泄漏
                        return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
                    }
    
                    @Override
                    protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
                        return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
                    }
                };
    

    然后呢又定义了一个Transmitter对holder进行了进一步的代理封装,其内部封装了几个方法,我们一个一个的看
    capture很简单,将此线程关联的所有的InheritableThreadLocal进行获取,也就是holder里面封装的信息

     public static Object capture() {
                Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
                for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
                    captured.put(threadLocal, threadLocal.copyValue());
                }
                return captured;
            }
    

    replay,重放的意思,上一步我们capture到所有的InheritableThreadLocal,我们可以把这个对象应用到当前线程,使得当前线程拥有capture封装的所有的InheritableThreadLocal的信息

     public static Object replay(@Nonnull Object captured) {
                @SuppressWarnings("unchecked")
                Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
                Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
    
                for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                     iterator.hasNext(); ) {
                    Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                    TransmittableThreadLocal<?> threadLocal = next.getKey();
    
                    // backup 将此线程里面的InheritableThreadLocal信息先备份,典型的备忘录模式
                    backup.put(threadLocal, threadLocal.get());
    
                    // clear the TTL values that is not in captured
                    // avoid the extra TTL values after replay when run task
                    if (!capturedMap.containsKey(threadLocal)) {
                        iterator.remove();
                        threadLocal.superRemove();
                    }
                }
    
                // set values to captured TTL
                setTtlValuesTo(capturedMap);
    
                // call beforeExecute callback
                doExecuteCallback(true);
    
                return backup;
            }
    //将capturedMap封装的InheritableThreadLocal设置给当前的线程
     private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
                for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
                    @SuppressWarnings("unchecked")
                    TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
                    threadLocal.set(entry.getValue());
                }
            }
    

    既然我们再之前backup了InheritableThreadLocal信息,那么线程之前完成之后,在还原现场,对应的method为restore。源码如下,很简单我们就不讲解了

    public static void restore(@Nonnull Object backup) {
                @SuppressWarnings("unchecked")
                Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
                // call afterExecute callback
                doExecuteCallback(false);
    
                for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                     iterator.hasNext(); ) {
                    Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                    TransmittableThreadLocal<?> threadLocal = next.getKey();
    
                    // clear the TTL values that is not in backup
                    // avoid the extra TTL values after restore
                    if (!backupMap.containsKey(threadLocal)) {
                        iterator.remove();
                        threadLocal.superRemove();
                    }
                }
    
                // restore TTL values
                setTtlValuesTo(backupMap);
            }
    

    针对TransmittableThreadLocal的对象,ali也封装了对应了,TtlRunnable和TtlCallable对象,在new TtlRunnable的时候,也会把父线程的所有InheritableThreadLocal信息全部的传进去,

        private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
            this.capturedRef = new AtomicReference<Object>(capture());
            this.runnable = runnable;
            this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
        }
    

    当然要用到ali提供的ExecutorTtlWrapper线程池,将Executor包了一下,源码如下

    class ExecutorTtlWrapper implements Executor, TtlEnhanced {
        private final Executor executor;
    
        ExecutorTtlWrapper(@Nonnull Executor executor) {
            this.executor = executor;
        }
       //重点在这里,将Runnable封装成一个TtlRunnable,而TtlRunnable封装了父线程的InheritableThreadLocal信息。
        @Override
        public void execute(@Nonnull Runnable command) {
            executor.execute(TtlRunnable.get(command));
        }
    
        @Nonnull
        public Executor unwrap() {
            return executor;
        }
    }
    

    而TtlRunnable在run的前后对当前线程进行了replay 和 restore
    如下

    @Override
        public void run() {
        //得到主线程里面的所有的InheritableThreadLocal信息
            Object captured = capturedRef.get();
            if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
                throw new IllegalStateException("TTL value reference is released after run!");
            }
            //将主线程的InheritableThreadLocal信息应用到当前线程
            Object backup = replay(captured);
            try {
                runnable.run();
            } finally {
               //还原当前线程的InheritableThreadLocal信息
                restore(backup);
            }
        }
    

    相关文章

      网友评论

          本文标题:TransmittableThreadLocal源码解析

          本文链接:https://www.haomeiwen.com/subject/thydaktx.html