How HystrixCommand Is Converted into an Observable

Author: 蓝笔头 | Published: 2020-04-21 09:43

HelloService is defined as follows.

@Slf4j
@Service
public class HelloService {

    @Autowired
    RestTemplate restTemplate;
    
    @HystrixCommand(fallbackMethod = "helloFallback")
    public String helloService() {
        long start = System.currentTimeMillis();

        // Logic that consumes the remote service
        String result = restTemplate.getForEntity("http://HELLO-SERVICE/hello", String.class).getBody();

        long end = System.currentTimeMillis();

        log.info("Spend time: {}", (end - start));

        return result;
    }
    
    public String helloFallback() {
        return "error";
    }
}

The @HystrixCommand annotation is processed by HystrixCommandAspect.
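To make the interception idea concrete, here is a minimal sketch that uses a JDK dynamic proxy instead of AspectJ. The annotation `WithFallback`, the interface `Greeter`, and the classes `FailingGreeter` and `FallbackProxy` are all hypothetical names made up for this illustration. This is not how HystrixCommandAspect is implemented; it only shows the general pattern of reading a fallback method name from an annotation and invoking that method when the annotated method fails.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for @HystrixCommand: it only names the fallback method.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface WithFallback {
    String fallbackMethod();
}

// Hypothetical service interface; JDK proxies can only intercept interfaces.
interface Greeter {
    String hello();
}

class FailingGreeter implements Greeter {
    @Override
    @WithFallback(fallbackMethod = "helloFallback")
    public String hello() {
        throw new IllegalStateException("remote call failed");
    }

    public String helloFallback() {
        return "error";
    }
}

final class FallbackProxy {
    @SuppressWarnings("unchecked")
    static <T> T wrap(final T target, Class<T> iface) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // Resolve the implementation method so that its annotation is visible.
                Method impl = target.getClass().getMethod(method.getName(), method.getParameterTypes());
                impl.setAccessible(true);
                WithFallback annotation = impl.getAnnotation(WithFallback.class);
                try {
                    return impl.invoke(target, args);
                } catch (InvocationTargetException e) {
                    if (annotation == null) {
                        throw e.getCause(); // no fallback declared: rethrow the original failure
                    }
                    // Assumption of this sketch: the fallback method takes no arguments.
                    Method fallback = target.getClass().getMethod(annotation.fallbackMethod());
                    fallback.setAccessible(true);
                    return fallback.invoke(target);
                }
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

Calling `FallbackProxy.wrap(new FailingGreeter(), Greeter.class).hello()` goes through the handler, which catches the failure and returns the result of `helloFallback()`.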

The CommandActions class wraps both the method to execute and its fallback method.

public class CommandActions {
    // commandAction wraps HelloService.helloService()
    private final CommandAction commandAction;
    // fallbackAction wraps HelloService.helloFallback()
    private final CommandAction fallbackAction;
}
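As a rough illustration of what "wrapping a method" can look like, the sketch below stores a target object plus a reflective `Method` handle and pairs a normal action with a fallback action. `MethodAction`, `ActionPair`, and `SampleService` are hypothetical names for this example and are not the Hystrix Javanica classes.

```java
import java.lang.reflect.Method;

// Hypothetical service with a normal method and a fallback method.
class SampleService {
    public String helloService() {
        return "hello";
    }

    public String helloFallback() {
        return "error";
    }
}

// Hypothetical analogue of a command action: a target object plus the method to call on it.
final class MethodAction {
    private final Object target;
    private final Method method;

    MethodAction(Object target, String methodName) {
        Method resolved;
        try {
            // Assumption of this sketch: the method is public and takes no arguments.
            resolved = target.getClass().getMethod(methodName);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
        resolved.setAccessible(true);
        this.target = target;
        this.method = resolved;
    }

    Object execute() {
        try {
            return method.invoke(target);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}

// Holds the normal action and the fallback action side by side.
final class ActionPair {
    final MethodAction commandAction;
    final MethodAction fallbackAction;

    ActionPair(MethodAction commandAction, MethodAction fallbackAction) {
        this.commandAction = commandAction;
        this.fallbackAction = fallbackAction;
    }
}
```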

HystrixCommand defines the Observable for the normal method and the Observable for the fallback method.

public abstract class HystrixCommand<R> extends AbstractCommand<R> implements HystrixExecutable<R>, HystrixInvokableInfo<R>, HystrixObservable<R> {
    @Override
    final protected Observable<R> getExecutionObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(run());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        }).doOnSubscribe(new Action0() {
            @Override
            public void call() {
                // Save thread on which we get subscribed so that we can interrupt it later if needed
                executionThread.set(Thread.currentThread());
            }
        });
    }

    @Override
    final protected Observable<R> getFallbackObservable() {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                try {
                    return Observable.just(getFallback());
                } catch (Throwable ex) {
                    return Observable.error(ex);
                }
            }
        });
    }
}
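The key property of `Observable.defer` in the listing above is laziness: `run()` is not called when the Observable is built, only when something subscribes, and a thrown exception is turned into an error signal. The sketch below models just that behavior in plain Java without RxJava. `LazyValue` and `LazyValueDemo` are hypothetical names, and the model is far simpler than a real Observable (single value, synchronous, no unsubscribe).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical, heavily simplified stand-in for a single-value Observable.
final class LazyValue<T> {
    private final Callable<T> work;

    private LazyValue(Callable<T> work) {
        this.work = work;
    }

    // Analogous to Observable.defer(...): stores the work without running it.
    static <T> LazyValue<T> defer(Callable<T> work) {
        return new LazyValue<>(work);
    }

    // Analogous to subscribe(onNext, onError): only now is the work executed.
    void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        T value;
        try {
            value = work.call();
        } catch (Throwable ex) {
            onError.accept(ex); // plays the role of Observable.error(ex)
            return;
        }
        onNext.accept(value);   // plays the role of Observable.just(run())
    }
}

final class LazyValueDemo {
    // Records the order of events to show that the work runs at subscribe time.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        LazyValue<String> ok = LazyValue.<String>defer(() -> {
            events.add("run");
            return "hello";
        });
        events.add("created");
        ok.subscribe(v -> events.add("next:" + v), e -> events.add("error:" + e.getMessage()));

        LazyValue<String> failing = LazyValue.<String>defer(() -> {
            throw new IllegalStateException("boom");
        });
        failing.subscribe(v -> events.add("next:" + v), e -> events.add("error:" + e.getMessage()));
        return events;
    }
}
```

`LazyValueDemo.run()` returns `[created, run, next:hello, error:boom]`: the "run" event appears after "created", which is the deferred behavior the listing relies on.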

The run and getFallback methods in GenericCommand.

  • run(): executes the normal logic.
  • getFallback(): executes the fallback logic.

public class GenericCommand extends AbstractHystrixCommand<Object> {

    @Override
    protected Object run() throws Exception {
        LOGGER.debug("execute command: {}", getCommandKey().name());
        return process(new Action() {
            @Override
            Object execute() {
                return getCommandAction().execute(getExecutionType());
            }
        });
    }

    @Override
    protected Object getFallback() {
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
            return super.getFallback();
        }
    }
}

A walkthrough of the toObservable method in the AbstractCommand class (the listing is abridged to the parts relevant here).

public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;

    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            // The Observable is created here
            return applyHystrixSemantics(_cmd);
        }
    };

    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache = hystrixObservable;
            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}
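The comments in the listing say the cleanup should run once, whether the chain terminates normally or is unsubscribed. One common way to get that guarantee is a compare-and-set guard, sketched below. `OnceCleanup` is a hypothetical name, and the excerpt above does not show how the real cleanup actions are implemented, so treat this only as an illustration of the idea.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper that lets a cleanup action run at most once,
// no matter how many callbacks (terminate, unsubscribe) trigger it.
final class OnceCleanup implements Runnable {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Runnable cleanup;

    OnceCleanup(Runnable cleanup) {
        this.cleanup = cleanup;
    }

    @Override
    public void run() {
        // Only the first caller flips the flag and performs the cleanup.
        if (done.compareAndSet(false, true)) {
            cleanup.run();
        }
    }
}
```

Registering the same `OnceCleanup` instance for both the terminate and the unsubscribe callback means the second trigger becomes a no-op.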

Next, we trace into the applyHystrixSemantics method of AbstractCommand.

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    /* determine if we're allowed to execute */
    if (circuitBreaker.allowRequest()) {
        if (executionSemaphore.tryAcquire()) {
            try {
                // executeCommandAndObserve creates the Observable
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        return handleShortCircuitViaFallback();
    }
}
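The control flow above is a two-stage gate: first the circuit breaker, then the semaphore, with a fallback on either rejection and a guaranteed permit release afterwards. Here is a plain-Java sketch of that shape. `GuardedCall` is a hypothetical class; the boolean flag is a stand-in for a real circuit breaker, and, unlike Hystrix, this sketch lets exceptions from the action propagate to the caller.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class GuardedCall {
    // Hypothetical breaker state: true means requests are short-circuited.
    private final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    private final Semaphore permits;

    GuardedCall(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    void openCircuit() {
        circuitOpen.set(true);
    }

    String execute(Supplier<String> action, Supplier<String> fallback) {
        if (circuitOpen.get()) {
            return fallback.get(); // short-circuited: skip the action entirely
        }
        if (!permits.tryAcquire()) {
            return fallback.get(); // too many concurrent calls: rejected
        }
        try {
            return action.get();
        } finally {
            permits.release();     // always give the permit back
        }
    }
}
```

With `new GuardedCall(1)`, a nested call made while the only permit is held gets the fallback, and every call after `openCircuit()` gets the fallback without touching the semaphore.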

Next, we trace into the executeCommandAndObserve method of AbstractCommand.

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();

    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd) // run the normal logic
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); // add timeout handling
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback) // fall back on error
            .doOnEach(setRequestContext);
}
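Stripped of the Rx operators, the method above does two things: it bounds the execution time, and it switches to a fallback when the execution fails or times out. The sketch below expresses the same idea with `java.util.concurrent` only. `TimedCall` and its parameters are hypothetical, and creating an executor per call is done purely to keep the example short.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class TimedCall {
    static String callWithTimeout(Callable<String> action, long timeoutMillis, Supplier<String> fallback) {
        // A per-call executor is wasteful; a real system would share a pool.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(action);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);   // interrupt the worker that is still running
            return fallback.get(); // timed out: use the fallback
        } catch (ExecutionException e) {
            return fallback.get(); // the action threw: use the fallback
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback.get();
        } finally {
            executor.shutdownNow();
        }
    }
}
```

A fast action returns its own result, while an action that sleeps past the timeout or throws an exception yields the fallback value.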

The first core method on this path is getUserExecutionObservable, which is called from executeCommandWithSpecifiedIsolation in AbstractCommand.

It produces the Observable that runs the HelloService.helloService() logic.

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            return getUserExecutionObservable(_cmd);
        }
    });
}

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    // Calls getExecutionObservable, implemented by HystrixCommand, a subclass of AbstractCommand
    return getExecutionObservable();
}

The second core piece is handleFallback, which executeCommandAndObserve in AbstractCommand passes to onErrorResumeNext.

final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
    @Override
    public Observable<R> call(Throwable t) {
        Exception e = getExceptionFromThrowable(t);
        executionResult = executionResult.setExecutionException(e);
        if (e instanceof RejectedExecutionException) {
            return handleThreadPoolRejectionViaFallback(e);
        } else if (t instanceof HystrixTimeoutException) {
            return handleTimeoutViaFallback();
        } else if (t instanceof HystrixBadRequestException) {
            return handleBadRequestByEmittingError(e); 
        } else {
            return handleFailureViaFallback(e);
        }
    }
};
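The handler above is essentially a dispatch on the exception type, with one branch (the bad-request case) that emits the error instead of falling back. A plain-Java sketch of that dispatch follows. `BadInputException` is a hypothetical stand-in for HystrixBadRequestException, `java.util.concurrent.TimeoutException` stands in for HystrixTimeoutException, and `FailureRouter` is an invented name.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical analogue of a "bad request" error that must not trigger the fallback.
final class BadInputException extends RuntimeException {
    BadInputException(String message) {
        super(message);
    }
}

final class FailureRouter {
    enum Outcome { THREAD_POOL_REJECTED, TIMEOUT, BAD_REQUEST, FAILURE }

    // Mirrors the order of the instanceof checks in the handler.
    static Outcome classify(Throwable t) {
        if (t instanceof RejectedExecutionException) {
            return Outcome.THREAD_POOL_REJECTED;
        } else if (t instanceof TimeoutException) {
            return Outcome.TIMEOUT;
        } else if (t instanceof BadInputException) {
            return Outcome.BAD_REQUEST;
        }
        return Outcome.FAILURE;
    }

    // Every outcome except BAD_REQUEST is recovered through the fallback.
    static String recover(Throwable t, Supplier<String> fallback) {
        if (classify(t) == Outcome.BAD_REQUEST) {
            throw (BadInputException) t; // propagate the caller's mistake unchanged
        }
        return fallback.get();
    }
}
```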

The handle*ViaFallback methods all end up calling getFallbackOrThrowException.

Next, we follow getFallbackOrThrowException (abridged here to the call that matters).

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    // Calls getFallbackObservable, implemented by HystrixCommand, a subclass of AbstractCommand
    Observable<R> fallbackExecutionChain = getFallbackObservable();
    return fallbackExecutionChain;
}

At this point, we have traced the overall flow by which a method annotated with @HystrixCommand is turned into an Observable.
