在Reactor编程中,IO的请求线程和响应线程往往是不同的,这个机制会导致Java中ThreadLocal失效,也意味着用ThreadLocal来保存上下文的方案失效,例如日志框架中的DMC。
一种解决方案是,我们利用Reactor Context保存上下文,用Reactor Global Hooks拦截代理每一个Operator,在可能发生线程切换的地方,将上下文信息赋值到新线程的ThreadLocal中。
设置MDC示例代码
添加全局Hook
@Component
@SuppressWarnings("all")
public final class Hooks {
private Function<? super Publisher<Object>, ? extends Publisher<Object>> mdcHook = Operators.lift((scannable, coreSubscriber) -> new CoreSubscriber() {
@Override
public void onSubscribe(Subscription s) {
coreSubscriber.onSubscribe(s);
}
@Override
public void onNext(Object o) {
Context context = coreSubscriber.currentContext();
if (!context.isEmpty()) {
MDC.setContextMap(context.stream().collect(Collectors.toMap(entry -> entry.getKey().toString(), entry -> entry.getValue().toString())));
} else {
MDC.clear();
}
coreSubscriber.onNext(o);
}
@Override
public void onError(Throwable throwable) {
coreSubscriber.onError(throwable);
}
@Override
public void onComplete() {
coreSubscriber.onComplete();
}
@Override
public Context currentContext() {
return coreSubscriber.currentContext();
}
});
@PostConstruct
public void setHook() {
reactor.core.publisher.Hooks.onEachOperator(THREAD_LOCAL_HOOKS_KEY, mdcHook);
}
@PreDestroy
public void resetHook() {
reactor.core.publisher.Hooks.resetOnEachOperator(THREAD_LOCAL_HOOKS_KEY);
}
}
设置上下文信息
Flux.just(1, 2, 3)
...
.subscriberContext(ctx -> ctx.put("trance_id", UUID.randomUUID()))
.subscribe();
网友评论