美文网首页Project Reactor
Reactor异步编程中设置跨线程上下文

Reactor异步编程中设置跨线程上下文

作者: mxwgong | 来源:发表于2020-06-25 17:32 被阅读0次

在Reactor编程中,IO的请求线程和响应线程往往是不同的,这个机制会导致Java中ThreadLocal失效,也意味着用ThreadLocal来保存上下文的方案失效,例如日志框架中的DMC。

一种解决方案是,我们利用Reactor Context保存上下文,用Reactor Global Hooks拦截代理每一个Operator,在可能发生线程切换的地方,将上下文信息赋值到新线程的ThreadLocal中。


设置MDC示例代码

添加全局Hook
@Component
@SuppressWarnings("all")
public final class Hooks {

  private Function<? super Publisher<Object>, ? extends Publisher<Object>> mdcHook = Operators.lift((scannable, coreSubscriber) -> new CoreSubscriber() {
    @Override
    public void onSubscribe(Subscription s) {
      coreSubscriber.onSubscribe(s);
    }

    @Override
    public void onNext(Object o) {
      Context context = coreSubscriber.currentContext();
      if (!context.isEmpty()) {
        MDC.setContextMap(context.stream().collect(Collectors.toMap(entry -> entry.getKey().toString(), entry -> entry.getValue().toString())));
      } else {
        MDC.clear();
      }
      coreSubscriber.onNext(o);
    }

    @Override
    public void onError(Throwable throwable) {
      coreSubscriber.onError(throwable);
    }

    @Override
    public void onComplete() {
      coreSubscriber.onComplete();
    }

    @Override
    public Context currentContext() {
      return coreSubscriber.currentContext();
    }
  });


  @PostConstruct
  public void setHook() {
    reactor.core.publisher.Hooks.onEachOperator(THREAD_LOCAL_HOOKS_KEY, mdcHook);
  }

  @PreDestroy
  public void resetHook() {
    reactor.core.publisher.Hooks.resetOnEachOperator(THREAD_LOCAL_HOOKS_KEY);
  }
}
设置上下文信息
Flux.just(1, 2, 3)
            ...
            .subscriberContext(ctx -> ctx.put("trance_id", UUID.randomUUID()))
            .subscribe();

相关文章

网友评论

    本文标题:Reactor异步编程中设置跨线程上下文

    本文链接:https://www.haomeiwen.com/subject/wfosfktx.html