配置
<properties>
<!-- Dependency versions -->
<!-- NOTE(review): the SingleServiceCommand example in this article calls
withExecutionTimeoutEnabled / withExecutionTimeoutInMilliseconds and its log output
mentions com.netflix.hystrix.AbstractCommand; both appear to require Hystrix 1.4+.
Confirm that 1.3.16 is really the version the examples were built against. -->
<hystrix.version>1.3.16</hystrix.version>
<!-- NOTE(review): this artifact is pinned to a different version than hystrix-core;
verify the two versions are compatible. -->
<hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version>
</properties>
<dependencies>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>${hystrix.version}</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-metrics-event-stream</artifactId>
<version>${hystrix-metrics-event-stream.version}</version>
</dependency>
</dependencies>
使用
超时降级
package com.fulton_shaw;
import com.fulton_shaw.common.util.concurrent.ThreadUtils;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
/**
 * Demo command showing timeout-triggered degradation: when {@link #run()} takes
 * longer than the configured execution timeout, the caller receives the value of
 * {@link #getFallback()} instead.
 *
 * @author xiaohuadong
 * @date 2018/11/06
 */
public class FallbackCommand extends HystrixCommand<String> {
    /** Execution timeout applied to {@link #run()}, in milliseconds. */
    private static final int TIMEOUT_MILLIS = 500;

    /**
     * Creates a command with a {@value #TIMEOUT_MILLIS} ms execution timeout.
     *
     * @param name used as the Hystrix command group key
     */
    public FallbackCommand(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(name))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionIsolationThreadTimeoutInMilliseconds(TIMEOUT_MILLIS)));
    }

    /**
     * Value handed to the caller when {@link #run()} does not complete successfully.
     *
     * @return the literal {@code "FALLBACK"}
     */
    @Override
    protected String getFallback() {
        return "FALLBACK";
    }

    /**
     * Simulates work of varying duration.
     *
     * @return the literal {@code "SUCCEED RUN"}
     * @throws Exception declared by the overridden method
     */
    @Override
    protected String run() throws Exception {
        // Presumably sleeps a random 300-700 ms, i.e. sometimes longer than the
        // 500 ms timeout -- TODO confirm against ThreadUtils.sleepRandom.
        ThreadUtils.sleepRandom(300, 700);
        return "SUCCEED RUN";
    }
}
package com.fulton_shaw;
import com.fulton_shaw.common.util.concurrent.ExpectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
/**
 * Driver for {@code FallbackCommand}: executes fresh command instances and logs
 * each result, reporting to {@code ExpectUtils.untilTrueCount} whether the result
 * was the fallback value.
 *
 * @author xiaohuadong
 * @date 2018/11/06
 */
public class FallbackMain {
    private static final Logger LOG = LoggerFactory.getLogger(FallbackMain.class);

    /**
     * Runs the demo and then terminates the JVM with exit status 0.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // One attempt: run a new command, log what came back, and tell the
        // driver whether the fallback path was taken.
        final Callable<Boolean> fellBack = new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
                String outcome = new FallbackCommand("will-you-fallback").execute();
                LOG.info("{}", outcome);
                return "FALLBACK".equals(outcome);
            }
        };

        // NOTE(review): presumably repeats the attempt until it has returned
        // true 20 times -- confirm against ExpectUtils.untilTrueCount.
        ExpectUtils.untilTrueCount(20, fellBack);

        LOG.info("end");
        System.exit(0);
    }
}
基本设计
接口+Factory+intern构成一个Hystrix的全局注册中心
同时,将UnitTest放到类的后面,以便发布时,将测试包含在内。
Hystrix大量依赖RxJava库,其中Observable和Operator的设计非常经典。
HystrixCommand运行时,是处于线程池中的。
服务场景
1.单个服务熔断:一个服务正常能够接受2个并发访问,测试4个并发访问,带有超时的场景
2.服务之间存在依赖的熔断:服务A正常接受5个并发访问,服务B接受3个并发访问,服务C接受2个并发访问,服务A依赖B,B依赖C。
单服务模式下Hystrix的使用测试
构造ThroughPutService代表一般的服务,该服务能够在一定时间内返回,同时,有一个支持的最大并发量上限
package com.fulton_shaw.common.lang.testing;
import com.fulton_shaw.common.lang.FixedValueGetter;
import com.fulton_shaw.common.lang.ValueGetter;
import com.fulton_shaw.common.util.concurrent.ThreadUtils;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Simulated service with a configurable service time and an upper bound on the
 * number of callers served at the same time.
 *
 * <p>NOTE(review): the constructor, the {@code makeService} factory and the
 * setters (e.g. {@code setOnDeny}) used by callers are not part of this excerpt.
 *
 * @author xiaohuadong
 * @date 2018/11/08
 */
@ThreadSafe
public class ThroughPutService implements Service {
    private static final Logger LOG = LoggerFactory.getLogger(ThroughPutService.class);

