Hystrix Configuration Properties Explained


Author: 黄大海 | Published: 2020-03-22 20:21

Overview

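| Group (prefix) | Property | Default | Description |
|---|---|---|---|
| Execution (hystrix.command.default / hystrix.command.HystrixCommandKey) | execution.isolation.strategy | THREAD | Either THREAD or SEMAPHORE |
| | execution.isolation.thread.timeoutInMilliseconds | 1000 | Call timeout (ms); the fallback is triggered once it is exceeded |
| | execution.timeout.enabled | true | Whether the timeout mechanism is enabled |
| | execution.isolation.thread.interruptOnTimeout | true | Whether to interrupt the execution on timeout (THREAD mode only) |
| | execution.isolation.thread.interruptOnCancel | false | Whether to interrupt the execution when it is cancelled (THREAD mode only) |
| | execution.isolation.semaphore.maxConcurrentRequests | 10 | Maximum concurrent requests; any beyond this are rejected (SEMAPHORE mode only) |
| Fallback (hystrix.command.default / hystrix.command.HystrixCommandKey) | fallback.isolation.semaphore.maxConcurrentRequests | 10 | Maximum concurrent fallback calls (whatever the execution mode, fallback always uses SEMAPHORE mode) |
| | fallback.enabled | true | Whether fallback is enabled |
| Circuit Breaker (hystrix.command.default / hystrix.command.HystrixCommandKey) | circuitBreaker.enabled | true | Whether the circuit breaker is enabled |
| | circuitBreaker.requestVolumeThreshold | 20 | Minimum number of requests in the window before the breaker can trip |
| | circuitBreaker.sleepWindowInMilliseconds | 5000 | How long (ms) the breaker stays open; afterwards it goes half-open and lets one request through |
| | circuitBreaker.errorThresholdPercentage | 50 | Failure percentage at or above which the breaker trips |
| | circuitBreaker.forceOpen | false | Force the breaker open |
| | circuitBreaker.forceClosed | false | Force the breaker closed |
| Metrics (hystrix.command.default / hystrix.command.HystrixCommandKey) | metrics.rollingStats.timeInMilliseconds | 10000 | Length (ms) of the statistics window |
| | metrics.rollingStats.numBuckets | 10 | Number of buckets the window is divided into |
| | metrics.rollingPercentile.enabled | true | Whether the percentile histogram is enabled |
| | metrics.rollingPercentile.timeInMilliseconds | 60000 | Length (ms) of the percentile statistics window |
| | metrics.rollingPercentile.numBuckets | 6 | Number of buckets the percentile window is divided into |
| | metrics.rollingPercentile.bucketSize | 100 | Appears to be unused |
| | metrics.healthSnapshot.intervalInMilliseconds | 500 | Statistics interval used only for HealthCounts (affects the circuit breaker) |
| Request Context (hystrix.command.default / hystrix.command.HystrixCommandKey) | requestCache.enabled | true | Whether the request-scoped cache is enabled |
| | requestLog.enabled | true | Whether to record execution details in the request log |
| Collapser Properties (hystrix.collapser.default / hystrix.collapser.HystrixCollapserKey) | maxRequestsInBatch | Integer.MAX_VALUE | Maximum number of requests in one batch |
| | timerDelayInMilliseconds | 10 | Time window (ms) for collecting requests into a batch |
| | requestCache.enabled | true | Enables the request-scoped cache; same as the command cache, but configured under the hystrix.collapser.XXX prefix |
| ThreadPool Properties (hystrix.threadpool.default / hystrix.threadpool.HystrixThreadPoolKey) | coreSize | 10 | Number of core threads |
| | maximumSize | 10 | Maximum number of threads |
| | maxQueueSize | -1 | Maximum length of the wait queue (-1 means a SynchronousQueue, i.e. no queueing) |
| | queueSizeRejectionThreshold | 5 | Queue length at which requests are rejected; unlike maxQueueSize, it can be changed at runtime |
| | keepAliveTimeMinutes | 1 | How long (minutes) idle threads are kept before being reclaimed |
| | allowMaximumSizeToDivergeFromCoreSize | false | When true, maximumSize may differ from coreSize |
| | metrics.rollingStats.timeInMilliseconds | 10000 | Length (ms) of the thread-pool statistics window |
| | metrics.rollingStats.numBuckets | 10 | Number of buckets in the thread-pool statistics window |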
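Each property name is appended to one of the prefixes to form the full key, for example hystrix.command.GetUser.execution.isolation.thread.timeoutInMilliseconds. As far as I understand it, Hystrix first checks the command-specific key, then the hystrix.command.default key, and finally falls back to the built-in default listed in the table. The sketch below models only that lookup order using a plain java.util.Properties. CommandPropertyResolver and the command keys GetUser and GetOrder are hypothetical, and values set in code through Hystrix's Setter are ignored.

```java
import java.util.Properties;

// Hypothetical helper (not a Hystrix class): models the property lookup order described above.
class CommandPropertyResolver {
    private final Properties config;

    CommandPropertyResolver(Properties config) {
        this.config = config;
    }

    /** Command-specific key first, then the hystrix.command.default key, then the built-in default. */
    String resolve(String commandKey, String property, String builtInDefault) {
        String specific = config.getProperty("hystrix.command." + commandKey + "." + property);
        if (specific != null) {
            return specific;
        }
        return config.getProperty("hystrix.command.default." + property, builtInDefault);
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds", "2000");
        config.setProperty("hystrix.command.GetUser.execution.isolation.thread.timeoutInMilliseconds", "500");
        CommandPropertyResolver resolver = new CommandPropertyResolver(config);
        String timeout = "execution.isolation.thread.timeoutInMilliseconds";
        System.out.println(resolver.resolve("GetUser", timeout, "1000"));  // 500 (command-specific key)
        System.out.println(resolver.resolve("GetOrder", timeout, "1000")); // 2000 (default prefix)
        System.out.println(resolver.resolve("GetOrder", "circuitBreaker.requestVolumeThreshold", "20")); // 20 (built-in)
    }
}
```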

Details

  • Execution

    • execution.isolation.strategy

      • Two main modes: "thread-pool isolation" or "semaphore isolation"
      • com.netflix.hystrix.AbstractCommand#executeCommandWithSpecifiedIsolation
      private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
          if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
              return Observable.defer(...)
                      ...
                      .subscribeOn(threadPool.getScheduler());
          } else {
              return Observable.defer(...);
          }
      }
      
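      • This part needs some RxJava background. In thread-pool isolation mode, the subscribeOn(threadPool.getScheduler()) call above makes the command run on the pool's threads. Semaphore isolation mode has no such setting, so by default the command runs on the calling thread.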
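To show the difference without RxJava, here is a JDK-only sketch. THREAD mode hands the work to a dedicated pool, which is the role subscribeOn(threadPool.getScheduler()) plays above. SEMAPHORE mode simply runs the work on the caller's thread. IsolationDemo and the thread name hystrix-demo-pool are hypothetical, and this is not how Hystrix itself is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical demo class: shows which thread runs the work under each isolation strategy.
class IsolationDemo {
    enum Strategy { THREAD, SEMAPHORE }

    // Stand-in for a command's thread pool; daemon threads so the JVM can exit.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "hystrix-demo-pool");
        t.setDaemon(true);
        return t;
    });

    /** Returns the name of the thread that actually executed the work. */
    static String run(Strategy strategy) {
        Supplier<String> work = () -> Thread.currentThread().getName();
        if (strategy == Strategy.THREAD) {
            // Like subscribeOn(threadPool.getScheduler()): the work hops onto a pool thread.
            return CompletableFuture.supplyAsync(work, POOL).join();
        }
        // SEMAPHORE: nothing moves the work, so it stays on the calling thread.
        return work.get();
    }

    public static void main(String[] args) {
        System.out.println("THREAD    -> " + run(Strategy.THREAD));    // hystrix-demo-pool
        System.out.println("SEMAPHORE -> " + run(Strategy.SEMAPHORE)); // main
    }
}
```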
    • execution.timeout.enabled

      • com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
      private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
          ...
          if (properties.executionTimeoutEnabled().get()) {
              execution = executeCommandWithSpecifiedIsolation(_cmd)
                      .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
          } else {
              execution = executeCommandWithSpecifiedIsolation(_cmd);
          }
      
          ...
      }
      
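      • lift is a general-purpose Observable operator that works much like a proxy. Here it adds the timeout-interception logic.
      • Internally, it creates a TimerListener that fires on a separate thread after a fixed delay. The listener unsubscribes the downstream, raises the timeout exception, and so on. See TimerListener#tick for the details.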
    • execution.isolation.thread.timeoutInMilliseconds

      • See TimerListener#getIntervalTimeInMilliseconds
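The same idea can be sketched with JDK classes only. A scheduler thread fires after timeoutInMilliseconds. If the work has not finished by then, the scheduler completes the result with the fallback and cancels the work. TimeoutDemo, callWithTimeout and sleepQuietly are hypothetical names. Hystrix itself does this through HystrixObservableTimeoutOperator and a TimerListener, not through CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical demo: a timer on another thread enforces the timeout and switches to the fallback.
class TimeoutDemo {
    // Plays the role of the thread that runs TimerListener#tick.
    private static final ScheduledExecutorService TIMER =
            Executors.newSingleThreadScheduledExecutor(r -> daemon(r, "demo-timer"));
    private static final ExecutorService WORKERS =
            Executors.newCachedThreadPool(r -> daemon(r, "demo-worker"));

    private static Thread daemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true);
        return t;
    }

    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and return early
        }
    }

    /** Runs work on a worker thread; if it is not done within timeoutMs, the fallback value is returned. */
    static <T> T callWithTimeout(Supplier<T> work, long timeoutMs, Supplier<T> fallback) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Future<?> task = WORKERS.submit(() -> {
            try {
                result.complete(work.get()); // no-op if the timer already completed it
            } catch (RuntimeException e) {
                result.complete(fallback.get()); // failures go to the fallback too
            }
        });
        TIMER.schedule(() -> {
            // Complete with the fallback first, then cancel, so a late result cannot win.
            if (!result.isDone() && result.complete(fallback.get())) {
                task.cancel(true); // true = interrupt the worker, like interruptOnTimeout=true
            }
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return result.join();
    }

    public static void main(String[] args) {
        System.out.println(callWithTimeout(() -> "fast result", 200, () -> "fallback")); // fast result
        System.out.println(callWithTimeout(() -> { sleepQuietly(1000); return "slow result"; },
                200, () -> "fallback"));                                                 // fallback
    }
}
```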
    • execution.isolation.thread.interruptOnTimeout

      • When a timeout occurs, TimerListener.tick is called, and it calls unsubscribe
      • com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$FutureCompleterWithConfigurableInterrupt#unsubscribe
      private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
          public void unsubscribe() {
              if (shouldInterruptThread.call()) {
                  futureTask.cancel(true);
              }else{
                  futureTask.cancel(false);
              }
          }
      }
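The flag only changes the argument passed to Future#cancel. The JDK-only sketch below shows what that means for a task that is already running. cancel(true) interrupts the worker thread, so a blocking call such as Thread.sleep aborts. cancel(false) lets the task keep running, and its result is simply discarded. InterruptDemo and taskWasInterrupted are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo: the effect of cancel(true) vs cancel(false) on a running task.
class InterruptDemo {
    /** Starts a slow task, cancels it with the given flag, and reports whether the task saw an interrupt. */
    static boolean taskWasInterrupted(boolean mayInterruptIfRunning) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);
        Future<?> task = pool.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(300);     // simulated slow downstream call
            } catch (InterruptedException e) {
                interrupted.set(true); // only happens with cancel(true)
            } finally {
                finished.countDown();
            }
        });
        try {
            started.await();                    // make sure the task is already running
            task.cancel(mayInterruptIfRunning); // the choice made in FutureCompleterWithConfigurableInterrupt#unsubscribe
            finished.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("cancel(true)  -> interrupted: " + taskWasInterrupted(true));  // true
        System.out.println("cancel(false) -> interrupted: " + taskWasInterrupted(false)); // false
    }
}
```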
      
    • execution.isolation.thread.interruptOnCancel

      • When the original interface returns a Future, the caller can cancel the task manually. This is the case where this setting takes effect.
      • com.netflix.hystrix.HystrixCommand#queue
          public Future<R> queue() {
              final Future<R> f = new Future<R>() {
                   public boolean cancel(boolean mayInterruptIfRunning) {
                          ...
                          final boolean res = delegate.cancel(interruptOnFutureCancel.get());
                          if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                              final Thread t = executionThread.get();
                              t.interrupt();
                          }
                          ...
                   }
              }
          }
      
    • execution.isolation.semaphore.maxConcurrentRequests

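      • Specifies the maximum concurrency in SEMAPHORE mode
      • The AbstractCommand$TryableSemaphore interface works much like the JDK's Semaphore. The difference is that it never blocks, which gives better performance under concurrency.
      • For how it is used, see AbstractCommand#applyHystrixSemantics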
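A non-blocking semaphore can be sketched with a single AtomicInteger. tryAcquire increments the counter. If that pushes the counter past the limit, it backs the increment out and returns false immediately instead of parking the thread. This is a simplified sketch in the spirit of TryableSemaphore, not its actual source. The class name NonBlockingSemaphore is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a try-only semaphore: callers are rejected instead of blocked.
class NonBlockingSemaphore {
    private final int maxConcurrentRequests;
    private final AtomicInteger inFlight = new AtomicInteger();

    NonBlockingSemaphore(int maxConcurrentRequests) {
        this.maxConcurrentRequests = maxConcurrentRequests;
    }

    /** Never blocks: returns false right away when the limit is already reached. */
    boolean tryAcquire() {
        if (inFlight.incrementAndGet() > maxConcurrentRequests) {
            inFlight.decrementAndGet(); // undo our increment; the caller should reject (e.g. go to fallback)
            return false;
        }
        return true;
    }

    void release() {
        inFlight.decrementAndGet();
    }

    int inFlight() {
        return inFlight.get();
    }

    public static void main(String[] args) {
        NonBlockingSemaphore semaphore = new NonBlockingSemaphore(2);
        System.out.println(semaphore.tryAcquire()); // true
        System.out.println(semaphore.tryAcquire()); // true
        System.out.println(semaphore.tryAcquire()); // false: the third concurrent request is rejected
        semaphore.release();
        System.out.println(semaphore.tryAcquire()); // true again after a release
    }
}
```

A caller would pair it as `if (semaphore.tryAcquire()) { try { ... } finally { semaphore.release(); } } else { /* reject */ }`, so a permit is released only if it was actually acquired.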
  • Fallback

    • fallback.isolation.semaphore.maxConcurrentRequests
    • fallback.enabled
    • The semaphore works the same way as for execution. Whatever mode execution uses, fallback always runs in SEMAPHORE mode.
  • Circuit Breaker

    • circuitBreaker.enabled

      • AbstractCommand#initCircuitBreaker
      if (enabled) {
          return HystrixCircuitBreaker.Factory.getInstance(...);
      } else {
          return new NoOpCircuitBreaker();
      }
      
      • AbstractCommand#applyHystrixSemantics
          if (circuitBreaker.attemptExecution()) {
              ... // continue executing
          } else {
              return handleShortCircuitViaFallback(); // go straight to the fallback
          }

    • circuitBreaker.requestVolumeThreshold

    • circuitBreaker.errorThresholdPercentage

      • com.netflix.hystrix.HystrixCircuitBreaker$HystrixCircuitBreakerImpl#subscribeToStream
          public void onNext(HealthCounts hc) {
              if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
                  // we are not past the minimum volume threshold for the stat window
              }else{
                  if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
                      //we are not past the minimum error threshold for the stat window
                  }else{
                      if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
                          circuitOpened.set(System.currentTimeMillis());
                      }
                  }
              }
          }
      
    • circuitBreaker.sleepWindowInMilliseconds

    • circuitBreaker.forceOpen

    • circuitBreaker.forceClosed

      • com.netflix.hystrix.HystrixCircuitBreaker$HystrixCircuitBreakerImpl#allowRequest
          public boolean allowRequest() {
              if (properties.circuitBreakerForceOpen().get()) {
                  return false;
              }
              if (properties.circuitBreakerForceClosed().get()) {
                  return true;
              }
              if (circuitOpened.get() == -1) {
                  return true;
              }else{
                  if (status.get().equals(Status.HALF_OPEN)) {
                      return false;
                  }else{
                      return isAfterSleepWindow();
                  }
              }
          }
          
          private boolean isAfterSleepWindow() {
              final long circuitOpenTime = circuitOpened.get();
              final long currentTime = System.currentTimeMillis();
              final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
              return currentTime > circuitOpenTime + sleepWindowTime;
          }
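Taken together, the breaker is a small state machine. The sketch below is a simplified, single-threaded model of it. The breaker trips from CLOSED to OPEN once the window holds at least requestVolumeThreshold requests and the error percentage reaches errorThresholdPercentage. After sleepWindowInMilliseconds it lets one trial request through (HALF_OPEN). The outcome of that trial either closes the breaker or reopens it. CircuitBreakerModel and its methods are hypothetical. Time is passed in explicitly, the health counts are supplied by the caller rather than by HealthCountsStream, and forceOpen/forceClosed are left out.

```java
// Hypothetical, simplified model of the breaker logic shown above (not thread-safe, no forceOpen/forceClosed).
class CircuitBreakerModel {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMs;
    private Status status = Status.CLOSED;
    private long circuitOpened = -1; // -1 means "not open", as in the excerpt

    CircuitBreakerModel(int requestVolumeThreshold, int errorThresholdPercentage, long sleepWindowMs) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMs = sleepWindowMs;
    }

    /** Fed with each health snapshot: total requests and error percentage in the rolling window. */
    void onHealthCounts(long totalRequests, int errorPercentage, long nowMs) {
        if (totalRequests < requestVolumeThreshold) {
            return; // not enough traffic to judge
        }
        if (errorPercentage < errorThresholdPercentage) {
            return; // error rate is acceptable
        }
        if (status == Status.CLOSED) {
            status = Status.OPEN;
            circuitOpened = nowMs;
        }
    }

    /** true = run the command; false = short-circuit straight to the fallback. */
    boolean attemptExecution(long nowMs) {
        if (circuitOpened == -1) {
            return true;
        }
        if (status == Status.OPEN && nowMs > circuitOpened + sleepWindowMs) {
            status = Status.HALF_OPEN; // exactly one trial request gets through
            return true;
        }
        return false; // still inside the sleep window, or a trial request is already in flight
    }

    /** The trial request succeeded: close the breaker. */
    void markSuccess() {
        if (status == Status.HALF_OPEN) {
            status = Status.CLOSED;
            circuitOpened = -1;
        }
    }

    /** The trial request failed: reopen and restart the sleep window. */
    void markNonSuccess(long nowMs) {
        if (status == Status.HALF_OPEN) {
            status = Status.OPEN;
            circuitOpened = nowMs;
        }
    }

    Status status() {
        return status;
    }

    public static void main(String[] args) {
        CircuitBreakerModel breaker = new CircuitBreakerModel(20, 50, 5000); // the table's defaults
        breaker.onHealthCounts(10, 100, 0);   // only 10 requests: stays CLOSED
        breaker.onHealthCounts(20, 60, 1000); // 20 requests, 60% errors: trips to OPEN
        System.out.println(breaker.status() + " " + breaker.attemptExecution(3000)); // OPEN false
        System.out.println(breaker.attemptExecution(6001) + " " + breaker.status()); // true HALF_OPEN
        breaker.markSuccess();
        System.out.println(breaker.status());                                        // CLOSED
    }
}
```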
      
  • Metrics

    • metrics.rollingStats.timeInMilliseconds
    • metrics.rollingStats.numBuckets
    • metrics.rollingPercentile.enabled
    • metrics.rollingPercentile.timeInMilliseconds
    • metrics.rollingPercentile.numBuckets
    • metrics.rollingPercentile.bucketSize
    • metrics.healthSnapshot.intervalInMilliseconds
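    • The class hierarchy looks roughly like this:
      • BucketedCounterStream: base class for bucketed counting
        • BucketedCumulativeCounterStream: base class for cumulative bucket counting
          • CumulativeCollapserEventCounterStream: cumulative counts of collapser events
          • CumulativeCommandEventCounterStream: cumulative counts of command execution events
          • CumulativeThreadPoolEventCounterStream: cumulative counts of thread-pool events
        • BucketedRollingCounterStream: base class for rolling bucket counting
          • HealthCountsStream: health statistics (used by the circuit breaker)
          • RollingCollapserEventCounterStream: rolling counts of collapser events
          • RollingCommandEventCounterStream: rolling counts of command execution events
          • RollingThreadPoolEventCounterStream: rolling counts of thread-pool events
      • RollingDistributionStream: base class for histograms (percentiles)
        • RollingCollapserBatchSizeDistributionStream: collapser batch sizes
        • RollingCommandLatencyDistributionStream: command execution latency
        • RollingCommandUserLatencyDistributionStream: latency as seen by the user (calling) thread
      • metrics.rollingStats.* applies to most of the bucketed counter implementations
      • metrics.rollingPercentile.* applies to all histogram statistics
      • metrics.healthSnapshot.intervalInMilliseconds is special. Only HealthCountsStream uses it, and the circuit breaker uses that stream's data to decide whether to trip.
    • All the metrics follow the same pattern: they aggregate data over a sliding time window, mainly using RxJava's window operator.
    • Take com.netflix.hystrix.metric.consumer.RollingDistributionStream as an example: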
    rollingDistributionStream = stream
        .observe()
        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) // group events into time buckets
        .flatMap(reduceBucketToSingleDistribution)     // reduce each bucket's events to one data point
        .startWith(emptyDistributionsToStart)          // initial (empty) data
        .window(numBuckets, 1)                         // take numBuckets buckets at a time, sliding by one bucket
        .flatMap(reduceWindowToSingleDistribution)     // aggregate the numBuckets buckets in each window
        .map(cacheHistogramValues)                     // other logic
        .share()                                       // share one computation among all subscribers
        .onBackpressureDrop();                         // drop data when downstream can't keep up
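The same bucketing can be sketched without RxJava. The class below keeps numBuckets counters. Each counter covers timeInMilliseconds / numBuckets of time, which is 1000 ms with the defaults of 10000 and 10. An event increments the current bucket. The window total is the sum of the buckets still inside the window, so old data drops off one whole bucket at a time. This is a simplified, hypothetical model: RollingCounter is not a Hystrix class, and time is passed in explicitly.

```java
import java.util.Arrays;

// Hypothetical ring-buffer model of a bucketed rolling window (not thread-safe).
class RollingCounter {
    private final long bucketSizeMs;
    private final long[] bucketStart; // start time of the interval each slot currently holds (-1 = unused)
    private final long[] counts;

    RollingCounter(long windowMs, int numBuckets) {
        if (windowMs % numBuckets != 0) {
            throw new IllegalArgumentException("windowMs must be divisible by numBuckets");
        }
        this.bucketSizeMs = windowMs / numBuckets;
        this.bucketStart = new long[numBuckets];
        this.counts = new long[numBuckets];
        Arrays.fill(bucketStart, -1L);
    }

    /** Records one event at time nowMs. */
    void increment(long nowMs) {
        long start = nowMs - nowMs % bucketSizeMs;                 // align to the bucket boundary
        int slot = (int) ((start / bucketSizeMs) % counts.length);
        if (bucketStart[slot] != start) {                          // slot holds an expired bucket: recycle it
            bucketStart[slot] = start;
            counts[slot] = 0;
        }
        counts[slot]++;
    }

    /** Sum over the last numBuckets buckets, including the current (partial) one. */
    long sum(long nowMs) {
        long currentStart = nowMs - nowMs % bucketSizeMs;
        long oldestStart = currentStart - bucketSizeMs * (counts.length - 1);
        long total = 0;
        for (int i = 0; i < counts.length; i++) {
            if (bucketStart[i] >= oldestStart && bucketStart[i] <= currentStart) {
                total += counts[i];
            }
        }
        return total;
    }

    public static void main(String[] args) {
        RollingCounter counter = new RollingCounter(10_000, 10); // defaults: 10 s window, 10 buckets of 1 s
        counter.increment(0);
        counter.increment(500);
        counter.increment(1_500);
        System.out.println(counter.sum(9_999));  // 3: all events are inside the window
        System.out.println(counter.sum(10_000)); // 1: the [0, 1000) bucket has slid out
    }
}
```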
    
    
  • Request Context

    • requestCache.enabled

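      • Enables caching scoped to an HTTP request. Within the same request, results are shared across threads.
      • A cache key is required. You can supply it with @CacheResult, with @CacheKey, or by implementing AbstractCommand#getCacheKey.
      • A servlet filter is needed to initialize and shut down the context: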
      public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
          HystrixRequestContext context = HystrixRequestContext.initializeContext();
          try {
              chain.doFilter(request, response);
          } finally {
              context.shutdown();
          }
      }
      
      • Few people use this feature. SpringCache plus @RequestScope can achieve the same thing.
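To show what "request scope" means here, the sketch below models the pattern with a ThreadLocal map. The filter-style try/finally opens and closes the context, and within one context a command with a given cache key runs only once. RequestContext and cached are hypothetical names. Hystrix's real HystrixRequestContext and HystrixRequestCache do more than this; for example, they carry the context over to the command's pool threads, which this single-threaded model ignores.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical model of a request-scoped cache; a ThreadLocal stands in for the request scope.
class RequestContext {
    private static final ThreadLocal<Map<String, Object>> CACHE = new ThreadLocal<>();

    static void initialize() {
        CACHE.set(new HashMap<>());
    }

    static void shutdown() {
        CACHE.remove();
    }

    /** Runs command at most once per cacheKey inside the current request; repeats get the cached result. */
    @SuppressWarnings("unchecked")
    static <T> T cached(String cacheKey, Supplier<T> command) {
        Map<String, Object> cache = CACHE.get();
        if (cache == null) {
            return command.get(); // no open context: this sketch simply skips caching
        }
        return (T) cache.computeIfAbsent(cacheKey, k -> command.get());
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        RequestContext.initialize();   // what the servlet filter does before chain.doFilter
        try {
            String first = cached("user:42", () -> "user-" + calls.incrementAndGet());
            String second = cached("user:42", () -> "user-" + calls.incrementAndGet());
            System.out.println(first + " " + second); // user-1 user-1
        } finally {
            RequestContext.shutdown(); // what the filter does in finally
        }
        System.out.println(calls.get());              // 1
    }
}
```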
    • requestLog.enabled

      • It seems to only record how each command was executed; it does not actually print anything. You can add the output yourself.
      • See HystrixRequestLog#getExecutedCommandsAsString
  • Collapser Properties

    • maxRequestsInBatch

      • com.netflix.hystrix.collapser.RequestBatch#offer
      public Observable<ResponseType> offer(RequestArgumentType arg) {
          ...
          if (argumentMap.size() >= maxBatchSize) {
              return null; // batch is full; the caller gets null and creates a new batch
          } else {
              CollapsedRequestSubject<ResponseType, RequestArgumentType> collapsedRequest =
                      new CollapsedRequestSubject<ResponseType, RequestArgumentType>(arg, this);
              // put it in the map; an identical argument reuses the existing entry
              CollapsedRequestSubject<ResponseType, RequestArgumentType> existing = argumentMap.putIfAbsent(arg, collapsedRequest);
              if (existing != null) {
                  return existing.toObservable();
              } else {
                  return collapsedRequest.toObservable();
              }
          }
      }
      
    • timerDelayInMilliseconds

      • com.netflix.hystrix.collapser.RequestCollapser#submitRequest
      public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
          if (!timerListenerRegistered.get() && timerListenerRegistered.compareAndSet(false, true)) {
              /* schedule the collapsing task to be executed every x milliseconds (x defined inside CollapsedTask) */
              // register the timer task; it fires every timerDelayInMilliseconds
              timerListenerReference.set(timer.addListener(new CollapsedTask()));
          }
          ...
      }

      private class CollapsedTask implements TimerListener {
          public int getIntervalTimeInMilliseconds() {
              return properties.timerDelayInMilliseconds().get();
          }

          public void tick() {
              RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> currentBatch = batch.get();
              if (currentBatch != null && currentBatch.getSize() > 0) {
                  // create a new batch and execute the previous one
                  createNewBatchAndExecutePreviousIfNeeded(currentBatch);
              }
          }
      }
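The two settings above can be modeled together in a simplified batch. offer returns null once maxRequestsInBatch distinct arguments are queued, so the caller knows to start a new batch. Identical arguments share one pending entry. A timer would call flush every timerDelayInMilliseconds to run the whole batch as one call. SimpleBatch and flush are hypothetical names, and CompletableFuture stands in for CollapsedRequestSubject. The batch function is supplied by the caller, and the timer itself is left out to keep the sketch deterministic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical model of a request batch: size limit on offer, one batch call on flush.
class SimpleBatch<A, R> {
    private final int maxRequestsInBatch;
    // Insertion-ordered: one pending result per distinct argument.
    private final Map<A, CompletableFuture<R>> argumentMap = new LinkedHashMap<>();

    SimpleBatch(int maxRequestsInBatch) {
        this.maxRequestsInBatch = maxRequestsInBatch;
    }

    /** Like RequestBatch#offer above: null means "full, start a new batch". */
    synchronized CompletableFuture<R> offer(A arg) {
        if (argumentMap.size() >= maxRequestsInBatch) {
            return null;
        }
        // An identical argument reuses the pending entry instead of adding a new one.
        return argumentMap.computeIfAbsent(arg, k -> new CompletableFuture<R>());
    }

    /** What a timer tick would trigger: one batch call, then complete every pending request. Returns the batch size. */
    synchronized int flush(Function<List<A>, Map<A, R>> batchCall) {
        List<A> args = new ArrayList<>(argumentMap.keySet());
        Map<A, R> results = batchCall.apply(args);
        argumentMap.forEach((arg, future) -> future.complete(results.get(arg)));
        argumentMap.clear();
        return args.size();
    }

    public static void main(String[] args) {
        SimpleBatch<Integer, String> batch = new SimpleBatch<>(2);
        CompletableFuture<String> a = batch.offer(1);
        CompletableFuture<String> b = batch.offer(1); // same argument: same pending result as a
        CompletableFuture<String> c = batch.offer(2);
        CompletableFuture<String> d = batch.offer(3); // null: the batch already holds 2 arguments
        int size = batch.flush(ids -> ids.stream().collect(Collectors.toMap(id -> id, id -> "user-" + id)));
        System.out.println(size + " " + a.join() + " " + (a == b) + " " + c.join() + " " + (d == null));
        // 2 user-1 true user-2 true
    }
}
```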
      
    • requestCache.enabled

      • Cache switch used by the collapser
  • Thread Pool Properties

    • coreSize
    • maximumSize
    • maxQueueSize
    • queueSizeRejectionThreshold
    • keepAliveTimeMinutes
    • allowMaximumSizeToDivergeFromCoreSize
    • metrics.rollingStats.timeInMilliseconds
    • metrics.rollingStats.numBuckets
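    • These are mostly ordinary thread-pool settings. See HystrixConcurrencyStrategy#getThreadPool for details.
    • The officially recommended sizing formula is: pool size = peak requests per second × 99th-percentile latency (in seconds) + some breathing room. For example, 30 rps × 0.2 s = 6; with some headroom added, 10 is a reasonable setting.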
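Here is a rough sketch of how these settings map onto a JDK ThreadPoolExecutor. It reflects my simplified reading of HystrixConcurrencyStrategy#getThreadPool. A maxQueueSize of -1 means a SynchronousQueue, which does no buffering. A positive maxQueueSize means a bounded LinkedBlockingQueue. queueSizeRejectionThreshold is an extra, adjustable check applied before a task is queued. sizeFor applies the formula above, taking latency in milliseconds so the math stays in integers. ThreadPoolSizing, sizeFor, newPool and hasQueueSpace are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helpers: pool sizing formula plus a simplified settings-to-executor mapping.
class ThreadPoolSizing {
    /** peak rps x p99 latency (ms converted to s, rounded up) plus breathing room. */
    static int sizeFor(int peakRequestsPerSecond, int p99LatencyMs, int breathingRoom) {
        int busyThreads = (peakRequestsPerSecond * p99LatencyMs + 999) / 1000;
        return busyThreads + breathingRoom;
    }

    /** Simplified mapping of coreSize, maximumSize, keepAliveTimeMinutes and maxQueueSize. */
    static ThreadPoolExecutor newPool(int coreSize, int maximumSize, boolean allowMaximumSizeToDivergeFromCoreSize,
                                      int keepAliveTimeMinutes, int maxQueueSize) {
        // Without the divergence flag, maximumSize is ignored and the pool stays at coreSize.
        int max = allowMaximumSizeToDivergeFromCoreSize ? Math.max(coreSize, maximumSize) : coreSize;
        BlockingQueue<Runnable> queue;
        if (maxQueueSize <= 0) {
            queue = new SynchronousQueue<>();                // -1: direct hand-off, no waiting queue
        } else {
            queue = new LinkedBlockingQueue<>(maxQueueSize); // bounded queue, capacity fixed at creation
        }
        return new ThreadPoolExecutor(coreSize, max, keepAliveTimeMinutes, TimeUnit.MINUTES, queue);
    }

    /** The extra, adjustable limit: reject once the queue reaches queueSizeRejectionThreshold. */
    static boolean hasQueueSpace(ThreadPoolExecutor pool, int maxQueueSize, int queueSizeRejectionThreshold) {
        if (maxQueueSize <= 0) {
            return true; // SynchronousQueue: the executor itself rejects when all threads are busy
        }
        return pool.getQueue().size() < queueSizeRejectionThreshold;
    }

    public static void main(String[] args) {
        System.out.println(sizeFor(30, 200, 4)); // 30 rps * 0.2 s = 6, plus 4 spare threads: 10
        ThreadPoolExecutor pool = newPool(10, 10, false, 1, -1); // the table's defaults
        System.out.println(pool.getMaximumPoolSize() + " " + pool.getQueue().getClass().getSimpleName());
        // 10 SynchronousQueue
        pool.shutdown();
    }
}
```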

Other

  • How does Spring initialize Hystrix?

    1. @EnableCircuitBreaker

    2. spring-cloud-netflix-core -> spring.factories

      org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
      org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
      
    3. HystrixCircuitBreakerConfiguration

    4. HystrixCommandAspect

    5. HystrixCommandAspect#methodsAnnotatedWithHystrixCommand

  • How are Spring configuration properties injected?

    1. spring-cloud-netflix-hystrix pulls in spring-cloud-netflix-archaius
    2. Its spring.factories contains ArchaiusAutoConfiguration
    3. ArchaiusAutoConfiguration#configurableEnvironmentConfiguration
    4. ArchaiusAutoConfiguration#configureArchaius
    5. ArchaiusAutoConfiguration#addArchaiusConfiguration
    6. Steps 3 to 5 convert Spring's property configuration into Netflix's dynamic ConfigurationManager
  • How are Hystrix properties updated dynamically?

    1. ArchaiusAutoConfiguration$PropagateEventsConfiguration#onApplicationEvent listens for Spring's EnvironmentChangeEvent and forwards it to Netflix's configuration manager

      public void onApplicationEvent(EnvironmentChangeEvent event) {
          AbstractConfiguration manager = ConfigurationManager.getConfigInstance();
          for (String key : event.getKeys()) {
              for (ConfigurationListener listener : manager
                      .getConfigurationListeners()) {
                  listener.configurationChanged(new ConfigurationEvent(source, type,
                          key, value, beforeUpdate));         
              }
          }
      }
      
    2. ExpandedConfigurationListenerAdapter#configurationChanged

    3. DynamicProperty$DynamicPropertyListener#setProperty

          private static boolean updateProperty(String propName, Object value) {
              DynamicProperty prop = ALL_PROPS.get(propName);
              if (prop != null && prop.updateValue(value)) {
                  prop.notifyCallbacks();
                  return true;
              }
              return false;
          }
      
    4. HystrixProperty -> HystrixDynamicProperty -> ArchaiusDynamicProperty -> PropertyWrapper -> DynamicProperty

    5. DynamicProperty is where values are ultimately read and updated
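At its core, this chain is a registry of named, mutable values with change callbacks. The sketch below is a minimal, hypothetical model of that idea; the names are my own, not Archaius's API. A change event looks the property up by name and updates it only if the value actually changed. Only then does it notify the callbacks, which is the same shape as DynamicProperty#updateProperty. Readers that fetch the current value on every use, as the .get() calls in the excerpts above do, see the new value on their next read.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical registry of dynamic properties (not the Archaius API).
class DynamicRegistry {
    /** One named, mutable value plus the callbacks to run when it changes. */
    static final class Prop {
        private volatile String value;
        private final List<Runnable> callbacks = new CopyOnWriteArrayList<>();

        Prop(String initial) {
            this.value = initial;
        }

        String get() {
            return value; // readers always see the latest value
        }

        void addCallback(Runnable callback) {
            callbacks.add(callback);
        }

        /** Returns true only if the value really changed. */
        synchronized boolean updateValue(String newValue) {
            if (Objects.equals(value, newValue)) {
                return false;
            }
            value = newValue;
            return true;
        }

        void notifyCallbacks() {
            callbacks.forEach(Runnable::run);
        }
    }

    private final Map<String, Prop> allProps = new ConcurrentHashMap<>();

    /** Registers (or returns) the property with this name. */
    Prop getProperty(String name, String initial) {
        return allProps.computeIfAbsent(name, n -> new Prop(initial));
    }

    /** Same shape as DynamicProperty#updateProperty: update, then notify only on a real change. */
    boolean updateProperty(String name, String value) {
        Prop prop = allProps.get(name);
        if (prop != null && prop.updateValue(value)) {
            prop.notifyCallbacks();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        DynamicRegistry registry = new DynamicRegistry();
        String key = "hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds";
        Prop timeout = registry.getProperty(key, "1000");
        timeout.addCallback(() -> System.out.println("timeout changed to " + timeout.get()));
        registry.updateProperty(key, "3000"); // prints: timeout changed to 3000
        registry.updateProperty(key, "3000"); // same value: no callback
        System.out.println(timeout.get());    // 3000
    }
}
```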

Source: https://www.haomeiwen.com/subject/xuzvyhtx.html