Hystrix是刺猬的意思,为了保护自己,不能让被调用者引起本身系统的不可用,甚至一直向上影响到整个微服务系统,Hystrix基于Rxjava实现,不过是老的1.x版本的Rxjava,新的2.x的版本还未使用
Hystrix采取了线程池隔离(默认)和信号量隔离,简而言之,当你调用一个第三方系统或者dubbo服务A的时候,这部分操作会全部放入到一个线程池中去处理,然后监控这个线程池的情况,比如池满了就会报错了(当然配了QueueSize的除外),信号量是简单一个变量来监控当前的并发量,当达到一定值的之后就会报错,当然我们可以处理这个错误然后返回自己规定的数据(gatFallBack),看下图
image.png
Hystrix有自己的监控机制,统计算法,参考下面这张图,数据是在向左滚动的,我们可以配置滚动统计的时间间隔,比如3秒钟统计一次,通过统计的数据Hystrix能做出对应的保护机制,比如调用第三方的失败率达到百分之50%的时候会做快速失败(即直接失败),然后在5秒钟之后又去尝试一下看看是不是恢复了
image.png
下面我们来测试一下Hystrix的效果
①测试线程池满了
MyCommand
public class MyCommand extends HystrixCommand<String> {
private final String group;
private MyService service;
private String thing;
public MyCommand(String group, String thing) {
// super(HystrixCommandGroupKey.Factory.asKey(group));
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(group))
.andCommandKey(HystrixCommandKey.Factory.asKey("myCommand"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("myCommandThreadPool"))
.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter()
.withCircuitBreakerRequestVolumeThreshold(5)
.withCircuitBreakerErrorThresholdPercentage(60)
// .withExecutionTimeoutInMilliseconds(2000)
.withExecutionTimeoutEnabled(false)//配合下方MyService的等待15s,防止超时直接报错
.withMetricsRollingStatisticalWindowInMilliseconds(1000))
.andThreadPoolPropertiesDefaults(
HystrixThreadPoolProperties.Setter()
.withCoreSize(10))//这里我们设置了线程池大小为10
);
this.group = group;
this.thing = thing;
}
@Override
protected String run() throws Exception {
service.doSth(thing);
return thing + " over";
}
@Override
protected String getFallback() {
return thing + " Failure! ";
}
public void setService(MyService service) {
this.service = service;
}
}
MyService
public class MyService {
public void doSth(String thing) {
System.out.println("doSth->" + thing + "->begin");
String threadName = Thread.currentThread().getName();
System.out.println(threadName + ":doSth->" + thing + "->doing");
try {
Thread.sleep(15000);//这里让线程等待15s,保持线程不释放
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("doSth->" + thing + "->end");
}
}
MyTest
public class MyTest {
@Test
public void test() throws Exception {
MyService service = new MyService();
List<Future<String>> fl = new ArrayList<>(10);
for (int i = 0; i < 15; i++) {
Thread.sleep(1000);
MyCommand command = new MyCommand("TestGroup", "fishing" + (i * i));
command.setService(service);
Observable<String> observable = command.toObservable();
observable.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
System.out.println("执行command发生错误!");
e.printStackTrace();
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
}
}
}
输出结果
doSth->fishing0->begin
hystrix-myCommandThreadPool-1:doSth->fishing0->doing
doSth->fishing1->begin
hystrix-myCommandThreadPool-2:doSth->fishing1->doing
doSth->fishing4->begin
hystrix-myCommandThreadPool-3:doSth->fishing4->doing
doSth->fishing9->begin
hystrix-myCommandThreadPool-4:doSth->fishing9->doing
doSth->fishing16->begin
hystrix-myCommandThreadPool-5:doSth->fishing16->doing
doSth->fishing25->begin
hystrix-myCommandThreadPool-6:doSth->fishing25->doing
doSth->fishing36->begin
hystrix-myCommandThreadPool-7:doSth->fishing36->doing
doSth->fishing49->begin
hystrix-myCommandThreadPool-8:doSth->fishing49->doing
doSth->fishing64->begin
hystrix-myCommandThreadPool-9:doSth->fishing64->doing
doSth->fishing81->begin
hystrix-myCommandThreadPool-10:doSth->fishing81->doing
fishing100 Failure!
fishing121 Failure!
fishing144 Failure!
fishing169 Failure!
fishing196 Failure!
可以看到从第10个开始就失败了,因为线程池的10个线程已经被全部占满了
②测试失败率导致快速失败
PercentCommand
public class PercentCommand extends HystrixCommand<String> {
private final String group;
private PercentService service;
private int seed;
public PercentCommand(String group, int seed) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(group))
.andCommandPropertiesDefaults(HystrixPropertiesCommandDefault.Setter()
.withCircuitBreakerErrorThresholdPercentage(50)//错误率超过50%,快速失败
.withExecutionTimeoutInMilliseconds(2000)
.withCircuitBreakerRequestVolumeThreshold(5)
.withCircuitBreakerSleepWindowInMilliseconds(30000)//30s后放部分流量过去重试
.withMetricsRollingPercentileBucketSize(10)
.withMetricsRollingPercentileWindowInMilliseconds(5000)));
this.group = group;
this.seed = seed;
}
@Override
protected String run() throws Exception {
return service.ex(seed);
}
public void setService(PercentService service) {
this.service = service;
}
@Override
protected String getFallback() {
return seed + "失败了";
}
}
PercentService
public class PercentService {
public String ex(int seed) {
if (seed < 10) {
throw new RuntimeException("抛个异常测试提高异常率!");
}
return seed + "想返回吗?";
}
}
PercentTest
public class PercentTest {
@Test
public void test() throws Exception {
PercentService service = new PercentService();
for (int i = 0; i < 15; i++) {
Thread.sleep(1000);
PercentCommand command = new PercentCommand("TestGroup", i);
command.setService(service);
Observable<String> observable = command.toObservable();
observable.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
System.out.println("eeeeeeeeeeeeee");
}
@Override
public void onNext(String s) {
System.out.println(s);
}
});
}
}
}
输出结果
......
09:35:04.057 [hystrix-TestGroup-5] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
java.lang.RuntimeException: 抛个异常测试提高异常率!
at com.helian.techaggrhystrix.errorpercent.PercentService.ex(PercentService.java:15)
at com.helian.techaggrhystrix.errorpercent.PercentCommand.run(PercentCommand.java:31)
at com.helian.techaggrhystrix.errorpercent.PercentCommand.run(PercentCommand.java:10)
at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:302)
at com.netflix.hystrix.HystrixCommand$2.call(HystrixCommand.java:298)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:46)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:51)
at rx.internal.operators.OnSubscribeDefer.call(OnSubscribeDefer.java:35)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:41)
at rx.internal.operators.OnSubscribeDoOnEach.call(OnSubscribeDoOnEach.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.unsafeSubscribe(Observable.java:10151)
at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
at com.netflix.hystrix.strategy.concurrency.HystrixContexSchedulerAction$1.call(HystrixContexSchedulerAction.java:56)
at com.netflix.hystrix.strategy.concurrency.HystrixContexSchedulerAction$1.call(HystrixContexSchedulerAction.java:47)
at com.netflix.hystrix.strategy.concurrency.HystrixContexSchedulerAction.call(HystrixContexSchedulerAction.java:69)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
4失败了
5失败了
6失败了
7失败了
8失败了
9失败了
10失败了
11失败了
12失败了
13失败了
14失败了
---------------------------------------------------------------------------
在service里面我们直接抛出异常,test里每隔1秒发出一个请求,前面的5个请求都执行了service的代码,
抛出了异常,到达第五个的时候,熔断器监控到失败率高达100%,那么熔断器打开,执行快速失败,
直接执行getFailBack方法了,配合上面的withCircuitBreakerSleepWindowInMilliseconds(30000),
默认是5s的,也就是30s才会去重试,所以后面的全部快速失败了,试着更改这个值为3000看看,
结果应该不一样
网友评论