    /* Service time in ms; no waiting when less than or equal to 0. */
    private ValueGetter<Long> serviceTimeStrategy;

    /* Maximum number of concurrent callers; unlimited when less than or equal to 0. */
    private final int maxConcurrent;

    /* Hooks invoked around the service call. */
    private Runnable beforeService;
    private Runnable afterService;

    /* Invoked when a call is denied. */
    private Runnable onDeny;

    /* Invoked when an exception occurs while serving or denying. */
    private Function<Exception, Void> onException;

    /* Number of callers currently being served. */
    private AtomicInteger currentConcurrent;

    /**
     * Attempts one service call.
     *
     * <p>If a concurrency slot can be taken, runs {@code beforeService}, sleeps for
     * the time supplied by {@code serviceTimeStrategy}, releases the slot and runs
     * {@code afterService}. Otherwise runs {@code onDeny}. Any exception thrown
     * along the way is passed to {@code onException} and not propagated.
     *
     * @return {@code true} if a slot was acquired (even if an exception was later
     *         handed to {@code onException}), {@code false} if the call was denied
     */
    public boolean tryService() {
        LOG.debug("{} --> beginning tryService,current concurrent = {}", ThreadUtils.getCurrentThreadName(), currentConcurrent.get());
        boolean canService = tryAcquire();

        try {
            if (canService) {
                try {
                    LOG.debug("{} --> gain tryService,current concurrent = {}", ThreadUtils.getCurrentThreadName(), currentConcurrent.get());
                    beforeService.run();
                    Long sleep = serviceTimeStrategy.get();
                    LOG.debug("{} --> will sleep {} ms", ThreadUtils.getCurrentThreadName(), sleep);
                    ThreadUtils.sleep(sleep);
                } finally {
                    // Always give the slot back; without this a failing hook or
                    // sleep would leak the slot and permanently reduce capacity.
                    currentConcurrent.decrementAndGet();
                }
                // Runs after the slot has been released, as in the success path
                // of the original ordering.
                afterService.run();
            } else {
                onDeny.run();
            }
        } catch (Exception e) {
            try {
                onException.apply(e);
            } catch (Exception ae) {
                LOG.error("error happened while handing {}", e.toString(), ae);
            }
        }

        LOG.debug("{} --> end tryService,current concurrent = {}", ThreadUtils.getCurrentThreadName(), currentConcurrent.get());
        return canService;
    }

    /**
     * Tries to take one concurrency slot.
     *
     * @return {@code true} if the counter was incremented on behalf of the caller
     */
    private boolean tryAcquire() {
        if (maxConcurrent <= 0) {
            // Unlimited mode: still count the caller so the logged value is meaningful.
            currentConcurrent.incrementAndGet();
            return true;
        }
        // CAS loop: retry only when another thread changed the counter in between.
        while (true) {
            int old = currentConcurrent.get();
            if (old >= maxConcurrent) {
                return false;
            }
            if (currentConcurrent.compareAndSet(old, old + 1)) {
                return true;
            }
        }
    }
}
ThroughPutService类的tryService方法尝试调用一个服务,如果当前并发量尚未达到上限,则服务,否则,调用onDeny。若有错误发生,则调用onException。
下面是对该Service的一个测试
/**
 * Demo for {@code ThroughPutService}: several threads call a service whose
 * maximum concurrency is smaller than the number of callers.
 */
public class ThroughPutServiceMain {
    /** Maximum concurrency passed to the service factory. */
    private static final int MAX_CONCURRENT = 2;

    /** Number of caller threads started against the service. */
    private static final int CALLER_THREADS = 5;

