Hystrix
简要说明
在大中型分布式系统中,通常系统很多依赖。在高并发访问下,这些依赖的稳定性与否对系统的影响非常大,但是依赖有很多不可控问题:如网络连接缓慢,资源繁忙,暂时不可用,服务脱机等。在复杂的分布式架构的应用程序有很多的依赖,都会不可避免地在某些时候失败。高并发的依赖失败时如果没有隔离措施,当前应用服务就有被拖垮的风险。一般来说,随着服务依赖数量的变多,服务不稳定的概率会成指数性提高。
解决的问题
一个依赖30个SOA服务的系统,每个服务99.99%可用。
99.99%的30次方 ≈ 99.7%,
0.3% 意味着一亿次请求 会有 3,000,00次失败, 换算成时间大约每月有2个小时服务不稳定。 解决这个问题的方案是对依赖进行隔离。Hystrix就是处理依赖隔离的框架,同时也是可以帮我们做依赖服务的治理和监控。
前期准备
rxjava
Hystrix引入rxjava 1
四个基本概念:Observable (可观察者,即被观察者)、 Observer (观察者)、 subscribe (订阅)、事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
BlockingObservable<T> ,一个阻塞的Observable 继承普通的Observable类,增加了一些可用于阻塞Observable发射的数据的操作符。BlockingObservable已经在Rxjava2中去掉了,在Rxjava2中已经集成到了Observable
线程池、Future
略
流程图
![](https://img.haomeiwen.com/i11285504/be6fde6717ee8eb8.png)
构建一个HystrixCommand 或 HystrixObservableCommand
HystrixCommand
R run()
R getFallback()
HystrixObservableCommand
Observable<R> construct()
resumeWithFallback()
执行Command
K value = command.execute(); -> queue().get()
Future<K> fValue = command.queue(); -> toObservable().toBlocking().toFuture();( Observable -> BlockingObservable -> Future(delegate) -> Future)
Observable<K> ohValue = command.observe(); //hot observable
Observable<K> ocValue = command.toObservable(); //cold observable
public Observable<R> observe() {
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<R> subject = ReplaySubject.create();
// eagerly kick off subscription
final Subscription sourceSubscription = toObservable().subscribe(subject);
// return the subject that can be subscribed to later while the execution has already started
return subject.doOnUnsubscribe(new Action0() {
@Override
public void call() {
sourceSubscription.unsubscribe();
}
});
}
![](https://img.haomeiwen.com/i11285504/d43072380b809878.png)
是否缓存
重写getCacheKey(),用来构造cache key。
HystrixRequestContext.initializeContext()和context.shutdown()构建context。
public class HelloWorld extends HystrixCommand<Boolean> {
@Override
protected String getCacheKey() {
return ?;
}
public static void main(String[] args) throws Exception {
HystrixRequestContext context = HystrixRequestContext.initializeContext();
try {
...
}finally {
context.shutdown();
}
}
}
Circuit是否打开
检测circuit-breaker是否打开,打开则直接进入fallback。否则进入下一步。
HystrixCommandProperties设置circuitBreaker
circuitBreaker.enabled
设置断路器是否生效。
circuitBreaker.requestVolumeThreshold
设置在一个滚动窗口中,打开断路器的最少请求数。
circuitBreaker.sleepWindowInMilliseconds
设置在断路器被打开,拒绝请求到再次尝试请求的时间间隔。
circuitBreaker.errorThresholdPercentage
设置打开断路器并启动回退逻辑的错误比率。(这个参数的效果受到circuitBreaker.requestVolumeThreshold和滚动时间窗口的时间长度影响)
circuitBreaker.forceOpen
强制断路器进入打开状态
circuitBreaker.forceClosed
强制断路器进入关闭状态
线程池、队列、信号是否占满
线程池、队列、信号是否占满的时候,将直接进入fallback
线程池配置
coreSize
心线程池的大小。
maximumSize
设置线程池最大值。
maxQueueSize
设置BlockingQueue最大的队列值。
queueSizeRejectionThreshold
设置队列拒绝的阈值
keepAliveTimeMinutes
设置存活时间,单位分钟。
allowMaximumSizeToDivergeFromCoreSize
该属性允许maximumSize起作用。
metrics.rollingStats.timeInMilliseconds
设置统计的滚动窗口的时间段大小。
metrics.rollingStats.numBuckets
设置滚动的统计窗口被分成的bucket的数目。
![](https://img.haomeiwen.com/i11285504/978f7b8819c66463.png)
run()或者construct()
执行相应的run()或者construct()的方法。
计算Circuit Health
通过计算successes, failures, rejections, and timeouts,确定是否打开circuitBreaker。
getFallback()或者resumeWithFallback()
是否进入fallback
Failure Type | fallback |
---|---|
FAILURE | YES |
TIMEOUT | YES |
SHORT_CIRCUITED | YES |
THREAD_POOL_REJECTED | YES |
SEMAPHORE_REJECTED | YES |
BAD_REQUEST | YES |
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
return handleThreadPoolRejectionViaFallback(e);
} else if (t instanceof HystrixTimeoutException) {
return handleTimeoutViaFallback();
} else if (t instanceof HystrixBadRequestException) {
return handleBadRequestByEmittingError(e);
} else {
/*
* Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
*/
if (e instanceof HystrixBadRequestException) {
eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
return Observable.error(e);
}
return handleFailureViaFallback(e);
}
}
};
折叠器
通过继承HystrixCollapser,实现多个请求折叠成单个请求。
![](https://img.haomeiwen.com/i11285504/556f7e063e8acef4.png)
HystrixCollapserProperties
名称 | 描述 | 默认值 |
---|---|---|
maxRequestsInBatch | 允许的最大请求数 | Integer.MAX_VALUE |
timerDelayInMilliseconds | 多少毫秒后出发执行 | 10毫秒 |
requestCache.enabled | 是否缓存 | true |
网友评论