美文网首页
Hystrix核心熔断器

Hystrix核心熔断器

作者: Java学习录 | 来源:发表于2019-11-07 09:25 被阅读0次

在深入研究熔断器之前,我们需要先看一下Hystrix的几个重要的默认配置,这几个配置在HystrixCommandProperties

//时间窗(ms)
static final Integer default_metricsRollingStatisticalWindow = 10000;
//最少请求次数
private static final Integer default_circuitBreakerRequestVolumeThreshold = 20;
//熔断器打开后开始尝试半开的时间间隔
private static final Integer default_circuitBreakerSleepWindowInMilliseconds = 5000;
//错误比例
private static final Integer default_circuitBreakerErrorThresholdPercentage = 50;

这几个属性共同组成了熔断器的核心逻辑,即:

  1. 每10秒的窗口期内,当请求次数超过20次,且出错比例超过50%,则触发熔断器打开
  2. 当熔断器5秒后,会尝试放过去一部分流量进行试探
熔断器初始化

熔断器的初始化是在HystrixCircuitBreaker.FactorygetInstance方法

        private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

        public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
            // this should find it for all but the first time
            HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
            if (previouslyCached != null) {
                return previouslyCached;
            }

            // if we get here this is the first time so we need to initialize

            // Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
            // 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
            // If 2 threads hit here only one will get added and the other will get a non-null response instead.
            HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
            if (cbForCommand == null) {
                // this means the putIfAbsent step just created a new one so let's retrieve and return it
                return circuitBreakersByCommand.get(key.name());
            } else {
                // this means a race occurred and while attempting to 'put' another one got there before
                // and we instead retrieved it and will now return it
                return cbForCommand;
            }
        }

由上方代码可知,每一个熔断器都是由HystrixCircuitBreakerImpl实现的,而所有的熔断器都维护在circuitBreakersByCommand这个ConcurrentHashMap

熔断器实现
构造方法
class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
    private final HystrixCommandProperties properties;
    private final HystrixCommandMetrics metrics;

    enum Status {
        CLOSED, OPEN, HALF_OPEN
    }

    private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
    private final AtomicLong circuitOpened = new AtomicLong(-1);
    private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

    protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        this.properties = properties;
        this.metrics = metrics;

        //On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
        Subscription s = subscribeToStream();
        activeSubscription.set(s);
    }
}

先介绍一下几个比较基础的属性:

  1. HystrixCommandProperties:当前熔断器的配置
  2. HystrixCommandMetrics: 请求统计组件
  3. Status:熔断器状态枚举,一共包含三种,关闭、打开和半开
  4. status:当前熔断器的状态
  5. circuitOpened:当前熔断器的打开时间
  6. activeSubscription:订阅请求统计的处理函数
请求统计处理
private Subscription subscribeToStream() {
            /*
             * This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
             */
            return metrics.getHealthCountsStream()
                    .observe()
                    .subscribe(new Subscriber<HealthCounts>() {
                        @Override
                        public void onCompleted() {

                        }

                        @Override
                        public void onError(Throwable e) {

                        }

                        @Override
                        public void onNext(HealthCounts hc) {
                            // check if we are past the statisticalWindowVolumeThreshold
                            if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                                // we are not past the minimum volume threshold for the stat window,
                                // so no change to circuit status.
                                // if it was CLOSED, it stays CLOSED
                                // if it was half-open, we need to wait for a successful command execution
                                // if it was open, we need to wait for sleep window to elapse
                            } else {
                                if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                                    //we are not past the minimum error threshold for the stat window,
                                    // so no change to circuit status.
                                    // if it was CLOSED, it stays CLOSED
                                    // if it was half-open, we need to wait for a successful command execution
                                    // if it was open, we need to wait for sleep window to elapse
                                } else {
                                    // our failure rate is too high, we need to set the state to OPEN
                                    if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                                        circuitOpened.set(System.currentTimeMillis());
                                    }
                                }
                            }
                        }
                    });
        }

直接看onNext方法里的处理方式:

  1. 时间窗内的请求数量是否达标,按默认配置就是10秒钟的请求数是否超过20次,如果不达标不能开启熔断器
  2. else中首先判断错误比例是否达到比例,按默认就是50%
  3. 满足打开条件,使用CAS修改状态为打开,并记录打开时间circuitOpened为当前时间