    /**
     * Creates the service and lets {@value #CALLER_THREADS} threads call it once each.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final ThroughPutService service =
                ThroughPutService.makeService(new RandomLongGetter(300, 500), MAX_CONCURRENT);
        ThreadUtils.startThreadForTimesAndWait(CALLER_THREADS, new ProcessMethod<Integer>() {
            // A void method has no return value to annotate, so the former
            // @Nullable on the method itself was dropped.
            @Override
            public void apply(@Nullable Integer input) {
                service.tryService();
            }
        });
    }
}
service的服务时间在300~500毫秒之间,随机返回。最大允许的并发量是2。测试程序开启了5个线程来消费这个服务,下面的结果显示,只有其中两个线程得到服务,其他均被拒绝。
得到服务的是Thread-2和Thread-3, gain tryService.
输出结果:
15:39:46.789 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> beginning tryService,current concurrent = 0
15:39:46.789 [Thread-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-4 --> beginning tryService,current concurrent = 0
15:39:46.789 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> beginning tryService,current concurrent = 0
15:39:46.789 [Thread-5] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-5 --> beginning tryService,current concurrent = 0
15:39:46.789 [Thread-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-1 --> beginning tryService,current concurrent = 0
15:39:46.791 [Thread-5] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-5 --> end tryService,current concurrent = 2
15:39:46.791 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> gain tryService,current concurrent = 2
15:39:46.791 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> gain tryService,current concurrent = 2
15:39:46.791 [Thread-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-1 --> end tryService,current concurrent = 2
15:39:46.791 [Thread-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-4 --> end tryService,current concurrent = 2
15:39:46.791 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> will sleep 415 ms
15:39:46.791 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> will sleep 461 ms
15:39:47.206 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> end tryService,current concurrent = 1
15:39:47.252 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> end tryService,current concurrent = 0
ThroughPutService不能服务的情况有一种:就是最大并发量。
但是对于客户端而言,ThroughPutService不能服务还包括一种情况:ThroughPutService本身的服务质量--响应时间超时。
HystrixCommand封装ThroughPutService
我们首先识别出ThroughPutService服务不可用的情况,就是:1.超过服务的最大并发量 2.服务本身超时
在下面的Command的封装中,run会调用service,但是service可能返回false,因此,在返回false的情况下,我们应当抛出异常,表明服务不可用,迫使Hystrix转向getFallback()调用。
此外,即使service正常可用,但是服务时间超时,Hystrix本身会对run进行超时判断,因此,它也会转向getFallback()
package com.fulton_shaw.third_party_demo.hystrix;
public class SingleServiceCommand extends HystrixCommand<Void> {
private static final Logger LOG = LoggerFactory.getLogger(SingleServiceCommand.class);
private ThroughPutService service;
protected SingleServiceCommand(ThroughPutService service,long time) {
super(HystrixCommand.Setter.
withGroupKey(HystrixCommandGroupKey.Factory.asKey(SingleServiceCommand.class.getSimpleName()))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutEnabled(true).withExecutionTimeoutInMilliseconds((int)time))
);
this.service = service;
}
@Override
protected Void run() throws Exception {
boolean succeed = service.tryService();
LOG.info("{} --> tryService succeed? {} ",ThreadUtils.getCurrentThreadName(),succeed);
if(!succeed){
throw new RuntimeException(String.format("%s --> try service rejected", ThreadUtils.getCurrentThreadName()));
}
return null;
}
@Override
protected Void getFallback() {
LOG.info("{} --> getting Fallback",Thread.currentThread().getName());
return null;
}
}
测试封装的HystrixCommand
package com.fulton_shaw.third_party_demo.hystrix;
/**
 * Demo: runs several {@code SingleServiceCommand} instances concurrently against
 * one {@code ThroughPutService} and reports how many calls the service denied.
 */
public class SingleServiceDemo {
    private static final Logger LOG = LoggerFactory.getLogger(SingleServiceDemo.class);

