HelloService
定义如下。
@Slf4j
@Service
public class HelloService {
@Autowired
RestTemplate restTemplate;
@HystrixCommand(fallbackMethod = "helloFallback")
public String helloService() {
long start = System.currentTimeMillis();
// 消费服务的逻辑
String result = restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody();
long end = System.currentTimeMillis();
log.info("Spend time: {}", (end - start));
return result;
}
public String helloFallback() {
return "error";
}
}
@HystrixCommand
注解在 HystrixCommandAspect
中被处理。
CommandActions
类中封装需要执行的方法以及降级的方法
public class CommandActions {
// commandAction 封装 HelloService.helloService()
private final CommandAction commandAction;
// fallbackAction 封装 HelloService.helloFallback()
private final CommandAction fallbackAction;
}
HystrixCommand
中表示正常方法的 Observable
和降级方法的 Observable
。
public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
@Override
final protected Observable<R> getExecutionObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
return Observable.just(run());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
}).doOnSubscribe(new Action0() {
@Override
public void call() {
// Save thread on which we get subscribed so that we can interrupt it later if needed
executionThread.set(Thread.currentThread());
}
});
}
@Override
final protected Observable<R> getFallbackObservable() {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
try {
return Observable.just(getFallback());
} catch (Throwable ex) {
return Observable.error(ex);
}
}
});
}
}
GenericCommand
中的 run
和 getFallback
方法。
-
run()
:执行正常逻辑的方法。 -
getFallback()
:执行降级逻辑的方法。
public class GenericCommand extends AbstractHystrixCommand<Object> {
@Override
protected Object run() throws Exception {
LOGGER.debug("execute command: {}", getCommandKey().name());
return process(new Action() {
@Override
Object execute() {
return getCommandAction().execute(getExecutionType());
}
});
}
@Override
protected Object getFallback() {
final CommandAction commandAction = getFallbackAction();
if (commandAction != null) {
try {
return process(new Action() {
@Override
Object execute() {
MetaHolder metaHolder = commandAction.getMetaHolder();
Object[] args = createArgsForFallback(metaHolder, getExecutionException());
return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
}
});
} catch (Throwable e) {
LOGGER.error(FallbackErrorMessageBuilder.create()
.append(commandAction, e).build());
throw new FallbackInvocationException(unwrapCause(e));
}
} else {
return super.getFallback();
}
}
}
AbstractCommand
类中的 toObservable
方法解析。
public Observable<R> toObservable() {
final AbstractCommand<R> _cmd = this;
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
// 在这里创建 Observable
return applyHystrixSemantics(_cmd);
}
};
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
Observable<R> hystrixObservable =
Observable.defer(applyHystrixSemantics)
.map(wrapWithAllOnNextHooks);
Observable<R> afterCache = hystrixObservable;
return afterCache
.doOnTerminate(terminateCommandCleanup) // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
.doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
.doOnCompleted(fireOnCompletedHook);
}
});
}
接下来继续跟踪 AbstractCommand
类中 applyHystrixSemantics
方法。
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
if (executionSemaphore.tryAcquire()) {
try {
// executeCommandAndObserve 方法创建 Observable
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
return handleSemaphoreRejectionViaFallback();
}
} else {
return handleShortCircuitViaFallback();
}
}
接下来继续跟踪 AbstractCommand
类中 executeCommandAndObserve
方法。
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd) // 执行正常逻辑
.lift(new HystrixObservableTimeoutOperator<R>(_cmd)); // 添加超时处理机制
} else {
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
return execution.doOnNext(markEmits)
.doOnCompleted(markOnCompleted)
.onErrorResumeNext(handleFallback) // 降级处理
.doOnEach(setRequestContext);
}
AbstractCommand
类中 executeCommandWithSpecifiedIsolation
方法中的第一个核心方法是 getUserExecutionObservable
。
用来生成执行 HelloService.helloService() 逻辑的 Observable。
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
return Observable.defer(new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
return getUserExecutionObservable(_cmd);
}
});
}
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
// 调用 AbstractCommand 子类 HystrixCommand 的 getExecutionObservable 方法
return getExecutionObservable();
}
AbstractCommand
类中 executeCommandWithSpecifiedIsolation
方法中的第二个核心方法是 handleFallback
。
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
return handleFailureViaFallback(e);
}
}
};
handle*ViaFallback
方法底层都调用了 getFallbackOrThrowException
方法。
下面继续跟进 getFallbackOrThrowException
方法。
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
// 调用 AbstractCommand 子类 HystrixCommand 的 getFallbackObservable 方法
Observable<R> fallbackExecutionChain = getFallbackObservable();
return fallbackExecutionChain;
}
至此,@HystrixCommand
注解转换成 Observable
的大致流程已经分析完毕。
网友评论