当记录了当前应用的统计数据之后,在每次请求的时候就可以根据这些数据来判断是否应该打开熔断器了

请求过滤

不知你是否还记得在系列文章第一篇中曾经提到了一个方法applyHystrixSemantics,在这个方法中就包含了判断是否应该熔断的逻辑,如果熔断器打开的情况下会直接进入降级逻辑。这个判断的方法如下:

        public boolean attemptExecution() {
            if (properties.circuitBreakerForceOpen().get()) {
                return false;
            }
            if (properties.circuitBreakerForceClosed().get()) {
                return true;
            }
            if (circuitOpened.get() == -1) {
                return true;
            } else {
                if (isAfterSleepWindow()) {
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        //only the first request after sleep window should execute
                        return true;
                    } else {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }
  1. 第一个if,如果配置强制熔断则返回false表示开启熔断器进入降级逻辑
  2. 第二个,如果配置强制关闭则返回正常不进行后续的判断
  3. 第三个,打开时间为空则肯定没打开过
  4. 第四个,判断是否满足尝试时间,默认是5秒钟。时间计算方式如下:
private boolean isAfterSleepWindow() {
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    return currentTime > circuitOpenTime + sleepWindowTime;
}
  1. 当满足尝试时则使用CAS方式修改熔断器为半开状态

而当请求成功的时候则会调用如下方法清除统计数据,更改熔断器状态为关闭

 public void markSuccess() {
            if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
                //This thread wins the race to close the circuit - it resets the stream to start it over from 0
                metrics.resetStream();
                Subscription previousSubscription = activeSubscription.get();
                if (previousSubscription != null) {
                    previousSubscription.unsubscribe();
                }
                Subscription newSubscription = subscribeToStream();
                activeSubscription.set(newSubscription);
                circuitOpened.set(-1L);
            }
        }

请求失败则再次打开熔断器,并更新打开时间

        public void markNonSuccess() {
            if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
                //This thread wins the race to re-open the circuit - it resets the start time for the sleep window
                circuitOpened.set(System.currentTimeMillis());
            }
        }

相关文章

  • SpringCloud 之Hystrix熔断器

    熔断器Hystrix 为什么要使用熔断器 什么是Hystrix Hystrix 中文意思就是豪猪 ,因其背上长满...

  • Hystrix核心熔断器

    在深入研究熔断器之前,我们需要先看一下Hystrix的几个重要的默认配置,这几个配置在HystrixCommand...

  • 本周学习(20211220-20211226)

    Hystrix提供的熔断器具有自我反馈,自我恢复的功能,Hystrix会根据调用接口的情况,让熔断器在closed...

  • springcloud使用(四) 熔断器Hystrix

    熔断器的概念和优点参考 springcloud(四):熔断器Hystrix, 讲的很详细 基于feign的Hyst...

  • Feign调用报错:failed and no fallback

    timed-out and no fallback 这个错误基本是出现在Hystrix熔断器,熔断器的作用是判断该...

  • 熔断器 Hystrix

    熔断器Hystrix 版本 2.2.1.RELEASE 1. 介绍  Hystrix是一个延迟和容错工具,旨在隔离...

  • Hystrix的正确理解方式

    hystrix-logo-tagline-640.png 什么是熔断器 熔断器,原本是电路中在电器发生短路时的防止...

  • Hystrix熔断器执行机制

    本篇假设大家对Hystrix的执行过程及源码有一定的了解,这里介绍Hystrix的熔断器执行机制。 1.Hystr...

  • hystrix

    hystrix 三个设计原则 资源隔离利用线程池实现,每个依赖服务最多耗尽自己线程池的资源 熔断器熔断器 命令模式...

  • 熔断器---Hystrix

    Hystrix:熔断器,容错管理工具,旨在通过熔断机制控制服务和第三方库的节点,从而对延迟和故障提供更强大的容错能...

网友评论

      本文标题:Hystrix核心熔断器

      本文链接:https://www.haomeiwen.com/subject/vjsmbctx.html