CircuitBreaker
CircuitBreaker主要由以下几个部分组成:配置、注册器、熔断器、度量指标、事件发布及熔断器的工作原理。接下来会逐一介绍。
CircuitBreaker配置
基本配置
首先是CircuitBreaker的一些可配置项,在CircuitBreakerConfig中:
CircuitBreaker这是一个独立的类,里面包含熔断器的可配置项,提供了一个内部类Builder来构建配置,主要通过三个方法实现:
- ofDefaults():使用默认配置
- custom():返回一个Builder对象
- from(CircuitBreakerConfig baseConfig):返回一个包含baseConfig的Builder对象
得到Builder对象后就可以根据Builder提供的方法构建配置
SpringBoot配置
CircuitBreakerProperties
首先是CircuitBreakerProperties,整体继承关系如下图:
properties继承关系在CircuitBreakerProperties类中添加了@ConfigurationProperties(prefix = "resilience4j.circuitbreaker")注解,是读取yml或properties文件中配置的入口。
CircuitBreakerConfigurationProperties添加了@Configuration注解,并在其中定义了Aspect的顺序,即之前提到的注解使用熔断器、限流器、重试组件和隔板组件的切入顺序。
CircuitBreakerConfigurationProperties基类的CircuitBreakerConfigurationProperties包含了两个Map<String, InstanceProperties> instances、Map<String, InstanceProperties> configs以及一个内部静态类InstanceProperties。InstanceProperties中除了熔断器的各项配置外,还有一个baseConfig的可配置项,在yml和properties文件中的配置最终都会进入instances、configs这两个map中。例如如下的yml文件:
resilience4j:
circuitbreaker:
configs:
myDefault:
ringBufferSizeInClosedState: 10 # 熔断器关闭时的缓冲区大小
ringBufferSizeInHalfOpenState: 5 # 熔断器半开时的缓冲区大小
waitDurationInOpenState: 10000 # 熔断器从打开到半开需要的时间
failureRateThreshold: 60 # 熔断器打开的失败阈值
eventConsumerBufferSize: 10 # 事件缓冲区大小
registerHealthIndicator: true # 健康监测
automaticTransitionFromOpenToHalfOpenEnabled: false # 是否自动从打开到半开
recordFailurePredicate: com.example.resilience4j.predicate.RecordFailurePredicate # 谓词设置异常是否为失败
recordExceptions: # 记录的异常
- com.example.resilience4j.exceptions.BusinessBException
ignoreExceptions: # 忽略的异常
- com.example.resilience4j.exceptions.BusinessAException
instances:
backendA:
baseConfig: myDefault
waitDurationInOpenState: 5000
failureRateThreshold: 20
myDefault将作为key,failureRateThreshold等将作为InstanceProperties对象中的属性一起存入configs这个Map中,instances也是如此。
CircuitBreakerConfigurationProperties提供了一个createCircuitBreakerConfig方法用于创建CircuitBreakerConfig:
public CircuitBreakerConfig createCircuitBreakerConfig(InstanceProperties instanceProperties) {
// 如果配置项中有baseConfig,就去configs中找到baseConfig
if (StringUtils.isNotEmpty(instanceProperties.getBaseConfig())) {
InstanceProperties baseProperties = configs.get(instanceProperties.getBaseConfig());
if (baseProperties == null) {
throw new ConfigurationNotFoundException(instanceProperties.getBaseConfig());
}
//调用buildConfigFromBaseConfig方法使用instanceProperties覆盖baseProperties
return buildConfigFromBaseConfig(instanceProperties, baseProperties);
}
//如果没有调用buildConfig方法返回CircuitBreakerConfig
return buildConfig(CircuitBreakerConfig.custom(), instanceProperties);
}
private CircuitBreakerConfig buildConfigFromBaseConfig(InstanceProperties instanceProperties, InstanceProperties baseProperties) {
ConfigUtils.mergePropertiesIfAny(instanceProperties, baseProperties);
//覆盖的实现调用了CircuitBreakerConfig的custom()方法和from()方法
CircuitBreakerConfig baseConfig = buildConfig(CircuitBreakerConfig.custom(), baseProperties);
return buildConfig(CircuitBreakerConfig.from(baseConfig), instanceProperties);
}
CircuitBreakerConfiguration
接下来看看这些配置是如何配置到注册器中的,在CircuitBreakerConfiguration中:
CircuitBreakerConfiguration CircuitBreakerConfigurationCircuitBreakerConfiguration中的circuitBreakerRegistry方法使用@Bean注解注入了注册器,方法中主要做了3件事:
-
创建注册器
public CircuitBreakerRegistry createCircuitBreakerRegistry(CircuitBreakerConfigurationProperties circuitBreakerProperties) { //把Map<String, InstanceProperties> 转换为Map<String, CircuitBreakerConfig> Map<String, CircuitBreakerConfig> configs = circuitBreakerProperties.getConfigs() .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> circuitBreakerProperties.createCircuitBreakerConfig(entry.getValue()))); //调用registry的静态of方法新建一个InMemoryCircuitBreakerRegistry,放入configurations,Map作为传入参数 return CircuitBreakerRegistry.of(configs); }
-
注册事件消费者:说到事件的时候再提
-
初始化注册器
public void initCircuitBreakerRegistry(CircuitBreakerRegistry circuitBreakerRegistry) {
//把circuitBreakerProperties中的instances中的配置先转换,再用注册器构建实例放入entryMap
circuitBreakerProperties.getInstances().forEach(
(name, properties) -> circuitBreakerRegistry.circuitBreaker(name, circuitBreakerProperties.createCircuitBreakerConfig(properties)));
}
动态配置
基于以上结果,若使用配置中心完成配置的动态刷新,刷新的内容是CircuitBreakerConfigurationProperties中的内容,若要使配置生效需要把CircuitBreakerConfigurationProperties中的配置添加到circuitBreakerRegistry中,并且替换之前的注册器实例,如下:
//1.获取更新的配置
Map<String, CircuitBreakerConfig> configs = circuitBreakerProperties.getConfigs()
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
entry -> circuitBreakerProperties.createCircuitBreakerConfig(entry.getValue())));
//2.将新配置加入注册器
configs.forEach(circuitBreakerRegistry::addConfiguration);
//3.替换注册器中的实例
circuitBreakerProperties.getInstances().forEach(
(name, properties) -> circuitBreakerRegistry.replace(name, CircuitBreaker.of(
name, circuitBreakerProperties.createCircuitBreakerConfig(properties))));
CircuitBreaker注册器
InMemoryCircuitBreakerRegistry是注册器的实现类,继承及实现关系如下图:
CircuitBreakerRegistryCircuitBreakerRegistry的创建
承接之前配置读入的过程我们看一下InMemoryCircuitBreakerRegistry的构造方法,因为registry的静态of方法其实就是调用了InMemoryCircuitBreakerRegistry的构造方法,3个of方法对应3个构造方法,如下:
//无参构造方法,其实就是使用默认配置调用第三个构造方法,这个默认配置是内置的,不受spring配置注入的影响
public InMemoryCircuitBreakerRegistry() {
this(CircuitBreakerConfig.ofDefaults());
}
//批量配置,首先会查看configs中是否有名为"default"的配置,如果有则将该配置放入configurations中,没有则放入内置的默认配置
public InMemoryCircuitBreakerRegistry(Map<String, CircuitBreakerConfig> configs) {
this(configs.getOrDefault(DEFAULT_CONFIG, CircuitBreakerConfig.ofDefaults()));
//然后把其他配置放入configurations中
this.configurations.putAll(configs);
}
//使用指定的配置作为默认配置
public InMemoryCircuitBreakerRegistry(CircuitBreakerConfig defaultConfig) {
super(defaultConfig);
}
假设目前我们有3个配置,分别为:
- 内置的默认配置:default
- 在配置文件中配置名为default的配置:myDefault
- 在配置文件中配置名为configA的配置:configA
如果我们配置了2和3,从Bean中拿到的注册器配置属性如下:
configurations1如果我们只配置了3,从Bean中拿到的注册器配置属性如下:
configurations2如果我们不从Bean中拿,而是直接new CircuitBreakerRegistry(),则注册器配置属性如下:
configurations3上面3张图说明了三个问题:
- 通过配置文件注入到Bean的注册器,默认的配置可以被人为覆盖
- 如果不添加名为default的配置,Bean中的注册器会把内置的默认配置放入注册器中
- 直接新建一个注册器会取内置的配置而不会从配置文件中去拿
从CircuitBreakerRegistry中获取CircuitBreaker
从注册器中获取CircuitBreaker有4种方式:
//从entryMap中获取所有的熔断器
@Override
public Seq<CircuitBreaker> getAllCircuitBreakers() {
return Array.ofAll(entryMap.values());
}
//根据名字获取熔断器,实际是使用默认的配置调用下面的方法
@Override
public CircuitBreaker circuitBreaker(String name) {
return circuitBreaker(name, getDefaultConfig());
}
//根据名称及配置获取熔断器
@Override
public CircuitBreaker circuitBreaker(String name, CircuitBreakerConfig config) {
return computeIfAbsent(name, () -> CircuitBreaker
.of(name, Objects.requireNonNull(config, CONFIG_MUST_NOT_BE_NULL)));
}
//根据名称及配置名称获取熔断器
@Override
public CircuitBreaker circuitBreaker(String name, String configName) {
return computeIfAbsent(name, () -> CircuitBreaker.of(name, getConfiguration(configName)
.orElseThrow(() -> new ConfigurationNotFoundException(configName))));
}
这里有个名叫computeIfAbsent的方法,这个方法内部简单来说做了以下的事:如果entryMap中存在名为name的熔断器,就获取该熔断器,如果不存在该熔断器,就使用config这个配置新建一个名为name的熔断器放入entryMap中,然后返回。所以以下三种方法熔断器的逻辑如下:
CircuitBreaker获取还是假设目前我们有3个配置,分别为:
- 内置的默认配置:default
- 在配置文件中配置名为default的配置:myDefault
- 在配置文件中配置名为configA的配置:configA
还配置了一个熔断器实例:backendA,并且baseConfig为configA
如果我们配置了2、3,并使用3个方法分别去Bean中的注册器拿名为backendA的熔断器,得到的熔断器如下:
backendA同样的情况下使用3个方法分别去Bean中的注册器拿名为backendB的熔断器,得到的熔断器如下:
backendB而对于新建的注册器,分别用3个方法去注册器中拿名为backend的熔断器,得到的熔断器如下:
backendCircuitBreaker状态
CircuitBreaker共有5个状态,通过实现CircuitBreakerState接口实现各状态的功能:
CircuitBreakerStateCircuitBreakerState共有7个方法和一个default方法:
// 用于返回是否允许通过熔断器
boolean tryAcquirePermission();
// 主要用于打开状态时熔断器的触发转换和半开状态时限制线程数
void acquirePermission();
// 主要用于半开状态时线程数释放
void releasePermission();
// 调用失败,计算失败率
void onError(Throwable throwable);
// 调用成功,计算失败率
void onSuccess();
// 返回当前状态在CircuitBreaker的枚举值
CircuitBreaker.State getState();
// 返回当前封装了度量指标的类实例
CircuitBreakerMetrics getMetrics();
// 决定是否会发布事件
default boolean shouldPublishEvents(CircuitBreakerEvent event){
return event.getEventType().forcePublish || getState().allowPublish;
}
关闭状态
关闭状态比较简单,所有请求都会通过,在成功或失败的时候分别调用度量指标的onSuccess()或onError()方法,然后检查失败率进行状态装换。
@Override
public boolean tryAcquirePermission() {
return true;
}
@Override
public void onError(Throwable throwable) {
// CircuitBreakerMetrics是线程安全的
checkFailureRate(circuitBreakerMetrics.onError());
}
@Override
public void onSuccess() {
// CircuitBreakerMetrics是线程安全的
checkFailureRate(circuitBreakerMetrics.onSuccess());
}
private void checkFailureRate(float currentFailureRate) {
if (currentFailureRate >= failureRateThreshold) {
transitionToOpenState();
}
}
打开状态
打开状态的构造方法会拿取isAutomaticTransitionFromOpenToHalfOpenEnabled配置,如果为true,就会调用scheduledExecutorService的schedule方法时间达到的时候自动转换,如果为false,就必须等到别的请求调用了acquirePermission方法:
OpenState(CircuitBreakerMetrics circuitBreakerMetrics) {
final Duration waitDurationInOpenState = circuitBreakerConfig.getWaitDurationInOpenState();
this.retryAfterWaitDuration = clock.instant().plus(waitDurationInOpenState);
this.circuitBreakerMetrics = circuitBreakerMetrics;
// 检查是否自动转换
if (circuitBreakerConfig.isAutomaticTransitionFromOpenToHalfOpenEnabled()) {
ScheduledExecutorService scheduledExecutorService = schedulerFactory.getScheduler();
scheduledExecutorService.schedule(CircuitBreakerStateMachine.this::transitionToHalfOpenState, waitDurationInOpenState.toMillis(), TimeUnit.MILLISECONDS);
}
}
@Override
public boolean tryAcquirePermission() {
// 请求进来时检查时间,如果时间到了,就状态转换并且放请求通过
if (clock.instant().isAfter(retryAfterWaitDuration)) {
transitionToHalfOpenState();
return true;
}
circuitBreakerMetrics.onCallNotPermitted();
return false;
}
@Override
public void acquirePermission() {
if(!tryAcquirePermission()){
throw new CallNotPermittedException(CircuitBreakerStateMachine.this);
}
}
// 成功和失败仍然调用Metrics的方法增加成功或失败次数,并且计算失败率
@Override
public void onError(Throwable throwable) {
circuitBreakerMetrics.onError();
}
@Override
public void onSuccess() {
circuitBreakerMetrics.onSuccess();
}
半开状态
半开状态和关闭状态大体相似,但略有不同,请求通过时并不是全都通过,而是使用原子变量testRequestCounter,来限制请求数,testRequestCounter的大小为半开状态环的大小,若超过允许的请求数则直接拒绝:
@Override
public boolean tryAcquirePermission() {
// 查看半开状态环是否装满,没装满就进入,装满就拒绝
if (testRequestCounter.getAndUpdate(current -> current == 0 ? current : --current) > 0) {
return true;
}
circuitBreakerMetrics.onCallNotPermitted();
return false;
}
@Override
public void acquirePermission() {
if(!tryAcquirePermission()){
throw new CallNotPermittedException(CircuitBreakerStateMachine.this);
}
}
// 请求完成会释放拿着的原子变量
@Override
public void releasePermission() {
testRequestCounter.incrementAndGet();
}
// 根据失败率完成两种状态的转换
private void checkFailureRate(float currentFailureRate) {
if(currentFailureRate != -1){
if(currentFailureRate >= failureRateThreshold) {
transitionToOpenState();
}else{
transitionToClosedState();
}
}
}
ForcedOpenState和DisabledState两种状态比较简单,强制开启就直接熔断也不计算失败率,禁止状态什么都不干,只能拿到状态枚举和度量指标。
CircuitBreaker状态机
有了状态还需要状态机来控制状态的转换,CircuitBreakerStateMachine实现了CircuitBreaker接口,同时又一个内部类CircuitBreakerEventProcessor来处理事件,是熔断器的核心部分,继承实现关系如下:
CircuitBreakerStateMachineState和StateTransition是两个CircuitBreaker接口中的内部枚举
State
定义了几个状态的枚举类型,allowPublish设置了每种状态是否允许发布事件
enum State {
DISABLED(3, false),
CLOSED(0, true),
OPEN(1, true),
FORCED_OPEN(4, false),
HALF_OPEN(2, true);
private final int order;
public final boolean allowPublish;
State(int order, boolean allowPublish){
this.order = order;
this.allowPublish = allowPublish;
}
public int getOrder(){
return order;
}
}
StateTransition
定义了状态转换的枚举,存在一个元组<State, State>为键,枚举值为StateTransition的Map中,同时提供一个状态转换的方法,能够根据转换的两个状态的枚举值返回StateTransition,主要便于发布事件。
enum StateTransition {
CLOSED_TO_OPEN(State.CLOSED, State.OPEN),
CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED),
CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN),
HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),
HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN),
HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED),
HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN),
OPEN_TO_CLOSED(State.OPEN, State.CLOSED),
OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),
OPEN_TO_DISABLED(State.OPEN, State.DISABLED),
OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN),
FORCED_OPEN_TO_CLOSED(State.FORCED_OPEN, State.CLOSED),
FORCED_OPEN_TO_OPEN(State.FORCED_OPEN, State.OPEN),
FORCED_OPEN_TO_DISABLED(State.FORCED_OPEN, State.DISABLED),
FORCED_OPEN_TO_HALF_OPEN(State.FORCED_OPEN, State.HALF_OPEN),
DISABLED_TO_CLOSED(State.DISABLED, State.CLOSED),
DISABLED_TO_OPEN(State.DISABLED, State.OPEN),
DISABLED_TO_FORCED_OPEN(State.DISABLED, State.FORCED_OPEN),
DISABLED_TO_HALF_OPEN(State.DISABLED, State.HALF_OPEN);
private final State fromState;
private final State toState;
// 将枚举中的值全放入Map中
private static final Map<Tuple2<State, State>, StateTransition> STATE_TRANSITION_MAP = Arrays.stream(StateTransition.values())
.collect(Collectors.toMap(v -> Tuple.of(v.fromState, v.toState), Function.identity()));
// 提供方法根据两个状态返回状态转换的枚举
public static StateTransition transitionBetween(State fromState, State toState){
final StateTransition stateTransition = STATE_TRANSITION_MAP.get(Tuple.of(fromState, toState));
if(stateTransition == null) {
throw new IllegalStateException(
String.format("Illegal state transition from %s to %s", fromState.toString(), toState.toString()));
}
return stateTransition;
}
}
进行状态转换时先用原子变量的操作更新引用,然后根据前后状态是否相同决定是否发布事件。
private void stateTransition(State newState, UnaryOperator<CircuitBreakerState> newStateGenerator) {
// 原子操作获取之前的状态并更新状态
CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {
if (currentState.getState() == newState) {
return currentState;
}
return newStateGenerator.apply(currentState);
});
// 如果两个状态不同则发布事件
if (previousState.getState() != newState) {
// 通过两个不同状态的枚举获取StateTransition
publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), newState));
}
}
CircuitBreakerMetrics
度量标的继承实现关系如下:
CircuitBreakerMetricsCircuitBreakerMetrics实现了Metrics接口,并持有RingBitSet,当各个状态下的熔断器调用成功或失败就调用Metrics相应的方法写入环的bit位并计算失败率。
float onError() {
int currentNumberOfFailedCalls = ringBitSet.setNextBit(true);
return getFailureRate(currentNumberOfFailedCalls);
}
float onSuccess() {
int currentNumberOfFailedCalls = ringBitSet.setNextBit(false);
return getFailureRate(currentNumberOfFailedCalls);
}
void onCallNotPermitted() {
numberOfNotPermittedCalls.increment();
}
RingBitSet
RingBitSet内部有一个按位存储的Ring Bit Bufffer(环形缓存区)数据结构BitSetMod,具体实现不用特别关心,只需知道它的大小是由long[]和size决定,逻辑上是如下的环状:
RingBuffer成功的调用会写0,失败的写1,当环满了就计算失败率。例如,如果Ring Bit Buffer的大小设置为10,如果前9次的请求调用都失败也不会计算请求调用失败率。
以CircuitBreaker从初始状态转换到打开状态为例,熔断器的转换如下:
CircuitBreakerStateMachineCircuitBreaker事件
CircuitBreaker的事件采用观察者模式,事件框架如下:
eventProcessor- EventConsumer<T>是事件消费者接口(观察者),是函数式接口,使用event->{......}来创建事件消费函数。
- EventPublisher<T>是事件发布者接口(被观察者),只有一个方法onEvent(EventConsumer<T> onEventConsumer)用于设置事件消费者。
- EventProcessor<T>是EventPublisher<T>的通用实现类,主要完成消费者注册以及调用消费者消费事件。
EventProcessor
EventProcessor主要完成消费者的注册和事件消费,主要代码如下:
// 这列表中的消费者是通用事件消费者,任何类型的事件都会触发列表里的消费者消费
List<EventConsumer<T>> onEventConsumers = new CopyOnWriteArrayList<>();
// ConcurrentMap中存储着特定的消费者,特定类型事件触发时会调用特定消费者
ConcurrentMap<String, List<EventConsumer<T>>> eventConsumerMap = new ConcurrentHashMap<>();
@Override
// 注册通用消费者的方法
public synchronized void onEvent(@Nullable EventConsumer<T> onEventConsumer) {
this.consumerRegistered = true;
this.onEventConsumers.add(onEventConsumer);
}
// 注册特定的消费者
public synchronized void registerConsumer(String className, EventConsumer<? extends T> eventConsumer){
this.consumerRegistered = true;
this.eventConsumerMap.compute(className, (k, consumers) -> {
if(consumers == null){
consumers = new ArrayList<>();
consumers.add((EventConsumer<T>) eventConsumer);
return consumers;
}else{
consumers.add((EventConsumer<T>) eventConsumer);
return consumers;
}
});
}
// 调用消费者消费事件
public <E extends T> boolean processEvent(E event) {
boolean consumed = false;
// 通用消费者消费事件
if(!onEventConsumers.isEmpty()){
onEventConsumers.forEach(onEventConsumer -> onEventConsumer.consumeEvent(event));
consumed = true;
}
// 特定消费者消费事件
if(!eventConsumerMap.isEmpty()){
List<EventConsumer<T>> eventConsumers = this.eventConsumerMap.get(event.getClass().getSimpleName());
if(eventConsumers != null && !eventConsumers.isEmpty()){
eventConsumers.forEach(consumer -> consumer.consumeEvent(event));
consumed = true;
}
}
return consumed;
}
CircuitBreaker的事件类型
CircuitBreaker共有6种事件类型
Event- CircuitBreakerOnResetEvent:在熔断器重置时发布的事件。
- CircuitBreakerOnSuccessEvent:在调用成功时发布的事件。
- CircuitBreakerOnErrorEvent:在调用失败时发布的事件。
- CircuitBreakerOnIgnoredErrorEvent:在异常被忽略时发布的事件。
- CircuitBreakerOnCallNotPermittedEvent:在熔断器熔断时发布的事件。
- CircuitBreakerOnStateTranstionEvent:在熔断器状态转换时发布的事件。
CircuitBreakerEvent接口中声明了与具体事件对应的枚举类Type,用于表示事件类型,已经是应该否强制发布:
enum Type {
/** A CircuitBreakerEvent which informs that an error has been recorded */
ERROR(false),
/** A CircuitBreakerEvent which informs that an error has been ignored */
IGNORED_ERROR(false),
/** A CircuitBreakerEvent which informs that a success has been recorded */
SUCCESS(false),
/** A CircuitBreakerEvent which informs that a call was not permitted because the CircuitBreaker state is OPEN */
NOT_PERMITTED(false),
/** A CircuitBreakerEvent which informs the state of the CircuitBreaker has been changed */
STATE_TRANSITION(true),
/** A CircuitBreakerEvent which informs the CircuitBreaker has been reset */
RESET(true),
/** A CircuitBreakerEvent which informs the CircuitBreaker has been forced open */
FORCED_OPEN(false),
/** A CircuitBreakerEvent which informs the CircuitBreaker has been disabled */
DISABLED(false);
public final boolean forcePublish;
Type(boolean forcePublish) {
this.forcePublish = forcePublish;
}
}
CircuitBreakerEventProcessor
熔断器事件处理的继承实现关系如下:
CircuitBreakerEventProcessor其中EventPublisher继承了EventPublisher<T>,并设置了几种事件的消费者:
interface EventPublisher extends io.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> {
EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer);
EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer);
EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer);
EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer);
EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer);
EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer);
}
CircuitBreakerEventProcessor分别实现了事件注册接口和消费接口并继承EventProcessor完成整个事件的处理。
private class CircuitBreakerEventProcessor extends EventProcessor<CircuitBreakerEvent> implements EventConsumer<CircuitBreakerEvent>, EventPublisher {
// 完成事件消费者的注册
@Override
public EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> onSuccessEventConsumer) {
registerConsumer(CircuitBreakerOnSuccessEvent.class.getSimpleName(), onSuccessEventConsumer);
return this;
}
/*...*/
// 调用父类方法处理事件
@Override
public void consumeEvent(CircuitBreakerEvent event) {
super.processEvent(event);
}
}
CircuitBreaker熔断
有了前面的所有工作,CircuitBreaker的使用就十分简单了,只需要使用CircuitBreaker提供的函数包装 Callable
, Supplier
, Runnable
, Consumer
, CheckedRunnable
, CheckedSupplier
, CheckedConsumer
或者 CompletionStage
,这些包装函数都大同小异,看一下其中一个:
static <T> CheckedFunction0<T> decorateCheckedSupplier(CircuitBreaker circuitBreaker, CheckedFunction0<T> supplier){
return () -> {
// 1.先检查是否可以通过熔断器
circuitBreaker.acquirePermission();
long start = System.nanoTime();
try {
// 2.正常运行,拿取返回值
T returnValue = supplier.apply();
// 3.计算运行成功,调用onSuccess()
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onSuccess(durationInNanos);
return returnValue;
} catch (Exception exception) {
// 如果抛出异常,调用onError(),onError()会对异常依据黑白名单进行进行判断
long durationInNanos = System.nanoTime() - start;
circuitBreaker.onError(durationInNanos, exception);
// 不管是否在黑白名单,异常都会抛出
throw exception;
}
};
}
onSuccess():
@Override
public void onSuccess(long durationInNanos) {
// 发布成功事件
publishSuccessEvent(durationInNanos);
// 调用状态检查是否需要状态转换
stateReference.get().onSuccess();
}
onError():
@Override
public void onError(long durationInNanos, Throwable throwable) {
// 拿取判断异常的谓词函数
Predicate<Throwable> recordFailurePredicate = circuitBreakerConfig.getRecordFailurePredicate();
// 解包CompletionException异常
if (throwable instanceof CompletionException) {
Throwable cause = throwable.getCause();
handleThrowable(durationInNanos, recordFailurePredicate, cause);
}else{
handleThrowable(durationInNanos, recordFailurePredicate, throwable);
}
}
private void handleThrowable(long durationInNanos, Predicate<Throwable> recordFailurePredicate, Throwable throwable) {
// 用谓词函数判断是否应该被忽略
if (recordFailurePredicate.test(throwable)) {
LOG.debug("CircuitBreaker '{}' recorded a failure:", name, throwable);
// 提交失败事件,调用状态检查是否需要状态转换
publishCircuitErrorEvent(name, durationInNanos, throwable);
stateReference.get().onError(throwable);
} else {
// 允许通过,提交忽略异常的事件
releasePermission();
publishCircuitIgnoredErrorEvent(name, durationInNanos, throwable);
}
}
CircuitBreaker总结
综上,熔断器的总体工作流程如下:
CircuitBreakerEND
网友评论