1、前言
看别人 RPC 框架代码有熔断器的代码,但是对于熔断器并不是很了解,于是了解一下熔断器设计。熔断器跟限流器不同,它主要是保证服务调用者在调用异常服务时,快速返回失败结果,避免大量的同步等待,造成服务雪崩,并且能在一段时间后继续侦测请求执行结果, 提供恢复服务调用的可能。而限流器是单纯的限流,不让大量流量把服务搞挂调,服务本身是正常的。
熔断器的基本原理如下:

熔断器本身就是一个状态机。
1.关闭状态:熔断器的初始化状态,该状态下允许请求通过。当失败超过阀值,转入打开状态,
2.打开状态:熔断状态,该状态下不允许请求通过,当进入该状态经过一段时间,进入半开状态。
3.半开状态:在半开状态期间,允许部分请求通过,在半开期间,观察失败状态是否超过阀值。如果没有超过进入关闭状态,如果超过了进入关闭状态。如此往复
2、设计
整个设计使用状态模式(即外部表现行为是 CircuitBreaker,内部状态变化由各个熔断器的 state 类变化),首先着眼点应该在 CircuitBreaker 上,由他理解各个状态的变化过程。
熔断器对外暴露接口:
package com.example.demo.circulbreaker;
/**
* 熔断器接口
*/
public interface CircuitBreaker {
/**
* 重置熔断器
*/
void reset();
/**
* 是否允许通过熔断器
*/
boolean canPassCheck();
/**
* 统计失败次数
*/
void countFailNum();
}
熔断器公共抽象类:
package com.example.demo.circulbreaker;
/**
* User: Rudy Tan
* Date: 2018/9/21
*
* 基础熔断器
*/
public abstract class AbstractCircuitBreaker implements CircuitBreaker {
/**
* 熔断器当前状态,声明成 volatile 是防止更改状态的时候,其他线程拿着旧状态做操作
*/
private volatile CBState state = new CloseCBState();
/**
* 在熔断器关闭的情况下,在多少秒内失败多少次进入,熔断打开状态(默认10分钟内,失败10次进入打开状态)
*/
public String thresholdFailRateForClose = "10/600";
/**
* 在熔断器打开的情况下,熔断多少秒进入半开状态,(默认熔断30分钟)
*/
public int thresholdIdleTimeForOpen = 1800;
/**
* 在熔断器半开的情况下, 在多少秒内放多少次请求,去试探(默认10分钟内,放10次请求)
*/
public String thresholdPassRateForHalfOpen = "10/600";
/**
* 在熔断器半开的情况下, 试探期间,如果有超过多少次失败的,重新进入熔断打开状态,否者进入熔断关闭状态。
*/
public int thresholdFailNumForHalfOpen = 1;
public CBState getState() {
return state;
}
public void setState(CBState state) {
// 当前状态不能切换为当前状态
CBState currentState = getState();
if (currentState.getStateName().equals(state.getStateName())){
return;
}
// 多线程环境加锁
synchronized (this){
// 二次判断
currentState = getState();
if (currentState.getStateName().equals(state.getStateName())){
return;
}
// 更新状态
this.state = state;
System.out.println("熔断器状态转移:" + currentState.getStateName() + "->" + state.getStateName());
}
}
}
熔断器实现类(各种实现都行,作者是用来简化实现):
package com.example.demo.circulbreaker;
/**
* User: Rudy Tan
* Date: 2018/9/22
*
* 本地熔断器(把它当成了工厂了)
*/
public class LocalCircuitBreaker extends AbstractCircuitBreaker {
public LocalCircuitBreaker(String failRateForClose,
int idleTimeForOpen,
String passRateForHalfOpen, int failNumForHalfOpen){
this.thresholdFailRateForClose = failRateForClose;
this.thresholdIdleTimeForOpen = idleTimeForOpen;
this.thresholdPassRateForHalfOpen = passRateForHalfOpen;
this.thresholdFailNumForHalfOpen = failNumForHalfOpen;
}
public void reset() {
this.setState(new CloseCBState());
}
public boolean canPassCheck() {
return getState().canPassCheck(this);
}
public void countFailNum() {
getState().countFailNum(this);
}
}
熔断器状态接口:
package com.example.demo.circulbreaker;
/**
* 熔断器状态
*/
public interface CBState {
/**
* 获取当前状态名称
*/
String getStateName();
/**
* 检查以及校验当前状态是否需要扭转
*/
void checkAndSwitchState(AbstractCircuitBreaker cb);
/**
* 是否允许通过熔断器
*/
boolean canPassCheck(AbstractCircuitBreaker cb);
/**
* 统计失败次数
*/
void countFailNum(AbstractCircuitBreaker cb);
}
各个熔断器状态:
关闭状态:
package com.example.demo.circulbreaker;
import java.util.concurrent.atomic.AtomicInteger;
/**
* User: Rudy Tan
* Date: 2018/9/21
*
* 熔断器-关闭状态
*/
public class CloseCBState implements CBState {
/**
* 进入当前状态的初始化时间
*/
private long stateTime = System.currentTimeMillis();
/**
* 关闭状态,失败计数器,以及失败计数器初始化时间
*/
private AtomicInteger failNum = new AtomicInteger(0);
private long failNumClearTime = System.currentTimeMillis();
public String getStateName() {
// 获取当前状态名称
return this.getClass().getSimpleName();
}
public void checkAndSwitchState(AbstractCircuitBreaker cb) {
// 阀值判断,如果失败到达阀值,切换状态到打开状态
long maxFailNum = Long.valueOf(cb.thresholdFailRateForClose.split("/")[0]);
if (failNum.get() >= maxFailNum){
cb.setState(new OpenCBState());
}
}
public boolean canPassCheck(AbstractCircuitBreaker cb) {
// 关闭状态,请求都应该允许通过
return true;
}
public void countFailNum(AbstractCircuitBreaker cb) {
// 检查计数器是否过期了,如果过期重新计数
long period = Long.valueOf(cb.thresholdFailRateForClose.split("/")[1]) * 1000;
long now = System.currentTimeMillis();
// 过期
if (failNumClearTime + period <= now){
failNum.set(0);
}
// 失败计数
failNum.incrementAndGet();
// 检查是否切换状态
checkAndSwitchState(cb);
}
}
打开状态:
package com.example.demo.circulbreaker;
/**
* User: Rudy Tan
* Date: 2018/9/21
*
* 熔断器-打开状态
*/
public class OpenCBState implements CBState {
/**
* 进入当前状态的初始化时间
*/
private long stateTime = System.currentTimeMillis();
public String getStateName() {
// 获取当前状态名称
return this.getClass().getSimpleName();
}
public void checkAndSwitchState(AbstractCircuitBreaker cb) {
// 打开状态,检查等待时间是否已到,如果到了就切换到半开状态
long now = System.currentTimeMillis();
long idleTime = cb.thresholdIdleTimeForOpen * 1000L;
if (stateTime + idleTime <= now){
cb.setState(new HalfOpenCBState());
}
}
public boolean canPassCheck(AbstractCircuitBreaker cb) {
// 检测状态
checkAndSwitchState(cb);
return false;
}
public void countFailNum(AbstractCircuitBreaker cb) {
// nothing
}
}
半开状态:
package com.example.demo.circulbreaker;
import java.util.concurrent.atomic.AtomicInteger;
/**
* User: Rudy Tan
* Date: 2018/9/21
*
* 熔断器-半开状态
*/
public class HalfOpenCBState implements CBState {
/**
* 进入当前状态的初始化时间
*/
private long stateTime = System.currentTimeMillis();
/**
* 半开状态,失败计数器
*/
private AtomicInteger failNum = new AtomicInteger(0);
/**
* 半开状态,允许通过的计数器
*/
private AtomicInteger passNum = new AtomicInteger(0);
public String getStateName() {
// 获取当前状态名称
return this.getClass().getSimpleName();
}
public void checkAndSwitchState(AbstractCircuitBreaker cb) {
// 判断半开时间是否结束
long idleTime = Long.valueOf(cb.thresholdPassRateForHalfOpen.split("/")[1]) * 1000L;
long now = System.currentTimeMillis();
if (stateTime + idleTime <= now){
// 如果半开状态已结束,失败次数是否超过了阀值
int maxFailNum = cb.thresholdFailNumForHalfOpen;
if (failNum.get() >= maxFailNum){
// 失败超过阀值,认为服务没有恢复,重新进入熔断打开状态
cb.setState(new OpenCBState());
}else {
// 没超过,认为服务恢复,进入熔断关闭状态
cb.setState(new CloseCBState());
}
}
}
public boolean canPassCheck(AbstractCircuitBreaker cb) {
// 检查是否切换状态
checkAndSwitchState(cb);
// 超过了阀值,不再放量
int maxPassNum = Integer.valueOf(cb.thresholdPassRateForHalfOpen.split("/")[0]);
if (passNum.get() > maxPassNum){
return false;
}
// 检测是否超过了阀值
if (passNum.incrementAndGet() <= maxPassNum){
return true;
}
return false;
}
public void countFailNum(AbstractCircuitBreaker cb) {
// 失败计数
failNum.incrementAndGet();
// 检查是否切换状态
checkAndSwitchState(cb);
}
}
测试类:
package com.example.demo.circulbreaker;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
/**
* User: Rudy Tan
* Date: 2018/8/27
*/
public class App {
public static void main(String[] args) throws InterruptedException {
final int maxNum = 200;
final CountDownLatch countDownLatch = new CountDownLatch(maxNum);
final CircuitBreaker circuitBreaker = new LocalCircuitBreaker("5/20", 10, "5/10", 2);
for (int i=0; i < maxNum; i++){
new Thread(new Runnable() {
public void run() {
// 模拟随机请求
try {
Thread.sleep(new Random().nextInt(20) * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try{
// 过熔断器
if (circuitBreaker.canPassCheck()){
// do something
System.out.println("正常业务逻辑操作");
// 模拟后期的服务恢复状态
if (countDownLatch.getCount() >= maxNum/2){
// 模拟随机失败
if (new Random().nextInt(2) == 1){
throw new Exception("mock error");
}
}
} else {
System.out.println("拦截业务逻辑操作");
}
}catch (Exception e){
System.out.println("业务执行失败了");
// 熔断器计数器
circuitBreaker.countFailNum();
}
countDownLatch.countDown();
}
}).start();
// 模拟随机请求
try {
Thread.sleep(new Random().nextInt(5) * 100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
countDownLatch.await();
System.out.println("end");
}
}
网友评论