美文网首页Project Reactor
Reactor异步编程中设置跨线程上下文

Reactor异步编程中设置跨线程上下文

作者: mxwgong | 来源:发表于2020-06-25 17:32 被阅读0次

    在Reactor编程中,IO的请求线程和响应线程往往是不同的,这个机制会导致Java中ThreadLocal失效,也意味着用ThreadLocal来保存上下文的方案失效,例如日志框架中的DMC。

    一种解决方案是,我们利用Reactor Context保存上下文,用Reactor Global Hooks拦截代理每一个Operator,在可能发生线程切换的地方,将上下文信息赋值到新线程的ThreadLocal中。


    设置MDC示例代码

    添加全局Hook
    @Component
    @SuppressWarnings("all")
    public final class Hooks {
    
      private Function<? super Publisher<Object>, ? extends Publisher<Object>> mdcHook = Operators.lift((scannable, coreSubscriber) -> new CoreSubscriber() {
        @Override
        public void onSubscribe(Subscription s) {
          coreSubscriber.onSubscribe(s);
        }
    
        @Override
        public void onNext(Object o) {
          Context context = coreSubscriber.currentContext();
          if (!context.isEmpty()) {
            MDC.setContextMap(context.stream().collect(Collectors.toMap(entry -> entry.getKey().toString(), entry -> entry.getValue().toString())));
          } else {
            MDC.clear();
          }
          coreSubscriber.onNext(o);
        }
    
        @Override
        public void onError(Throwable throwable) {
          coreSubscriber.onError(throwable);
        }
    
        @Override
        public void onComplete() {
          coreSubscriber.onComplete();
        }
    
        @Override
        public Context currentContext() {
          return coreSubscriber.currentContext();
        }
      });
    
    
      @PostConstruct
      public void setHook() {
        reactor.core.publisher.Hooks.onEachOperator(THREAD_LOCAL_HOOKS_KEY, mdcHook);
      }
    
      @PreDestroy
      public void resetHook() {
        reactor.core.publisher.Hooks.resetOnEachOperator(THREAD_LOCAL_HOOKS_KEY);
      }
    }
    
    设置上下文信息
    Flux.just(1, 2, 3)
                ...
                .subscriberContext(ctx -> ctx.put("trance_id", UUID.randomUUID()))
                .subscribe();
    
    

    相关文章

      网友评论

        本文标题:Reactor异步编程中设置跨线程上下文

        本文链接:https://www.haomeiwen.com/subject/wfosfktx.html