美文网首页
关于@Async异步执行及TransmittableThread

关于@Async异步执行及TransmittableThread

作者: 二哈_8fd0 | 来源:发表于2022-05-01 15:24 被阅读0次

    @Async 是spring提供的非常方便的异步执行的注解,非常方便,可以指定线程池执行,但是它不是动态代理实现,也就是和其它动态代理注解(例如@Transactional)放在一起会导致动态代理失效。因为spring在拿到 @Async注解后直接委托给 AnnotationAsyncExecutionInterceptor 来执行@Async目标方法,而不是执行代理方法会走层层动态代理。
    然后包装一个callable提交给TaskExecutor 来执行。
    我们不会具体讨论@Async和线程池以及threadLocal的具体实现,只跟随我们的使用场景涉及到的源码

    使用场景及问题
    // 定义个 ttl threadLocal用于存储一些信息
        private static final TransmittableThreadLocal<FlowContext> FLOW_CONTEXT = new TransmittableThreadLocal<>();
    //使用spring mvc 提供的HandlerInterceptor 接口的拦截能力做请求前放入threadLocal中一些信息 然后在 afterCompletion 时机清理掉
    //如下例子
        @Override
        public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
            String processInstanceIdStr = request.getHeader(METADATA_FLOW_PROCESS_INSTANCE_ID);
            String processIdStr = request.getHeader(METADATA_FLOW_FIRST_PROCESS_ID);
            try {
                Long processInstanceId = null;
                Long processId = null;
                if (StringUtils.isNotBlank(processInstanceIdStr)) {
                    processInstanceId = Long.valueOf(processInstanceIdStr);
                }
                if (StringUtils.isNotBlank(processIdStr)) {
                    processId = Long.valueOf(processIdStr);
                }
                doFill(processInstanceId, processId);
            } catch (Exception e) {
                FLOW_CONTEXT.remove();
            }
            return true;
        }
    
    
        @Override
        public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
            super.afterCompletion(request, response, handler, ex);
            FLOW_CONTEXT.remove();
        }
    
    

    如上代码非常简单的 通过header透传一些信息。在servlet线程周期管理 theadLocal信息。但是我们在请求进来后有一些异步操作也想要获取threadLocal信息如下

        @Async
        public void test(Long a, TransmittableThreadLocal<FlowContext> local) throws InterruptedException {
            Long processId = MetadataFlowContextHandler.getFirstProcessId();
            if (MetadataFlowContextHandler.getLocal() == local) {
                System.out.println("会进入到这里");
            }
            if (!Objects.equals(a, processId)) {
                System.out.println("有问题,除了第一次都不相等");
            }
        }
    

    在我们本地进行一次测试会发现ThreadLocal信息如预想般获取到了正确的值,但是如果你仔细测试,并发情况,或者你测试几下,然后等一会再测试就会出现错误的情况,那么下面列出了错误的情况和简略原因,然后分析一下源码

    影响因素
    1. 内存足够
    2. @Aync 线程池core线程数量都已经创建
    3. @Aync 线程池任务队列没有排满
    会出现theadLocal错误的情况
    1. 满足上述条件1,2,3新的请求进入就会错误
    2. 1不满足,满足2,3则可能会在子线程(也就是@Async)获取到null(未验证凭借猜想)
    原理就是因为主线程在第一次传递theadLocal对象的引用给子线程后放到当前线程的threadLocalMap中,后续子线程由于线程复用会在get时先通过当前线程对象去theadLocalMap中获取缓存的值,如果获取到直接返回,那么大部分时候会一直返回第一次主线程传递过来的引用。而主线程remove是不会传递的。

    为什么要满足上述3个影响因素,如果1不满足,jvm的gc会将theadLocalMap对象清理,因为他是一个弱引用 WeakReference,而TTL主线程传递给子线程时也是存入主线程的theadLocal对象weakHashMap返回,如果内存不足会清理掉后在子线程会调用ThreadLocal#setInitialValue方法委托到子类TransmittableThreadLocal#initialValue其实也是返回一个空的map就会获取失败(未验证),那么2,3也是这个道理,如果服务刚启动线程池可能会new新的thead那么主线程也一定传递正确的

    @Async的执行代码

    我们直接看ReflectiveMethodInvocation类进入拦截逻辑

        @Override
        @Nullable
        public Object proceed() throws Throwable {
            // We start with an index of -1 and increment early.
            if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
                return invokeJoinpoint();
            }
    // 这里会获取到一个AnnotationAsyncExecutionInterceptor,它不属于动态代理,在下面不会执行其它所有动态代理了,至于这个@Async的排序是不是最前面的index,如果是后面的index其实前面的动态代理也是可以执行的,这里不详细研究了
            Object interceptorOrInterceptionAdvice =
                    this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
            if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
                // Evaluate dynamic method matcher here: static part will already have
                // been evaluated and found to match.
                InterceptorAndDynamicMethodMatcher dm =
                        (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
                Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
    // 这里算是将动态代理的方法作为一个适配器去匹配
                if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
    // 如果匹配成功则会执行动态代理,而后又会执行到当前方法体内
                    return dm.interceptor.invoke(this);
                }
                else {
                    // Dynamic matching failed.
                    // Skip this interceptor and invoke the next in the chain.
                    return proceed();
                }
            }
            else {
                // It's an interceptor, so we just invoke it: The pointcut will have
                // been evaluated statically before this object was constructed.
    // 如果不是动态代理这里直接执行目标方法拦截器,那么不会重复进入当前方法体内了,其它的动态代理会失效
                return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
            }
        }
    

    下面是AnnotationAsyncExecutionInterceptor#invoke方法

        @Override
        @Nullable
        public Object invoke(final MethodInvocation invocation) throws Throwable {
            Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
            Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
            final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    
            AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
    // 这里必须是 异步的
            if (executor == null) {
                throw new IllegalStateException(
                        "No executor specified and no default executor set on AsyncExecutionInterceptor either");
            }
    
            Callable<Object> task = () -> {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future<?>) result).get();
                    }
                }
                catch (ExecutionException ex) {
                    handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
                }
                catch (Throwable ex) {
                    handleError(ex, userDeclaredMethod, invocation.getArguments());
                }
                return null;
            };
    // 提交到线程池执行
            return doSubmit(task, executor, invocation.getMethod().getReturnType());
        }
    

    下面是 doSubmit方法

        @Nullable
        protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
    // 这里根据返回值进行了封装,如果是CompletableFuture 则将这个callable 封装为CompletableFuture 返回给客户端自由操作
            if (CompletableFuture.class.isAssignableFrom(returnType)) {
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        return task.call();
                    }
                    catch (Throwable ex) {
                        throw new CompletionException(ex);
                    }
                }, executor);
            }
    // 这是 spring提供的一个可以添加监听的Future,也就是将返回值设置为ListenableFuture的子类便可以添加一些监听例如异步方法成功后,或者抛出异常的后进行一些信息收集和逻辑判断日志打印之类
    // 例如 StompBrokerRelayMessageHandler#forward 方法,这是spring-messaging中的stomp协议的一个future监听实现
    
            else if (ListenableFuture.class.isAssignableFrom(returnType)) {
                return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
            }
    // 这是一个基础的 Future封装返回
            else if (Future.class.isAssignableFrom(returnType)) {
                return executor.submit(task);
            }
            else {
    // 其它返回值或者没有返回值的
                executor.submit(task);
                return null;
            }
        }
    

    再下面进入了线程池的逻辑,为什么要知道线程池的逻辑,因为影响了ttl传递threadLocal的逻辑,因为在子线程是new的情况下会将当前主线程的threadLocal的引用传递给异步的子线程,如果是复用时则什么也不会做!那为什么你在测试代码时已经复用的线程还是好用呢,因为子线程通过弱引用的threadLocalMap保存了第一次在new Thread时的主线程threadLocal信息,你换个信息的值再试试!
    /*
    *分三步进行:

    • 1。如果运行的线程少于corePoolSize,则尝试
      *以给定的命令作为第一个启动一个新线程
      *任务。对addWorker的调用自动地检查runState和
    • workerCount,这样可以防止添加错误警报
      *当它不应该返回false线程。
    • 2。如果一个任务可以成功排队,那么我们仍然需要
      *再次检查我们是否应该添加一个线程
      *(因为已经存在的线程在上次检查后已经死亡)
      *池在进入该方法后关闭。所以我们
      *重新检查状态,必要时回滚排队
      *停止,或启动一个新的线程,如果没有。
    • 3。如果不能对任务进行排队,则尝试添加一个新的
      *线程。如果失败,我们就知道系统关闭或饱和了
      *等拒绝该任务。
      */
      上面复制于源码的注释,具体大家可以百度其它的文章来学习,或者直接看源码ThreadPoolExecutor#execute(Runnable command)的代码
    ttl在子线程为new Thread时传递逻辑

    直接看Thread#init代码

      private void init(ThreadGroup g, Runnable target, String name,
                          long stackSize, AccessControlContext acc,
                          boolean inheritThreadLocals) {
          // 省略部分代码...
            this.group = g;
            this.daemon = parent.isDaemon();
            this.priority = parent.getPriority();
            if (security == null || isCCLOverridden(parent.getClass()))
                this.contextClassLoader = parent.getContextClassLoader();
            else
                this.contextClassLoader = parent.contextClassLoader;
            this.inheritedAccessControlContext =
                    acc != null ? acc : AccessController.getContext();
            this.target = target;
            setPriority(priority);
    // 这里判断主线程是否含有inheritableThreadLocals && 当前子线程是否可以传递线程私有变量
            if (inheritThreadLocals && parent.inheritableThreadLocals != null)
    // 子线程创建 threadLocal 并传入父线程的 threadLocalMap
                this.inheritableThreadLocals =
                  ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
            /* Stash the specified stack size in case the VM cares */
            this.stackSize = stackSize;
    
            /* Set thread ID */
            tid = nextThreadID();
        }
    

    下面会调用ThreadLocal#ThreadLocalMap(ThreadLocalMap parentMap) -> ThreadLocal#childValue(T parentValue)

    // 这是ThreadLocal 的代码,会直接报错不支持,只有InheritableThreadLocal及其子类支持,而TTL继承了InheritableThreadLocal类
    T childValue(T parentValue) {
            throw new UnsupportedOperationException();
        }
    // 而InheritableThreadLocal的实现如下直接返回主线程的值,虽然传递了但是客户端不容易拿到
      protected T childValue(T parentValue) {
            return parentValue;
        }
    // TransmittableThreadLocal 的实现是返回一个以TransmittableThreadLocal对象为key的weakHashMap,作为InheritableThreadLocal的增强保持了弱引用的语义及传入主线程值的引用,并且可以在子线程通过TTL的get时从这个weakHashMap直接获取到从主线程传递过来的引用
        private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder = new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
            protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
                return new WeakHashMap();
            }
    //这里看出TTL 不单单是将主线程的 threadLocal的值引用传递,并且将主线程的TransmittableThreadLocal 对象作为key传入到子线程的ThreadLocalMap中
            protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
                return new WeakHashMap(parentValue);
            }
        };
    
    下面时TTL 的get方法
       @Override
        public final T get() {
    // 调用父类 ThreadLocal的get
            T value = super.get();
    // 这里很重要,
            if (null != value) addValue();
            return value;
        }
    //  threadLocal 的get方法
        public T get() {
            Thread t = Thread.currentThread();
    // 获取当前线程的 ThreadLocalMap 
            ThreadLocalMap map = getMap(t);
            if (map != null) {
    // 这里的this其实是当前TransmittableThreadLocal 对象
                ThreadLocalMap.Entry e = map.getEntry(this);
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    T result = (T)e.value;
                    return result;
                }
            }
    // 如果没有获取到值会初始化一下新的theadLocal中的对象,先会调用子类中的initialValue然后如果ThreadLocalMap没有被回收直接返回init的值并返回,如果被回收了会create新的map,实际TTL也只会new一个空的map返回
            return setInitialValue();
        }
         private Entry getEntry(ThreadLocal<?> key) {
                int i = key.threadLocalHashCode & (table.length - 1);
    // 通过threadLocal对象的hashCode从ThreadLocalMap获取到缓存的对象
                Entry e = table[i];
                if (e != null && e.get() == key)
                    return e;
                else
                    return getEntryAfterMiss(key, i, e);
            }
    

    我们尽量不讨论 TTL之外的代码,上述代码是一个标准的从ThreadLocal中get对象的流程,但是TTL的get有一个addValue的操作

    // 这里的逻辑是在TTL 传递childValue时的 map重新灌入的逻辑,目前还不知道为什么这样做,后续文章会仔细探讨
        private void addValue() {
            if (!holder.get().containsKey(this)) {
                holder.get().put(this, null); // WeakHashMap supports null value.
            }
        }
    

    如上述这些代码,虽然使用TTL在new Thread时将主线程的引用灌入了子线程中,并处理业务对象本身还放入了一个weakHashMap以threadLocal对象为key,但是在get时候并没有什么不同啊,我们通过测试发现了问题后debug这里也看不出什么猫腻,但是发现就是子线程一直获取第一次传递过来的对象引用,实际实现的逻辑也没有用到TTL重写的childValue方法中构造的map,而是直接使用了InheritableThreadLocal实现的业务对象的直接引用。然后看TTL的代码中内部类Transmitter有大量的复制,重放,还原的逻辑如下

           @Nonnull
            public static Object replay(@Nonnull Object captured) {
    // 这个方法一看就是在复制替换数据,实际就是在各种线程池的工作线程执行前的重放(替换threadLocal变量)具体逻辑后续文章探讨,那么看来这个动作在debug中一直没有执行,所以没有产生TTL线程私有变量的正确传递,我们看看是谁在调用它
                @SuppressWarnings("unchecked")
                Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
                Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
                for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                     iterator.hasNext(); ) {
                    Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                    TransmittableThreadLocal<?> threadLocal = next.getKey();
                    // backup
                    backup.put(threadLocal, threadLocal.get());
                    // clear the TTL values that is not in captured
                    // avoid the extra TTL values after replay when run task
                    if (!capturedMap.containsKey(threadLocal)) {
                        iterator.remove();
                        threadLocal.superRemove();
                    }
                }
                // set values to captured TTL
                setTtlValuesTo(capturedMap);
                // call beforeExecute callback
                // 执行时机 为 目标方法执行前
                doExecuteCallback(true);
                return backup;
            }
    

    看看是谁在调用这个replay方法


    谁在调用replay

    我们进入一个Runnable的地方TtlRunnable类,看的出来是一个装饰器做了增强,然后对目标Runnable执行前执行后对threadLocal进行了重放,还原工作 如下 TtlRunnable

        @Override
        public void run() {
            Object captured = capturedRef.get();
            if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
                throw new IllegalStateException("TTL value reference is released after run!");
            }
            Object backup = replay(captured);
            try {
                runnable.run();
            } finally {
                restore(backup);
            }
        }
    

    那么继续,TtlRunnable又是谁搞的呢


    image.png

    原来是ExecutorServiceTtlWrapper 类,另一个先忽略一看就是和定时相关的。那么这是一个ExecutorService的装饰器,也是做了增强,目的是可以使用TtlRunnable这个增强再往下看


    image.png

    TtlExecutors 这个类有一堆静态方法,都是返回传入目标对象返回其装饰器的方法,那就是我们在构造ExecutorService线程池时可以直接使用这个类的返回装饰器应该就可以了

        @Bean(name = "taskExecutor")
        public ExecutorService threadPoolTaskExecutor() {
    // 返回装饰器
            return TtlExecutors.getTtlExecutorService(
                new ThreadPoolExecutor(50, 300, 300, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()));
        }
    

    经过测试没有问题,嗐,原来是自己不知道TTL还需要结合线程池的装饰器来实现threadLocal的正确传递!菜是原罪啊T.T
    网上很多关于TTL的实现原理的讲解,我们后续也会通过这次经验来详细了解一下TTL的实现机制和设计思想

    TransmittableThreadLocal线程间传递逻辑 - 简书 (jianshu.com)

    相关文章

      网友评论

          本文标题:关于@Async异步执行及TransmittableThread

          本文链接:https://www.haomeiwen.com/subject/glpyyrtx.html