Hystrix的用途以及使用场景就不在这里赘述了,这里只关注源码级别的实现原理。
1、AbstractCommand、HystrixCommand、HystrixObservableCommand
1.1 AbstractCommand
Hystrix会把对服务的请求封装成命令并通过执行命令的方式进行请求调用,抽象类AbstractCommand即是这些命令的核心父类接口,此接口完成了大部分的Hystrix的逻辑实现。
1.1.1 核心属性
-
HystrixCircuitBreaker circuitBreaker
Hystrix在执行具体的调用操作之前会调用HystrixCircuitBreaker的相关方法判断熔断器是否打开,如果熔断器已经打开则直接fallback快速失败,否则才会进行实际的调用。下面的章节会详细介绍HystrixCircuitBreaker接口。 -
HystrixThreadPool threadPool
Hystrix线程池,Hystrix命令执行隔离策略有两种:线程池和信号量。默认是线程池,即命令会异步运行在线程池中。信号量则使用原有的线程。使用线程池有一定的延迟,但是可以有较高的吞吐量。 -
HystrixCommandKey commandKey
命令唯一标识,Hystrix会为每一个命令缓存一个HystrixCircuitBreaker对象以及一个HystrixCommandMetrics对象,这些对象被缓存在的静态属性Map中,命令通过HystrixCommandKey标识其唯一性,在使用Feign接口时,命令唯一标识即为方法签名。 -
HystrixCommandGroupKey commandGroup
命令组唯一标识,每一个依赖服务对应一个命令组,每一个命令组对应一个线程池,例如,某应用依赖了S1和S2两个服务,则会有两个线程池,服务的调用运行在自己的线程池中,在使用Feign接口时,命令组唯一标识即为服务的名称。 -
HystrixThreadPoolKey threadPoolKey
线程池唯一标识,和命令组唯一标识保持一致。 -
HystrixThreadPool threadPool
用于执行Hystrix命令的线程池。此线程池是服务级别的,即每一个服务对应一个线程池,那么对这个服务的所有调用请求都会在此线程池中执行。 -
HystrixCommandMetrics metrics
命令执行情况的统计度量,Hystrix通过HystrixCommandMetrics记录和统计命令执行的总次数、失败次数等信息。 -
ExecutionResult executionResult
命令执行结果,命令执行过程中会不断的把执行情况汇总至ExecutionResult,当命令完成后会把ExecutionResult交给HystrixCommandMetrics做统计之用。
1.1.2 核心方法
-
Observable<R> toObservable()
注册命令执行完成事件以及触发事件源。此方法返回一个Cold Observable对象,即在没有订阅者时不进行事件发布,而是等待,直到有订阅者时才发布事件。 -
Observable<R> observable()
注册命令执行完成事件以及触发事件源。此方法返回一个Hot Observable对象,即不管是否有订阅者都会发布事件,这种情况下有可能出现订阅者只观察到整个过程的一部分的现象。 -
Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd)
执行熔断语义,是熔断判断的核心方法,Hystrix首先会从缓存中获取结果,如果从缓存中获取结果失败则调用此方法执行熔断语义,此方法首先调用HystrixCircuitBreaker#allowRequest()判断是否允许请求,如果不允许则直接调用handleShortCircuitViaFallback()方法执行快速失败的逻辑,如果允许则判断信号量资源是否能获取,如果获取失败则调用handleSemaphoreRejectionViaFallback()方法执行信号量资源拒绝快速失败逻辑,如果资源获取成功则执行命令。其源码如下:private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) { // 熔断器是否允许请求 if (circuitBreaker.allowRequest()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call() { if (semaphoreHasBeenReleased.compareAndSet(false, true)) { executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call(Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; // 信号量获取资源是否成功 if (executionSemaphore.tryAcquire()) { try { /* used to track userThreadExecutionTime */ executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); // 成功则执行命令 return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { // 执行信号量获取资源拒绝快速失败逻辑 return handleSemaphoreRejectionViaFallback(); } } else { // 执行熔断器开启是的快速失败逻辑 return handleShortCircuitViaFallback(); } }
-
Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd)
注册熔断相关事件,用于收集执行结果情况,执行结果情况记录在ExecutionResult中。 -
Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd)
此方法会判断是需要在线程池中执行命令还是使用原有线程执行命令,如果是前者则注册线程完成相关的事件。 -
void handleCommandEnd(boolean commandExecutionStarted)
命令执行结束会调用此方法,此方法会将ExecutionResult对象交由HystrixCommandMetrics做执行情况的统计。 -
Observable<R> handleSemaphoreRejectionViaFallback()
信号量资源拒绝时的快速失败逻辑实现,最终会执行用户指定的fallback逻辑。 -
Observable<R> handleShortCircuitViaFallback()
熔断器打开时的快速失败逻辑实现,最终会执行用户指定的fallback逻辑。
1.2. HystrixCommand、HystrixObservableCommand
Hystrix的两个核心命令,在使用Hystrix时第一步是创建命令,那么这里说的命令即是HystrixCommand和HystrixObservableCommand或者是他们的子类。
1.2.1 HystrixCommand
AbstractCommand的子类,用于返回一个单一的结果,同时提供同步和异步两种方式执行命令:
- Future<R> queue() 异步方式执行命令。
- public R execute() 同步方式执行命令。
HystrixCommand是一个抽象类,其中run()方法为抽象方法,在使用时一般需要实现run()和getFallback()两个方法,如下:
- protected abstract R run() throws Exception
- protected R getFallback()
这两个方法分别对应正常的结果返回和快速失败的结果返回,在调用queue()和execute()时会回调到这两个方法。
1.2.2 HystrixObservableCommand
AbstractCommand的子类,用于返回多个结果,同时提供Hot Observable和Cold Observable两种执行方式:
- Observable<R> observable() 返回一个Hot Observable。
- Observable<R> toObservable() 返回一个Cold Observable。
值得注意的是这两个方法并不是HystrixObservableCommand中的,而是其父类AbstractCommand中的方法。
在使用HystrixObservableCommand时需要实现construct()和resumeWithFallback()两个方法:
- protected Observable<String> construct()
- protected Observable<String> resumeWithFallback()
这两个方法分别返回正常的Observable以及快速失败的Observable。在调用observable()和toObservable()时会回调到这两个方法。
所谓返回多个结果,是说可以通过Observable的相关方法获取到多个结果的迭代器,如下是一个使用HystrixObservableCommand的小例子:
package com.zws.cloud.consumer;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixObservableCommand;
import rx.Emitter;
import rx.Observable;
import java.util.Iterator;
/**
* @Author wensh.zhu
* @Date 2019/5/7 16:55
*/
public class HystrixObservableCommandTest {
public static void main(String[] args) {
HystrixObservableCommandHello commandHello = new HystrixObservableCommandHello();
// 得到结果的迭代器
Iterator<String> iterator = commandHello.observe().toBlocking().getIterator();
while (iterator.hasNext()) {
System.out.println(iterator.next());
}
}
}
class HystrixObservableCommandHello extends HystrixObservableCommand<String> {
public HystrixObservableCommandHello() {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("TestGroup")).andCommandKey(HystrixCommandKey.Factory.asKey("Hello")));
}
@Override
protected Observable<String> construct() {
return Observable.<String>create(emitter -> {
emitter.onNext("abc");
/**
// 模拟异常发生
String name = null;
System.out.println(name.length());
**/
emitter.onNext("efg");
emitter.onCompleted();
}, Emitter.BackpressureMode.NONE).doOnError(throwable -> System.out.println("哎呀,观察到异常啦"));
}
@Override
protected Observable<String> resumeWithFallback() {
return Observable.create(emitter -> {
emitter.onNext("error");
emitter.onNext("ERROR");
emitter.onCompleted();
}, Emitter.BackpressureMode.NONE);
}
}
2、HystrixCircuitBreaker
HystrixCircuitBreaker是Hystrix的核心接口,其实现类为HystrixCircuitBreaker.HystrixCircuitBreakerImpl,重要方法如下:
-
boolean isOpen()
熔断器是否打开,类HystrixCircuitBreakerImpl中有一类型为AtomicBoolean的属性circuitOpen,表示熔断器是否打开。此方法首先判断circuitOpen是否为true,如果为true则直接返回true。否则判断请求次数是否达到熔断请求次数阈值,此阈值默认为20,如果没有达到则直接返回false,如果请求次数大于等于20,则判断这些请求中失败的请求次数占比是否小于50%(默认),如果小于则返回false,否则将circuitOpen设置为true,并更新熔断时间戳,熔断时间戳记录在类HystrixCircuitBreakerImpl的属性circuitOpenedOrLastTestedTime中,类型为AtomicLong。所以综上可以得出熔断器打开需要满足两个条件:- 请求次数大于等于20(默认)
- 失败请求次数占比大于等于50%(默认)
-
boolean allowSingleTest()
熔断器是否处于半打开状态。此方法的逻辑是:如果熔断器没有打开(即circuitOpen为false)则直接返回false,如果熔断器已经打开则判断当前时间戳是否大于熔断器打开的时间戳+熔断窗口时间(默认5秒),如果是则更新熔断器打开的时间戳为当前时间并返回true。也就是说:- 熔断器打开后需要等待至少5秒才可以再次尝试请求。
- 如果此次尝试失败则至少再等待5秒才可以尝试。
-
void markSuccess()
此方法会在熔断器打开的状态下:-
关闭熔断器
即把circuitOpen设置为false。 -
并将统计数据置零
调用HystrixCommandMetrics的resetStream()方法。
-
关闭熔断器
-
boolean allowRequest()
是否允许请求,Hystrix在执行具体的调用之前通过调用此方法判断是否允许请求,如果不允许则直接fullback快速失败。此方法调用isOpen()和allowSingleTest()返回熔断器是否关闭或半开,具体逻辑:首先判断调用isOpen()方法判断熔断器是否打开,如果熔断器打开则直接返回false,否则调用allowSingleTest()方法判断是否允许尝试请求,如果是则返回true,否则返回false。HystrixCircuitBreakerImpl源码如下:
static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker { private final HystrixCommandProperties properties; private final HystrixCommandMetrics metrics; // 保存熔断器状态:开启or关闭 private AtomicBoolean circuitOpen = new AtomicBoolean(false); // 保存熔断器开启或最后更新时间戳 private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong(); protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) { this.properties = properties; this.metrics = metrics; } public void markSuccess() { if (circuitOpen.get()) { // 熔断器关闭并重置统计数据 if (circuitOpen.compareAndSet(true, false)) { metrics.resetStream(); } } } @Override public boolean allowRequest() { if (properties.circuitBreakerForceOpen().get()) { return false; } if (properties.circuitBreakerForceClosed().get()) { isOpen(); return true; } // 熔断器是否关闭或者半开 return !isOpen() || allowSingleTest(); } public boolean allowSingleTest() { long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get(); // 熔断器开启且开启时长大于等于5秒则为半开状态 if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) { // 更新熔断器开启时间戳 if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) { return true; } } return false; } @Override public boolean isOpen() { if (circuitOpen.get()) { return true; } HealthCounts health = metrics.getHealthCounts(); // 请求次数小于20直接返回false if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) { return false; } if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) { return false; } else { // 请求失败次数占比大于等于50%开启熔断器、记录开启时间戳 if (circuitOpen.compareAndSet(false, true)) { circuitOpenedOrLastTestedTime.set(System.currentTimeMillis()); return true; } else { return true; } } } }
熔断器对象是命令级别的,也就是说每一个命令都会对应一个HystrixCircuitBreaker对象,这些HystrixCircuitBreaker对象被缓存在类HystrixCircuitBreaker.Factory的属性circuitBreakersByCommand中,如下:
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();
3、HystrixCommandMetrics、HealthCountsStream、HystrixEventStream、HystrixCommandMetrics.HealthCounts
这些都是和Hystrix的命令执行情况统计相关的类,这些统计信息直接影响到Hystrix的熔断断判。
3.1 HystrixCommandMetrics.HealthCounts
此类保存了命令执行的总次数、失败次数以及失败次数的占比,plus()方法则用于更新这些数据,HystrixCircuitBreaker就是根据此类的统计信息做熔断器的打开和半开判断的,源码如下:
public static class HealthCounts {
private final long totalCount;
private final long errorCount;
private final int errorPercentage;
HealthCounts(long total, long error) {
this.totalCount = total;
this.errorCount = error;
if (totalCount > 0) {
this.errorPercentage = (int) ((double) errorCount / totalCount * 100);
} else {
this.errorPercentage = 0;
}
}
// 略
public HealthCounts plus(long[] eventTypeCounts) {
long updatedTotalCount = totalCount;
long updatedErrorCount = errorCount;
long successCount = eventTypeCounts[HystrixEventType.SUCCESS.ordinal()];
long failureCount = eventTypeCounts[HystrixEventType.FAILURE.ordinal()];
long timeoutCount = eventTypeCounts[HystrixEventType.TIMEOUT.ordinal()];
long threadPoolRejectedCount = eventTypeCounts[HystrixEventType.THREAD_POOL_REJECTED.ordinal()];
long semaphoreRejectedCount = eventTypeCounts[HystrixEventType.SEMAPHORE_REJECTED.ordinal()];
updatedTotalCount += (successCount + failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
updatedErrorCount += (failureCount + timeoutCount + threadPoolRejectedCount + semaphoreRejectedCount);
return new HealthCounts(updatedTotalCount, updatedErrorCount);
}
}
3.2 HealthCountsStream
从上面我们知道HystrixCommandMetrics.HealthCounts保存了命令执行的总次数、失败次数,那么这些统计数字是怎么来的呢?答案就是HealthCountsStream类,首先此类的父类BucketedCounterStream中有一个BehaviorSubject<Output>类型的属性counterSubject,Output是泛型,这里即是HystrixCommandMetrics.HealthCounts。HealthCountsStream保存了一个回调函数,用于通过调用HystrixCommandMetrics.HealthCounts的plus()方法累计命令执行的总次数和失败次数。每当HealthCountsStream接受到事件时都会回调此函数。那么事件时在哪里触发的呢?答案是HystrixEventStream。
3.3 HystrixEventStream
HystrixEventStream用于发布命令相关事件,其实现类有HystrixCommandCompletionStream、HystrixThreadPoolStartStream、HystrixCollapserEventStream等,每一个实现类都有一个write()方法,此方法发布一种命令相关事件,例如:HystrixCommandCompletionStream负责发布命令完成事件,HystrixThreadPoolStartStream发布线程池开始事件。这些实现类发布的事件会被上面说的类HealthCountsStream中的一个回调函数监听到。
3.4 HystrixCommandMetrics
此类用于重置统计信息以及接受命令执行完成和开始的通知。当命令开始执行和执行结束时分别调用此类的markCommandStart()方法和markCommandDone()方法,这两个方法分别通过HystrixCommandStartStream和HystrixCommandCompletionStream发布命令开始和结束事件。另外HystrixCircuitBreaker的markSuccess()方法被调用时会调用HystrixCommandMetrics#resetStream()方法,此方法会删除缓存中命令对应的HystrixCommandMetrics并创建一个新的HystrixCommandMetrics。相关源码如下:
void markCommandStart(HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties.ExecutionIsolationStrategy isolationStrategy) {
int currentCount = concurrentExecutionCount.incrementAndGet();
HystrixThreadEventStream.getInstance().commandExecutionStarted(commandKey, threadPoolKey, isolationStrategy, currentCount);
}
void markCommandDone(ExecutionResult executionResult, HystrixCommandKey commandKey, HystrixThreadPoolKey threadPoolKey, boolean executionStarted) {
HystrixThreadEventStream.getInstance().executionDone(executionResult, commandKey, threadPoolKey);
if (executionStarted) {
concurrentExecutionCount.decrementAndGet();
}
}
synchronized void resetStream() {
healthCountsStream.unsubscribe();
HealthCountsStream.removeByKey(key);
healthCountsStream = HealthCountsStream.getInstance(key, properties);
}
4、Feign与Hystrix
4.1 Hystrix依赖与配置
在我们使用Spring Cloud时,更多是在使用Feign接口时使用Hystrix,那么在Feign接口中使用Hystrix需要那些配置呢?很简单:
- 引入Hystrix依赖,如下:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency>
- 配置项eign.hystrix.enabled设置为true。
feign: hystrix: enabled: true
4.2 HystrixInvocationHandler
当配置项eign.hystrix.enabled设置为true时,配置类FeignClientsConfiguration的内部静态类HystrixFeignConfiguration会向Spring上下文中注入类型为HystrixFeign.Builder的Bean,如下:
@Configuration
@ConditionalOnClass({ HystrixCommand.class, HystrixFeign.class })
protected static class HystrixFeignConfiguration {
@Bean
@Scope("prototype")
@ConditionalOnMissingBean
@ConditionalOnProperty(name = "feign.hystrix.enabled")
public Feign.Builder feignHystrixBuilder() {
return HystrixFeign.builder();
}
}
public final class HystrixFeign {
public static Builder builder() {
return new Builder();
}
public static final class Builder extends Feign.Builder {
Feign build(final FallbackFactory<?> nullableFallbackFactory) {
super.invocationHandlerFactory(new InvocationHandlerFactory() {
@Override
public InvocationHandler create(Target target,
Map<Method, MethodHandler> dispatch) {
// 这里返回HystrixInvocationHandler对象
return new HystrixInvocationHandler(target, dispatch, setterFactory,
nullableFallbackFactory);
}
});
super.contract(new HystrixDelegatingContract(contract));
return super.build();
}
// 略
}
}
HystrixFeign.builder()方法返回HystrixFeign的内部类HystrixFeign.Builder,当为Feign接口生成代理对象时,HystrixFeign.Builder类的build()方法被调用,此方法会创建执行处理器HystrixInvocationHandler,当Feign接口执行时就会调用HystrixInvocationHandler的invoke()方法,那么接下来就简单了,invoke()方法创建HystrixCommand对象并执行其execute()方法,如下:
public Object invoke(final Object proxy, final Method method, final Object[] args)
throws Throwable {
// 略
HystrixCommand<Object> hystrixCommand =
new HystrixCommand<Object>(setterMethodMap.get(method)) {
@Override
protected Object run() throws Exception {
try {
return HystrixInvocationHandler.this.dispatch.get(method).invoke(args);
} catch (Exception e) {
throw e;
} catch (Throwable t) {
throw (Error) t;
}
}
@Override
protected Object getFallback() {
if (fallbackFactory == null) {
return super.getFallback();
}
try {
Object fallback = fallbackFactory.create(getExecutionException());
Object result = fallbackMethodMap.get(method).invoke(fallback, args);
if (isReturnsHystrixCommand(method)) {
return ((HystrixCommand) result).execute();
} else if (isReturnsObservable(method)) {
// Create a cold Observable
return ((Observable) result).toBlocking().first();
} else if (isReturnsSingle(method)) {
// Create a cold Observable as a Single
return ((Single) result).toObservable().toBlocking().first();
} else if (isReturnsCompletable(method)) {
((Completable) result).await();
return null;
} else {
return result;
}
} catch (IllegalAccessException e) {
throw new AssertionError(e);
} catch (InvocationTargetException e) {
throw new AssertionError(e.getCause());
}
}
};
if (Util.isDefault(method)) {
return hystrixCommand.execute();
} else if (isReturnsHystrixCommand(method)) {
return hystrixCommand;
} else if (isReturnsObservable(method)) {
return hystrixCommand.toObservable();
} else if (isReturnsSingle(method)) {
return hystrixCommand.toObservable().toSingle();
} else if (isReturnsCompletable(method)) {
return hystrixCommand.toObservable().toCompletable();
}
return hystrixCommand.execute();
}
5 注解的方式使用Hystrix
当不使用Feign接口时我们还可以使用注解@HystrixCommand和@HystrixCollapse的方式使用Hystrix。前者执行单一命令,后者则可以聚合多个请求最终以批量的方式执行命令。
5.1 注解的配置与使用
-
配置
在SpringBoot启动类添加@EnableCircuitBreaker注解。 -
使用
下面是一个使用注解的小例子:@Service public class HelloService { @Resource private RestTemplate restTemplate; @HystrixCommand(groupKey = "gTest", commandKey = "hi", fallbackMethod = "fallBack") public String hi() { return restTemplate.getForEntity("http://Eureka-Producer/hi", String.class).getBody(); } public String fallBack() { return "熔断发生"; } @HystrixCollapser(batchMethod = "getMembers", collapserProperties = { // 收集1秒内的请求 @HystrixProperty(name = "timerDelayInMilliseconds", value = "1000") }) public Future<String> getMember(Integer id) { System.out.println("执行单个查询的方法"); return null; } @HystrixCommand public List<String> getMembers(List<Integer> ids) { System.out.println("执行合并操作调用"); List<String> mems = new ArrayList<>(); for(Integer id : ids) { mems.add("name" + id); } return mems; } } @RestController public class HelloController { @Resource private HelloService helloService; @GetMapping("/hi") public String hi() { return helloService.hi(); } @GetMapping("/coll") public String coll() throws ExecutionException, InterruptedException { HystrixRequestContext.initializeContext(); // 这里模拟短时间内3次调用 Future<String> f1 = helloService.getMember(1); Future<String> f2 = helloService.getMember(2); Future<String> f3 = helloService.getMember(3); return f1.get() + ", " + f2.get() + ", " + f3.get(); } }
5.2 注解的实现原理
@EnableCircuitBreaker注解会使配置类HystrixCircuitBreakerConfiguration会被加载,此配置类向Spring上下文中注入类一个切面Bean:HystrixCommandAspect,源码如下:
@Configuration
public class HystrixCircuitBreakerConfiguration {
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
return new HystrixCommandAspect();
}
// 略
}
此切面会拦截标记了@HystrixCommand和@HystrixCollapse的方法,并根据不同注解以及相关配置选择对应的命令并执行,如下:
@Aspect
public class HystrixCommandAspect {
private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;
static {
META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
.put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
.put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
.build();
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {
}
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {
}
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
// 略
}
public class HystrixCommandFactory {
private static final HystrixCommandFactory INSTANCE = new HystrixCommandFactory();
private HystrixCommandFactory() {
}
public static HystrixCommandFactory getInstance() {
return INSTANCE;
}
public HystrixInvokable create(MetaHolder metaHolder) {
HystrixInvokable executable;
// 选择创建对应的命令
if (metaHolder.isCollapserAnnotationPresent()) {
executable = new CommandCollapser(metaHolder);
} else if (metaHolder.isObservable()) {
executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
} else {
executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder));
}
return executable;
}
// 略
}
6、总结
Hystrix会把请求封装成命令(HystrixCommand或HystrixObservableCommand),并以执行命令的方式进行请求的调用,整个过程如下:
- 创建命令HystrixCommand或者HystrixObservableCommand。
- 调用命令对应的方法,HystrixCommand:execute()或queue(),HystrixObservableCommand:toObservable()或observe()。
- 判断缓存是否开启缓存以及缓存中是否已经缓存了请求结果,如果有则直接返回。
- HystrixCircuitBreaker熔断器是否允许请求,如果不允许则执行快速失败逻辑。
- 信号量/线程池资源获取是否成功,如果失败则执行快速失败逻辑。
- 执行run()方法或者construct()方法。
- 返回结果更新统计数据。
放个彩蛋:
image
网友评论