    /**
     * Builds the service and the commands, executes every command on its own
     * thread, then logs the number of denied calls.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final ThroughPutService service = ThroughPutService.makeService(new RandomLongGetter(1000, 3000), 2);

        // Count how often the service invokes its deny hook.
        final AtomicInteger denied = new AtomicInteger(0);
        service.setOnDeny(new Runnable() {
            @Override
            public void run() {
                denied.incrementAndGet();
            }
        });

        int timeout = 2000;
        int commandCount = 4;
        final SingleServiceCommand[] commands = new SingleServiceCommand[commandCount];
        for (int i = 0; i < commandCount; i++) {
            commands[i] = new SingleServiceCommand(service, timeout);
        }

        // Start exactly one thread per command; using commandCount (instead of a
        // second hard-coded 4) keeps the thread count and the array size in sync.
        // NOTE(review): assumes the index passed to apply() is a valid array
        // index for every started thread -- confirm against ThreadUtils.
        ThreadUtils.startThreadForTimesAndWait(commandCount, new ProcessMethod<Integer>() {
            @Override
            public void apply(@Nullable Integer i) {
                commands[i].execute();
            }
        });

        LOG.info("concurrent denied = {}", denied.get());
    }
}
在上面的程序中,我们新建了一个服务,随机返回时间是1000-3000ms之间,我们将服务的超时时间设置为2000ms,然后使用denied变量记录服务被拒绝的数量。然后,我们新建了4个对该服务的调用实例,由Hystrix管理。
从下面的输出结果中,我们可以看出4和3获得服务执行,其中4超时,3没有超时,1和2均被拒绝服务。
由于1,2被拒绝服务,因此他们抛出异常之后,Hystrix转向了Fallback,我们可以看到的是,抛出异常之后,getFallback仍然处于同一线程之中。
但是4转向Fallback之后,却是由HystrixTimer-1来调用的,而原来的线程仍然继续运行,我们可以看到4最后也输出了 end tryService.
输出结果:
16:22:04.243 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> beginning tryService,current concurrent = 0
16:22:04.244 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> gain tryService,current concurrent = 1
16:22:04.243 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> beginning tryService,current concurrent = 0
16:22:04.244 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> will sleep 2665 ms
16:22:04.244 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> gain tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> will sleep 1284 ms
16:22:04.244 [hystrix-SingleServiceCommand-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-1 --> beginning tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-1 --> end tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-1] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-1 --> tryService succeed? false
16:22:04.244 [hystrix-SingleServiceCommand-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-2 --> beginning tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-2 --> end tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-2] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-2 --> tryService succeed? false
16:22:04.245 [hystrix-SingleServiceCommand-1] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
java.lang.RuntimeException: hystrix-SingleServiceCommand-1 --> try service rejected
at com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand.run(SingleServiceCommand.java:34) ~[classes/:?]
...
16:22:04.245 [hystrix-SingleServiceCommand-2] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
java.lang.RuntimeException: hystrix-SingleServiceCommand-2 --> try service rejected
at com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand.run(SingleServiceCommand.java:34) ~[classes/:?]
...
16:22:04.254 [hystrix-SingleServiceCommand-2] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-2 --> getting Fallback
16:22:04.257 [hystrix-SingleServiceCommand-1] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-1 --> getting Fallback
16:22:05.528 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> end tryService,current concurrent = 1
16:22:05.528 [hystrix-SingleServiceCommand-3] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-3 --> tryService succeed? true
16:22:06.231 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> end tryService,current concurrent = 0
16:22:06.231 [hystrix-SingleServiceCommand-4] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-4 --> tryService succeed? true
16:22:06.232 [HystrixTimer-1] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - HystrixTimer-1 --> getting Fallback
16:22:06.233 [main] INFO com.fulton_shaw.third_party_demo.hystrix.SingleServiceDemo - concurrent denied = 2
多服务和服务存在依赖的场景测试
2.服务之间存在依赖的熔断:服务A正常接受5个并发访问,服务B接受3个并发访问,服务C接受2个并发访问,服务A依赖B,B依赖C。
只需要将服务
参考
https://www.jianshu.com/p/fc19f6ed6d0d
https://www.jianshu.com/p/05f3e75b3739
Hystrix中的设计哲学
Hystrix使用总结
在默认的线程隔离(THREAD)策略下,execute方法会把run()交给Hystrix线程池中的线程执行,当前线程阻塞等待其结果。
所谓信号量隔离,是指当服务并发量大于信号量设定的值时,自动降级
熔断是指降级的终极情况:
- 假设线路内的请求量(滚动统计窗口内的请求数)达到一定阈值(通过 HystrixCommandProperties.circuitBreakerRequestVolumeThreshold() 配置)
- 同时,假设线路内的错误率达到一定阈值(通过 HystrixCommandProperties.circuitBreakerErrorThresholdPercentage() 配置)
- 熔断器将从『闭路』转换成『开路』
- 若此时是『开路』状态,熔断器将短路后续所有经过该熔断器的请求,这些请求直接走『失败回退逻辑』
- 经过一定时间(即『休眠窗口』,通过 HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds() 配置),后续第一个请求将会被允许通过熔断器(此时熔断器处于『半开』状态),若该请求失败,熔断器将又进入『开路』状态,且在休眠窗口内保持此状态;若该请求成功,熔断器将进入『闭路』状态,回到逻辑1循环往复。
其他特性
缓存
组合请求
Dashboard监控
网友评论