SpringCloud 命中CAP理论中的AP,
当调用某个服务接口时,Hystrix 会创建一个接口线程池来进行隔离相关服务。需要调用该接口的服务,都是在消费当前服务的线程池。
1 命令模式降级:
需要继承HystrixCommand<>方法
public class CommandForIndex extends HystrixCommand<Object> {
private final RestTemplate restTemplate;
public CommandForIndex(RestTemplate restTemplate) {
super(Setter
//这个是必填项,指定命令分组名,主要意义是用于统计(比如这是商城系统中的商品服务)
.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Prpduct-Group"))
//依赖名称(如果是服务调用,这里就写具体的接口名,如果是自定的操作,就自己命令),默认是command实现类的类名,熔断就是根据这个名称
.andCommandKey(HystrixCommandKey.Factory.asKey("ClientController"))
//线程池命名,默认就是HystrixCommandGroupKey的名称,线程池配置就是根据这个名称
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("UserThreadPool"))
//command 熔断相关参数配置 超时时间1000毫秒
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000))
//设置线程池参数
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
//线程池大小为8
.withCoreSize(8)
//允许缓冲区大小1024
.withMaxQueueSize(1024))
);
this.restTemplate = restTemplate;
}
@Override
protected Object run() throws Exception {
System.out.println("-------------command-----------" + Thread.currentThread().getId());
String result = restTemplate.getForObject("http://localhost:8080/server/todo", String.class);
System.out.println("--------------command finish------------result:" + result);
return result;
}
@Override
protected Object getFallback(){
System.out.println("我要降級了");
return "超時降級了";
}
}
如图所示 商城系统中用户在浏览商品,下单商品,支付时,都会有不同的线程池进行隔离。线程池的名称就是分组名,统计主要是指接口的调用情况判断是否需要降级或熔断。
12222.png设置线程池参数实现简单秒杀系统,比如将线程池参数设置中coreSize设置为1,maxQueyeSize设置成2时,每次只能有一个线程请求改服务,多的话放到阻塞队列中阻塞队列可以2个。
//设置线程池参数
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter()
//线程池大小为8
.withCoreSize(8)
//允许缓冲区大小1024
.withMaxQueueSize(1024))
通过测试用列模拟十个线程并发执行时,只有三个线程进了command 方法,其余的都被线程池直接拒绝执行了fallback方法。(是不是可以自定义线程池拒绝时的方法)
降级图示.png
hystrix 配置项
一般只需要在resources下新增一个config.properties配置文件就可以了
# Hystrix 默认加载的配置文件 - 限流、 熔断示例
# 线程池大小
hystrix.threadpool.default.coreSize=1
# 缓冲区大小, 如果为-1,则不缓冲,直接进行降级 fallback
hystrix.threadpool.default.maxQueueSize=200
# 缓冲区大小超限的阈值,超限就直接降级
hystrix.threadpool.default.queueSizeRejectionThreshold=2
# 执行策略
# 资源隔离模式,默认thread。 还有一种叫信号量
hystrix.command.default.execution.isolation.strategy=THREAD
# 是否打开超时
hystrix.command.default.execution.timeout.enabled=true
# 超时时间,默认1000毫秒
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=1000
# 超时时中断线程(只是中断当前的服务,并没有中断调用端的服务)
hystrix.command.default.execution.isolation.thread.interruptOnTimeout=true
# 取消时候中断线程
hystrix.command.default.execution.isolation.thread.interruptOnFutureCancel=false
# 信号量模式下,最大并发量
hystrix.command.default.execution.isolation.semaphore.maxConcurrentRequests=2
# 降级策略
# 是否开启服务降级
hystrix.command.default.fallback.enabled=true
# fallback执行并发量
hystrix.command.default.fallback.isolation.semaphore.maxConcurrentRequests=100
# 熔断策略
# 启用/禁用熔断机制
hystrix.command.default.circuitBreaker.enabled=true
# 强制开启熔断(设置为true时,表示所有的请求都会熔断调)
hystrix.command.default.circuitBreaker.forceOpen=false
# 强制关闭熔断(设置为true时,表示所有的请求都降级了也不会熔断)
hystrix.command.default.circuitBreaker.forceClosed=false
# 前提条件,一定时间内发起一定数量的请求。 也就是5秒钟内(这个5秒对应下面的滚动窗口长度)至少请求3次,熔断器才发挥起作用。总数 默认20(一定时间内有3个请求)
hystrix.command.default.circuitBreaker.requestVolumeThreshold=3
# 错误百分比。达到或超过这个百分比,熔断器打开。 比如:5秒内有100请求,60个请求超时或者失败,就会自动开启熔断(一定的时间内有 50%的请求失败了)
hystrix.command.default.circuitBreaker.errorThresholdPercentage=50
# 10秒后,进入半打开状态(熔断开启,间隔一段时间后,会让一部分的命令去请求服务提供者,如果结果依旧是失败,则又会进入熔断状态,如果成功,就关闭熔断)。 默认5秒
hystrix.command.default.circuitBreaker.sleepWindowInMilliseconds=10000
# 度量策略
# 5秒为一次统计周期,术语描述:滚动窗口的长度为5秒
hystrix.command.default.metrics.rollingStats.timeInMilliseconds=5000
# 统计周期内 度量桶的数量,必须被timeInMilliseconds整除。作用:(5秒之内请求数量分成10份,也就相当于统计500ms内请求的数量,失败的比率)
hystrix.command.default.metrics.rollingStats.numBuckets=10
# 是否收集执行时间,并计算各个时间段的百分比
hystrix.command.default.metrics.rollingPercentile.enabled=true
# 设置执行时间统计周期为多久,用来计算百分比
hystrix.command.default.metrics.rollingPercentile.timeInMilliseconds=60000
# 执行时间统计周期内,每个度量桶最多统计多少条记录。设置为50,有100次请求,则只会统计最近的10次
hystrix.command.default.metrics.rollingPercentile.bucketSize=100
# 数据取样时间间隔
hystrix.command.default.metrics.healthSnapshot.intervalInMilliseconds=500
# 设置是否缓存请求,request-scope内缓存
hystrix.command.default.requestCache.enabled=false
# 设置HystrixCommand执行和事件是否打印到HystrixRequestLog中
hystrix.command.default.requestLog.enabled=false
######DnUser-ThreadPool特定配置
# hystrix.threadpool.DnUser-ThreadPool.coreSize=20
# hystrix.threadpool.DnUser-ThreadPool.maxQueueSize=1000
# 超过就报错
# hystrix.threadpool.DnUser-ThreadPool.queueSizeRejectionThreshold=800
3 采用注解的方式来实现服务降级
@RequestMapping("todo")
@HystrixCommand(
//线程池相关配置
threadPoolProperties = {@HystrixProperty(name = "coreSize",value = "1"),@HystrixProperty(name="queueSizeRejectionThreshold",value = "1")},
//配置超时时间
commandProperties = {@HystrixProperty(name="execution.isolation.thread.timeoutInMilliseconds",value = "100")})
public String todo() {
}
Hystrix 执行流程
486074-20170223143405070-2032397754.png代码实现:
在AbstractCommand主要的三个初始化分别为:
this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);
1 初始化度量桶initMetrics():主要起到聚合作用,将一段时间内的执行结果进行汇总也就是上图report metrics的过程,具体设置见配置文件中的度量策略的配置项。
2 初始化短路器initCircuitBreaker: HystrixCircuitBreaker接口三个抽象方法分别为:
public boolean allowRequest();
public boolean isOpen();
void markSuccess();
该接口的实现类主要有:HystrixCircuitBreakerImpl 和 NoOpCircuitBreaker,其中NoOpCircuitBreaker实现的是一个固定值,没有具体做任何判断
static class NoOpCircuitBreaker implements HystrixCircuitBreaker {
public boolean allowRequest() {
return true;
}
@Override
public boolean isOpen() {
return false;
}
@Override
public void markSuccess() {
}
}
主要实现熔断机制是通过 HystrixCircuitBreakerImpl
public void markSuccess() {
if (circuitOpen.get()) {
if (circuitOpen.compareAndSet(true, false)) {
metrics.resetStream();
}
}
}
@Override
public boolean allowRequest() {
// 首先判断断路器是否强制打开,如果强制打开就会永远处于断路状态
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
// 首先判断断路器是否强制关闭,如果强制打开就不会永远处于断路状态
if (properties.circuitBreakerForceClosed().get()) {
isOpen();
return true;
}
return !isOpen() || allowSingleTest();
}
//半开启状态判断
public boolean allowSingleTest() {
// 获取上一次断路器打开的时间
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
//断路器是否打开&&断路器上一次打开的时间+系统配置断路器多长时间进入半开启状态>当前时间
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// CAS操作重新设置半打开的时间
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
return true;
}
}
return false;
}
//是否打开的判断
@Override
public boolean isOpen() {
if (circuitOpen.get()) {
return true;
}
HealthCounts health = metrics.getHealthCounts();
//度量桶中统计的失败的请求小于配置请求量时,是没有打开的
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
return false;
}
//度量桶中的错误百分比小于配置时,也不会打开
if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
} else {
//如果错误百分比大于配置时,就要使用CASj将断路器打开,设置断路器打开的时间
if (circuitOpen.compareAndSet(false, true)) {
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
} else {
return true;
}
}
}
HystrixCommandAspect:
hystrix是对方法的增强,入口就是在该类上。环绕增加的方法:
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
Method method = getMethodFromTarget(joinPoint);
Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
"annotations at the same time");
}
MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
Object result;
try {
if (!metaHolder.isObservable()) {
result = CommandExecutor.execute(invokable, executionType, metaHolder);
} else {
result = executeObservable(invokable, executionType, metaHolder);
}
} catch (HystrixBadRequestException e) {
throw e.getCause();
} catch (HystrixRuntimeException e) {
throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
}
return result;
}
网友评论