美文网首页
Resilience4j-CircuitBreaker详解

Resilience4j-CircuitBreaker详解

作者: I讨厌鬼I | 来源:发表于2019-07-31 16:46 被阅读0次

    CircuitBreaker

    CircuitBreaker主要由以下几个部分组成:配置、注册器、熔断器、度量指标、事件发布及熔断器的工作原理。接下来会逐一介绍。

    CircuitBreaker配置

    基本配置

    首先是CircuitBreaker的一些可配置项,在CircuitBreakerConfig中:

    CircuitBreaker

    这是一个独立的类,里面包含熔断器的可配置项,提供了一个内部类Builder来构建配置,主要通过三个方法实现:

    • ofDefaults():使用默认配置
    • custom():返回一个Builder对象
    • from(CircuitBreakerConfig baseConfig):返回一个包含baseConfigBuilder对象

    得到Builder对象后就可以根据Builder提供的方法构建配置

    SpringBoot配置

    CircuitBreakerProperties

    首先是CircuitBreakerProperties,整体继承关系如下图:

    properties继承关系

    CircuitBreakerProperties类中添加了@ConfigurationProperties(prefix = "resilience4j.circuitbreaker")注解,是读取ymlproperties文件中配置的入口。

    CircuitBreakerConfigurationProperties添加了@Configuration注解,并在其中定义了Aspect的顺序,即之前提到的注解使用熔断器、限流器、重试组件和隔板组件的切入顺序。

    CircuitBreakerConfigurationProperties

    基类的CircuitBreakerConfigurationProperties包含了两个Map<String, InstanceProperties> instancesMap<String, InstanceProperties> configs以及一个内部静态类InstancePropertiesInstanceProperties中除了熔断器的各项配置外,还有一个baseConfig的可配置项,在ymlproperties文件中的配置最终都会进入instancesconfigs这两个map中。例如如下的yml文件:

    resilience4j:
      circuitbreaker:
        configs:
          myDefault:
            ringBufferSizeInClosedState: 10 # 熔断器关闭时的缓冲区大小
            ringBufferSizeInHalfOpenState: 5 # 熔断器半开时的缓冲区大小
            waitDurationInOpenState: 10000 # 熔断器从打开到半开需要的时间
            failureRateThreshold: 60 # 熔断器打开的失败阈值
            eventConsumerBufferSize: 10 # 事件缓冲区大小
            registerHealthIndicator: true # 健康监测
            automaticTransitionFromOpenToHalfOpenEnabled: false # 是否自动从打开到半开
            recordFailurePredicate:     com.example.resilience4j.predicate.RecordFailurePredicate # 谓词设置异常是否为失败
            recordExceptions: # 记录的异常
              - com.example.resilience4j.exceptions.BusinessBException
            ignoreExceptions: # 忽略的异常
              - com.example.resilience4j.exceptions.BusinessAException
        instances:
          backendA:
            baseConfig: myDefault
            waitDurationInOpenState: 5000
            failureRateThreshold: 20
    

    myDefault将作为keyfailureRateThreshold等将作为InstanceProperties对象中的属性一起存入configs这个Map中,instances也是如此。

    CircuitBreakerConfigurationProperties提供了一个createCircuitBreakerConfig方法用于创建CircuitBreakerConfig

    public CircuitBreakerConfig createCircuitBreakerConfig(InstanceProperties instanceProperties) {
            // 如果配置项中有baseConfig,就去configs中找到baseConfig
            if (StringUtils.isNotEmpty(instanceProperties.getBaseConfig())) {
                InstanceProperties baseProperties = configs.get(instanceProperties.getBaseConfig());
                if (baseProperties == null) {
                    throw new ConfigurationNotFoundException(instanceProperties.getBaseConfig());
                }
                //调用buildConfigFromBaseConfig方法使用instanceProperties覆盖baseProperties
                return buildConfigFromBaseConfig(instanceProperties, baseProperties);
            }
            //如果没有调用buildConfig方法返回CircuitBreakerConfig
            return buildConfig(CircuitBreakerConfig.custom(), instanceProperties);
        }
    
        private CircuitBreakerConfig buildConfigFromBaseConfig(InstanceProperties instanceProperties, InstanceProperties baseProperties) {
            ConfigUtils.mergePropertiesIfAny(instanceProperties, baseProperties);
            //覆盖的实现调用了CircuitBreakerConfig的custom()方法和from()方法
            CircuitBreakerConfig baseConfig = buildConfig(CircuitBreakerConfig.custom(), baseProperties);
            return buildConfig(CircuitBreakerConfig.from(baseConfig), instanceProperties);
        }
    

    CircuitBreakerConfiguration

    接下来看看这些配置是如何配置到注册器中的,在CircuitBreakerConfiguration中:

    CircuitBreakerConfiguration CircuitBreakerConfiguration

    CircuitBreakerConfiguration中的circuitBreakerRegistry方法使用@Bean注解注入了注册器,方法中主要做了3件事:

    1. 创建注册器

      public CircuitBreakerRegistry createCircuitBreakerRegistry(CircuitBreakerConfigurationProperties circuitBreakerProperties) {
          //把Map<String, InstanceProperties> 转换为Map<String, CircuitBreakerConfig>
          Map<String, CircuitBreakerConfig> configs = circuitBreakerProperties.getConfigs()
              .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
                       entry -> circuitBreakerProperties.createCircuitBreakerConfig(entry.getValue())));
       //调用registry的静态of方法新建一个InMemoryCircuitBreakerRegistry,放入configurations,Map作为传入参数
          return CircuitBreakerRegistry.of(configs);
      }
      
    2. 注册事件消费者:说到事件的时候再提

    3. 初始化注册器

    public void initCircuitBreakerRegistry(CircuitBreakerRegistry circuitBreakerRegistry) {
        //把circuitBreakerProperties中的instances中的配置先转换,再用注册器构建实例放入entryMap
        circuitBreakerProperties.getInstances().forEach(
            (name, properties) -> circuitBreakerRegistry.circuitBreaker(name,                                                circuitBreakerProperties.createCircuitBreakerConfig(properties)));
    }
    

    动态配置

    基于以上结果,若使用配置中心完成配置的动态刷新,刷新的内容是CircuitBreakerConfigurationProperties中的内容,若要使配置生效需要把CircuitBreakerConfigurationProperties中的配置添加到circuitBreakerRegistry中,并且替换之前的注册器实例,如下:

    //1.获取更新的配置
    Map<String, CircuitBreakerConfig> configs = circuitBreakerProperties.getConfigs()
        .entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
                                                      entry -> circuitBreakerProperties.createCircuitBreakerConfig(entry.getValue())));
    //2.将新配置加入注册器
    configs.forEach(circuitBreakerRegistry::addConfiguration);
    //3.替换注册器中的实例
    circuitBreakerProperties.getInstances().forEach(
        (name, properties) -> circuitBreakerRegistry.replace(name, CircuitBreaker.of(
            name, circuitBreakerProperties.createCircuitBreakerConfig(properties))));
    

    CircuitBreaker注册器

    InMemoryCircuitBreakerRegistry是注册器的实现类,继承及实现关系如下图:

    CircuitBreakerRegistry

    CircuitBreakerRegistry的创建

    承接之前配置读入的过程我们看一下InMemoryCircuitBreakerRegistry的构造方法,因为registry的静态of方法其实就是调用了InMemoryCircuitBreakerRegistry的构造方法,3个of方法对应3个构造方法,如下:

    //无参构造方法,其实就是使用默认配置调用第三个构造方法,这个默认配置是内置的,不受spring配置注入的影响
    public InMemoryCircuitBreakerRegistry() {
        this(CircuitBreakerConfig.ofDefaults());
    }
    //批量配置,首先会查看configs中是否有名为"default"的配置,如果有则将该配置放入configurations中,没有则放入内置的默认配置
    public InMemoryCircuitBreakerRegistry(Map<String, CircuitBreakerConfig> configs) {
        this(configs.getOrDefault(DEFAULT_CONFIG, CircuitBreakerConfig.ofDefaults()));
        //然后把其他配置放入configurations中
        this.configurations.putAll(configs);
    }
    //使用指定的配置作为默认配置
    public InMemoryCircuitBreakerRegistry(CircuitBreakerConfig defaultConfig) {
        super(defaultConfig);
    }
    

    假设目前我们有3个配置,分别为:

    1. 内置的默认配置:default
    2. 在配置文件中配置名为default的配置:myDefault
    3. 在配置文件中配置名为configA的配置:configA

    如果我们配置了23,从Bean中拿到的注册器配置属性如下:

    configurations1

    如果我们只配置了3,从Bean中拿到的注册器配置属性如下:

    configurations2

    如果我们不从Bean中拿,而是直接new CircuitBreakerRegistry(),则注册器配置属性如下:

    configurations3

    上面3张图说明了三个问题:

    1. 通过配置文件注入到Bean的注册器,默认的配置可以被人为覆盖
    2. 如果不添加名为default的配置,Bean中的注册器会把内置的默认配置放入注册器中
    3. 直接新建一个注册器会取内置的配置而不会从配置文件中去拿

    从CircuitBreakerRegistry中获取CircuitBreaker

    从注册器中获取CircuitBreaker4种方式:

    //从entryMap中获取所有的熔断器
    @Override
    public Seq<CircuitBreaker> getAllCircuitBreakers() {
        return Array.ofAll(entryMap.values());
    }
    
    //根据名字获取熔断器,实际是使用默认的配置调用下面的方法
    @Override
    public CircuitBreaker circuitBreaker(String name) {
        return circuitBreaker(name, getDefaultConfig());
    }
    
    //根据名称及配置获取熔断器
    @Override
    public CircuitBreaker circuitBreaker(String name, CircuitBreakerConfig config) {
        return computeIfAbsent(name, () -> CircuitBreaker
                               .of(name, Objects.requireNonNull(config, CONFIG_MUST_NOT_BE_NULL)));
    }
    
    //根据名称及配置名称获取熔断器
    @Override
    public CircuitBreaker circuitBreaker(String name, String configName) {
        return computeIfAbsent(name, () -> CircuitBreaker.of(name, getConfiguration(configName)
                              .orElseThrow(() -> new ConfigurationNotFoundException(configName))));
    }
    

    这里有个名叫computeIfAbsent的方法,这个方法内部简单来说做了以下的事:如果entryMap中存在名为name的熔断器,就获取该熔断器,如果不存在该熔断器,就使用config这个配置新建一个名为name的熔断器放入entryMap中,然后返回。所以以下三种方法熔断器的逻辑如下:

    CircuitBreaker获取

    还是假设目前我们有3个配置,分别为:

    1. 内置的默认配置:default
    2. 在配置文件中配置名为default的配置:myDefault
    3. 在配置文件中配置名为configA的配置:configA

    还配置了一个熔断器实例:backendA,并且baseConfigconfigA

    如果我们配置了23,并使用3个方法分别去Bean中的注册器拿名为backendA的熔断器,得到的熔断器如下:

    backendA

    同样的情况下使用3个方法分别去Bean中的注册器拿名为backendB的熔断器,得到的熔断器如下:

    backendB

    而对于新建的注册器,分别用3个方法去注册器中拿名为backend的熔断器,得到的熔断器如下:

    backend

    CircuitBreaker状态

    CircuitBreaker共有5个状态,通过实现CircuitBreakerState接口实现各状态的功能:

    CircuitBreakerState

    CircuitBreakerState共有7个方法和一个default方法:

    // 用于返回是否允许通过熔断器
    boolean tryAcquirePermission();
    // 主要用于打开状态时熔断器的触发转换和半开状态时限制线程数
    void acquirePermission();
    // 主要用于半开状态时线程数释放
    void releasePermission();
    // 调用失败,计算失败率
    void onError(Throwable throwable);
    // 调用成功,计算失败率
    void onSuccess();
    // 返回当前状态在CircuitBreaker的枚举值
    CircuitBreaker.State getState();
    // 返回当前封装了度量指标的类实例
    CircuitBreakerMetrics getMetrics();
    // 决定是否会发布事件
    default boolean shouldPublishEvents(CircuitBreakerEvent event){
        return event.getEventType().forcePublish || getState().allowPublish;
    }
    

    关闭状态

    关闭状态比较简单,所有请求都会通过,在成功或失败的时候分别调用度量指标的onSuccess()或onError()方法,然后检查失败率进行状态装换。

    @Override
    public boolean tryAcquirePermission() {
        return true;
    }
    
    @Override
    public void onError(Throwable throwable) {
        // CircuitBreakerMetrics是线程安全的
        checkFailureRate(circuitBreakerMetrics.onError());
    }
    
    @Override
    public void onSuccess() {
        // CircuitBreakerMetrics是线程安全的
        checkFailureRate(circuitBreakerMetrics.onSuccess());
    }
    
    private void checkFailureRate(float currentFailureRate) {
        if (currentFailureRate >= failureRateThreshold) {
            transitionToOpenState();
        }
    }
    

    打开状态

    打开状态的构造方法会拿取isAutomaticTransitionFromOpenToHalfOpenEnabled配置,如果为true,就会调用scheduledExecutorServiceschedule方法时间达到的时候自动转换,如果为false,就必须等到别的请求调用了acquirePermission方法:

    OpenState(CircuitBreakerMetrics circuitBreakerMetrics) {
        final Duration waitDurationInOpenState = circuitBreakerConfig.getWaitDurationInOpenState();
        this.retryAfterWaitDuration = clock.instant().plus(waitDurationInOpenState);
        this.circuitBreakerMetrics = circuitBreakerMetrics;
        // 检查是否自动转换
        if (circuitBreakerConfig.isAutomaticTransitionFromOpenToHalfOpenEnabled()) {
            ScheduledExecutorService scheduledExecutorService = schedulerFactory.getScheduler();
            scheduledExecutorService.schedule(CircuitBreakerStateMachine.this::transitionToHalfOpenState, waitDurationInOpenState.toMillis(), TimeUnit.MILLISECONDS);
        }
    }
    
    @Override
    public boolean tryAcquirePermission() {
        // 请求进来时检查时间,如果时间到了,就状态转换并且放请求通过
        if (clock.instant().isAfter(retryAfterWaitDuration)) {
            transitionToHalfOpenState();
            return true;
        }
        circuitBreakerMetrics.onCallNotPermitted();
        return false;
    }
    
    @Override
    public void acquirePermission() {
        if(!tryAcquirePermission()){
            throw new CallNotPermittedException(CircuitBreakerStateMachine.this);
        }
    }
    
    // 成功和失败仍然调用Metrics的方法增加成功或失败次数,并且计算失败率
    @Override
    public void onError(Throwable throwable) {
        circuitBreakerMetrics.onError();
    }
    @Override
    public void onSuccess() {
        circuitBreakerMetrics.onSuccess();
    }
    

    半开状态

    半开状态和关闭状态大体相似,但略有不同,请求通过时并不是全都通过,而是使用原子变量testRequestCounter,来限制请求数,testRequestCounter的大小为半开状态环的大小,若超过允许的请求数则直接拒绝:

    @Override
    public boolean tryAcquirePermission() {
        // 查看半开状态环是否装满,没装满就进入,装满就拒绝
        if (testRequestCounter.getAndUpdate(current -> current == 0 ? current : --current) > 0) {
            return true;
        }
        circuitBreakerMetrics.onCallNotPermitted();
        return false;
    }
    
    @Override
    public void acquirePermission() {
        if(!tryAcquirePermission()){
            throw new CallNotPermittedException(CircuitBreakerStateMachine.this);
        }
    }
    
    // 请求完成会释放拿着的原子变量
    @Override
    public void releasePermission() {
        testRequestCounter.incrementAndGet();
    }
    
    // 根据失败率完成两种状态的转换
    private void checkFailureRate(float currentFailureRate) {
        if(currentFailureRate != -1){
            if(currentFailureRate >= failureRateThreshold) {
                transitionToOpenState();
            }else{
                transitionToClosedState();
            }
        }
    }
    

    ForcedOpenStateDisabledState两种状态比较简单,强制开启就直接熔断也不计算失败率,禁止状态什么都不干,只能拿到状态枚举和度量指标。

    CircuitBreaker状态机

    有了状态还需要状态机来控制状态的转换,CircuitBreakerStateMachine实现了CircuitBreaker接口,同时又一个内部类CircuitBreakerEventProcessor来处理事件,是熔断器的核心部分,继承实现关系如下:

    CircuitBreakerStateMachine

    StateStateTransition是两个CircuitBreaker接口中的内部枚举

    State

    定义了几个状态的枚举类型,allowPublish设置了每种状态是否允许发布事件

    enum State {
        DISABLED(3, false),
        CLOSED(0, true),
        OPEN(1, true),
        FORCED_OPEN(4, false),
        HALF_OPEN(2, true);
    
        private final int order;
        public final boolean allowPublish;
    
        State(int order, boolean allowPublish){
            this.order = order;
            this.allowPublish = allowPublish;
        }
    
        public int getOrder(){
            return order;
        }
    }
    

    StateTransition

    定义了状态转换的枚举,存在一个元组<State, State>为键,枚举值为StateTransitionMap中,同时提供一个状态转换的方法,能够根据转换的两个状态的枚举值返回StateTransition,主要便于发布事件。

    enum StateTransition {
        CLOSED_TO_OPEN(State.CLOSED, State.OPEN),
        CLOSED_TO_DISABLED(State.CLOSED, State.DISABLED),
        CLOSED_TO_FORCED_OPEN(State.CLOSED, State.FORCED_OPEN),
        HALF_OPEN_TO_CLOSED(State.HALF_OPEN, State.CLOSED),
        HALF_OPEN_TO_OPEN(State.HALF_OPEN, State.OPEN),
        HALF_OPEN_TO_DISABLED(State.HALF_OPEN, State.DISABLED),
        HALF_OPEN_TO_FORCED_OPEN(State.HALF_OPEN, State.FORCED_OPEN),
        OPEN_TO_CLOSED(State.OPEN, State.CLOSED),
        OPEN_TO_HALF_OPEN(State.OPEN, State.HALF_OPEN),
        OPEN_TO_DISABLED(State.OPEN, State.DISABLED),
        OPEN_TO_FORCED_OPEN(State.OPEN, State.FORCED_OPEN),
        FORCED_OPEN_TO_CLOSED(State.FORCED_OPEN, State.CLOSED),
        FORCED_OPEN_TO_OPEN(State.FORCED_OPEN, State.OPEN),
        FORCED_OPEN_TO_DISABLED(State.FORCED_OPEN, State.DISABLED),
        FORCED_OPEN_TO_HALF_OPEN(State.FORCED_OPEN, State.HALF_OPEN),
        DISABLED_TO_CLOSED(State.DISABLED, State.CLOSED),
        DISABLED_TO_OPEN(State.DISABLED, State.OPEN),
        DISABLED_TO_FORCED_OPEN(State.DISABLED, State.FORCED_OPEN),
        DISABLED_TO_HALF_OPEN(State.DISABLED, State.HALF_OPEN);
    
        private final State fromState;
    
        private final State toState;
    
        // 将枚举中的值全放入Map中
        private static final Map<Tuple2<State, State>, StateTransition> STATE_TRANSITION_MAP = Arrays.stream(StateTransition.values())
            .collect(Collectors.toMap(v -> Tuple.of(v.fromState, v.toState), Function.identity()));
    
        // 提供方法根据两个状态返回状态转换的枚举
        public static StateTransition transitionBetween(State fromState, State toState){
            final StateTransition stateTransition = STATE_TRANSITION_MAP.get(Tuple.of(fromState, toState));
            if(stateTransition == null) {
                throw new IllegalStateException(
                    String.format("Illegal state transition from %s to %s", fromState.toString(), toState.toString()));
            }
            return stateTransition;
        }
    }
    

    进行状态转换时先用原子变量的操作更新引用,然后根据前后状态是否相同决定是否发布事件。

    private void stateTransition(State newState, UnaryOperator<CircuitBreakerState> newStateGenerator) {
        // 原子操作获取之前的状态并更新状态
        CircuitBreakerState previousState = stateReference.getAndUpdate(currentState -> {
            if (currentState.getState() == newState) {
                return currentState;
            }
            return newStateGenerator.apply(currentState);
        });
        // 如果两个状态不同则发布事件
        if (previousState.getState() != newState) {
            // 通过两个不同状态的枚举获取StateTransition
            publishStateTransitionEvent(StateTransition.transitionBetween(previousState.getState(), newState));
        }
    }
    

    CircuitBreakerMetrics

    度量标的继承实现关系如下:

    CircuitBreakerMetrics

    CircuitBreakerMetrics实现了Metrics接口,并持有RingBitSet,当各个状态下的熔断器调用成功或失败就调用Metrics相应的方法写入环的bit位并计算失败率。

    float onError() {
        int currentNumberOfFailedCalls = ringBitSet.setNextBit(true);
        return getFailureRate(currentNumberOfFailedCalls);
    }
    
    
    float onSuccess() {
        int currentNumberOfFailedCalls = ringBitSet.setNextBit(false);
        return getFailureRate(currentNumberOfFailedCalls);
    }
    
    void onCallNotPermitted() {
        numberOfNotPermittedCalls.increment();
    }
    

    RingBitSet

    RingBitSet内部有一个按位存储的Ring Bit Bufffer(环形缓存区)数据结构BitSetMod,具体实现不用特别关心,只需知道它的大小是由long[]size决定,逻辑上是如下的环状:

    RingBuffer

    成功的调用会写0,失败的写1,当环满了就计算失败率。例如,如果Ring Bit Buffer的大小设置为10,如果前9次的请求调用都失败也不会计算请求调用失败率。

    CircuitBreaker从初始状态转换到打开状态为例,熔断器的转换如下:

    CircuitBreakerStateMachine

    CircuitBreaker事件

    CircuitBreaker的事件采用观察者模式,事件框架如下:

    eventProcessor
    • EventConsumer<T>是事件消费者接口(观察者),是函数式接口,使用event->{......}来创建事件消费函数。
    • EventPublisher<T>是事件发布者接口(被观察者),只有一个方法onEvent(EventConsumer<T> onEventConsumer)用于设置事件消费者。
    • EventProcessor<T>EventPublisher<T>的通用实现类,主要完成消费者注册以及调用消费者消费事件。

    EventProcessor

    EventProcessor主要完成消费者的注册和事件消费,主要代码如下:

    // 这列表中的消费者是通用事件消费者,任何类型的事件都会触发列表里的消费者消费
    List<EventConsumer<T>> onEventConsumers = new CopyOnWriteArrayList<>();
    // ConcurrentMap中存储着特定的消费者,特定类型事件触发时会调用特定消费者
    ConcurrentMap<String, List<EventConsumer<T>>> eventConsumerMap = new ConcurrentHashMap<>();
    
    @Override
    // 注册通用消费者的方法
    public synchronized void onEvent(@Nullable EventConsumer<T> onEventConsumer) {
        this.consumerRegistered = true;
        this.onEventConsumers.add(onEventConsumer);
    }
    
    // 注册特定的消费者
    public synchronized void registerConsumer(String className, EventConsumer<? extends T> eventConsumer){
        this.consumerRegistered = true;
        this.eventConsumerMap.compute(className, (k, consumers) -> {
            if(consumers == null){
                consumers = new ArrayList<>();
                consumers.add((EventConsumer<T>) eventConsumer);
                return consumers;
            }else{
                consumers.add((EventConsumer<T>) eventConsumer);
                return consumers;
            }
        });
    }
    
    // 调用消费者消费事件
    public <E extends T> boolean processEvent(E event) {
        boolean consumed = false;
        // 通用消费者消费事件
        if(!onEventConsumers.isEmpty()){
            onEventConsumers.forEach(onEventConsumer -> onEventConsumer.consumeEvent(event));
            consumed = true;
        }
        // 特定消费者消费事件
        if(!eventConsumerMap.isEmpty()){
            List<EventConsumer<T>> eventConsumers = this.eventConsumerMap.get(event.getClass().getSimpleName());
            if(eventConsumers != null && !eventConsumers.isEmpty()){
                eventConsumers.forEach(consumer -> consumer.consumeEvent(event));
                consumed = true;
            }
        }
        return consumed;
    }
    

    CircuitBreaker的事件类型

    CircuitBreaker共有6种事件类型

    Event
    • CircuitBreakerOnResetEvent:在熔断器重置时发布的事件。
    • CircuitBreakerOnSuccessEvent:在调用成功时发布的事件。
    • CircuitBreakerOnErrorEvent:在调用失败时发布的事件。
    • CircuitBreakerOnIgnoredErrorEvent:在异常被忽略时发布的事件。
    • CircuitBreakerOnCallNotPermittedEvent:在熔断器熔断时发布的事件。
    • CircuitBreakerOnStateTranstionEvent:在熔断器状态转换时发布的事件。

    CircuitBreakerEvent接口中声明了与具体事件对应的枚举类Type,用于表示事件类型,已经是应该否强制发布:

    enum Type {
        /** A CircuitBreakerEvent which informs that an error has been recorded */
        ERROR(false),
        /** A CircuitBreakerEvent which informs that an error has been ignored */
        IGNORED_ERROR(false),
        /** A CircuitBreakerEvent which informs that a success has been recorded */
        SUCCESS(false),
        /** A CircuitBreakerEvent which informs that a call was not permitted because the CircuitBreaker state is OPEN */
        NOT_PERMITTED(false),
        /** A CircuitBreakerEvent which informs the state of the CircuitBreaker has been changed */
        STATE_TRANSITION(true),
        /** A CircuitBreakerEvent which informs the CircuitBreaker has been reset */
        RESET(true),
        /** A CircuitBreakerEvent which informs the CircuitBreaker has been forced open */
        FORCED_OPEN(false),
        /** A CircuitBreakerEvent which informs the CircuitBreaker has been disabled */
        DISABLED(false);
    
        public final boolean forcePublish;
    
        Type(boolean forcePublish) {
            this.forcePublish = forcePublish;
        }
    }
    

    CircuitBreakerEventProcessor

    熔断器事件处理的继承实现关系如下:

    CircuitBreakerEventProcessor

    其中EventPublisher继承了EventPublisher<T>,并设置了几种事件的消费者:

    interface EventPublisher extends io.github.resilience4j.core.EventPublisher<CircuitBreakerEvent> {
    
        EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> eventConsumer);
    
        EventPublisher onError(EventConsumer<CircuitBreakerOnErrorEvent> eventConsumer);
    
        EventPublisher onStateTransition(EventConsumer<CircuitBreakerOnStateTransitionEvent> eventConsumer);
    
        EventPublisher onReset(EventConsumer<CircuitBreakerOnResetEvent> eventConsumer);
    
        EventPublisher onIgnoredError(EventConsumer<CircuitBreakerOnIgnoredErrorEvent> eventConsumer);
    
        EventPublisher onCallNotPermitted(EventConsumer<CircuitBreakerOnCallNotPermittedEvent> eventConsumer);
    }
    

    CircuitBreakerEventProcessor分别实现了事件注册接口和消费接口并继承EventProcessor完成整个事件的处理。

    private class CircuitBreakerEventProcessor extends EventProcessor<CircuitBreakerEvent> implements EventConsumer<CircuitBreakerEvent>, EventPublisher {
        // 完成事件消费者的注册
        @Override
        public EventPublisher onSuccess(EventConsumer<CircuitBreakerOnSuccessEvent> onSuccessEventConsumer) {
            registerConsumer(CircuitBreakerOnSuccessEvent.class.getSimpleName(), onSuccessEventConsumer);
            return this;
        }
        /*...*/
        // 调用父类方法处理事件
        @Override
        public void consumeEvent(CircuitBreakerEvent event) {
            super.processEvent(event);
        }
    }
    

    CircuitBreaker熔断

    有了前面的所有工作,CircuitBreaker的使用就十分简单了,只需要使用CircuitBreaker提供的函数包装 Callable, Supplier, Runnable, Consumer, CheckedRunnable, CheckedSupplier, CheckedConsumer 或者 CompletionStage ,这些包装函数都大同小异,看一下其中一个:

    static <T> CheckedFunction0<T> decorateCheckedSupplier(CircuitBreaker circuitBreaker, CheckedFunction0<T> supplier){
        return () -> {
            // 1.先检查是否可以通过熔断器
            circuitBreaker.acquirePermission();
            long start = System.nanoTime();
            try {
                // 2.正常运行,拿取返回值
                T returnValue = supplier.apply();
                // 3.计算运行成功,调用onSuccess()
                long durationInNanos = System.nanoTime() - start;
                circuitBreaker.onSuccess(durationInNanos);
                return returnValue;
            } catch (Exception exception) {
                // 如果抛出异常,调用onError(),onError()会对异常依据黑白名单进行进行判断
                long durationInNanos = System.nanoTime() - start;
                circuitBreaker.onError(durationInNanos, exception);
                // 不管是否在黑白名单,异常都会抛出
                throw exception;
            }
        };
    }
    

    onSuccess()

    @Override
    public void onSuccess(long durationInNanos) {
        // 发布成功事件
        publishSuccessEvent(durationInNanos);
        // 调用状态检查是否需要状态转换
        stateReference.get().onSuccess();
    }
    

    onError()

    @Override
    public void onError(long durationInNanos, Throwable throwable) {
        // 拿取判断异常的谓词函数
        Predicate<Throwable> recordFailurePredicate = circuitBreakerConfig.getRecordFailurePredicate();
        // 解包CompletionException异常
        if (throwable instanceof CompletionException) {
            Throwable cause = throwable.getCause();
            handleThrowable(durationInNanos, recordFailurePredicate, cause);
        }else{
            handleThrowable(durationInNanos, recordFailurePredicate, throwable);
        }
    }
    
    private void handleThrowable(long durationInNanos, Predicate<Throwable> recordFailurePredicate, Throwable throwable) {
        // 用谓词函数判断是否应该被忽略
        if (recordFailurePredicate.test(throwable)) {
            LOG.debug("CircuitBreaker '{}' recorded a failure:", name, throwable);
            // 提交失败事件,调用状态检查是否需要状态转换
            publishCircuitErrorEvent(name, durationInNanos, throwable);
            stateReference.get().onError(throwable);
        } else {
            // 允许通过,提交忽略异常的事件
            releasePermission();
            publishCircuitIgnoredErrorEvent(name, durationInNanos, throwable);
        }
    }
    

    CircuitBreaker总结

    综上,熔断器的总体工作流程如下:

    CircuitBreaker
    END

    相关文章

      网友评论

          本文标题:Resilience4j-CircuitBreaker详解

          本文链接:https://www.haomeiwen.com/subject/ijtjrctx.html