Hystrix

作者: Deeglose | 来源:发表于2018-12-02 14:51 被阅读12次

配置

    <properties>
        <!-- 依赖版本 -->
        <hystrix.version>1.3.16</hystrix.version>
        <hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-core</artifactId>
            <version>${hystrix.version}</version>
        </dependency>
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-metrics-event-stream</artifactId>
            <version>${hystrix-metrics-event-stream.version}</version>
        </dependency>
    </dependencies>

使用

超时降级

package com.fulton_shaw;

import com.fulton_shaw.common.util.concurrent.ThreadUtils;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;

/**
 * @author xiaohuadong
 * @date 2018/11/06
 */
public class FallbackCommand extends HystrixCommand<String> {
    public FallbackCommand(String name){
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(name))
        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(500)));

    }

    @Override
    protected String getFallback() {
        return "FALLBACK";
    }

    protected String run() throws Exception {
        ThreadUtils.sleepRandom(300,700);
        return "SUCCEED RUN";
    }
}
package com.fulton_shaw;

import com.fulton_shaw.common.util.concurrent.ExpectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Callable;

/**
 * @author xiaohuadong
 * @date 2018/11/06
 */
public class FallbackMain {
    private static final Logger LOG = LoggerFactory.getLogger(FallbackMain.class);
    public static void main(String[] args) {
        ExpectUtils.untilTrueCount(20, new Callable<Boolean>() {
            public Boolean call() throws Exception {
                FallbackCommand command = new FallbackCommand("will-you-fallback");

                String result = command.execute();
                LOG.info("{}", result);
                return "FALLBACK".equals(result);
            }
        });

        LOG.info("end");

        System.exit(0);

    }
}

基本设计

接口+Factory+intern构成一个Hystrix的全局注册中心
同时,将UnitTest放到类的后面,以便发布时,将测试包含在内。

很大量是依赖RxJava库,其中,Observable,Operator非常经典。
HystrixCommand运行时,是处于线程池中的。

服务场景

1.单个服务熔断:一个服务正常能够接受2个并发访问,测试4个并发访问,带有超时的场景

2.服务之间存在依赖的熔断:服务A正常接受5个并发访问,服务B接受3个并发访问,服务C接受2个并发访问,服务A依赖B,B依赖C。

单服务模式下Hystrix的使用测试

构造ThroughputService代表一般的服务,该服务能够在一定时间内返回,同时,有一个支持的最大并发量上限

package com.fulton_shaw.common.lang.testing;


import com.fulton_shaw.common.lang.FixedValueGetter;
import com.fulton_shaw.common.lang.ValueGetter;
import com.fulton_shaw.common.util.concurrent.ThreadUtils;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author xiaohuadong
 * @date 2018/11/08
 */
@ThreadSafe
public class ThroughPutService implements Service {
    private static final Logger LOG = LoggerFactory.getLogger(ThroughPutService.class);


    /* 服务时间 , 单位:ms  小于等于0时,无需等待 */
    private ValueGetter<Long> serviceTimeStrategy;

    /* 最大并发数 , 小于等于0时,无限制*/
    private final int maxConcurrent;

    /* 服务调用 */
    private Runnable beforeService;
    private Runnable afterService;


    /* 服务拒绝时调用 */
    private Runnable onDeny;


    /* 服务异常时调用 */
    private Function<Exception,Void> onException;

    private AtomicInteger currentConcurrent;


  /**
     * 调用服务
     *
     * @return 服务是否成功
     */
    public boolean tryService() {
        boolean canService = false;
        LOG.debug("{} --> beginning tryService,current concurrent = {}",ThreadUtils.getCurrentThreadName(),currentConcurrent.get());
        if (maxConcurrent <= 0) {
            currentConcurrent.incrementAndGet();
            canService = true;
        } else {
            // CAS
            while (true) {
                int old = currentConcurrent.get();
                if(old >= maxConcurrent){
                    break;
                }else if(currentConcurrent.compareAndSet(old, old + 1)){
                    canService = true;
                    break;
                }
            }
        }


        try {
            if (canService) {
                LOG.debug("{} --> gain tryService,current concurrent = {}",ThreadUtils.getCurrentThreadName(),currentConcurrent.get());
                beforeService.run();
                Long sleep = serviceTimeStrategy.get();
                LOG.debug("{} --> will sleep {} ms",ThreadUtils.getCurrentThreadName(),sleep);
                ThreadUtils.sleep(sleep);
                currentConcurrent.decrementAndGet();
                afterService.run();
            } else {
                onDeny.run();
            }
        }catch (Exception e){
            try {
                onException.apply(e);
            }catch (Exception ae){
                LOG.error("error happened while handing {}",e.toString(),ae);
            }
        }


        LOG.debug("{} --> end tryService,current concurrent = {}",ThreadUtils.getCurrentThreadName(),currentConcurrent.get());

        return canService;
    }

}

ThroughPutService类的tryService方法尝试调用一个服务,如果当前并发量尚未达到上限,则服务,否则,调用onDeny。若有错误发生,则调用onException。
下面是对该Service的一个测试

public class ThroughPutServiceMain {
    public static void main(String[] args) {
        final ThroughPutService service = ThroughPutService.makeService(new RandomLongGetter(300, 500), 2);
        ThreadUtils.startThreadForTimesAndWait(5, new ProcessMethod<Integer>() {
            @Nullable
            @Override
            public void apply(@Nullable Integer input) {
                service.tryService();
            }
        });
    }

}

serivce的服务时间在300~500毫秒之间,随机返回。最大允许的并发量是2。测试程序开启了5个线程来消费这个服务,下面的结果显示,只有其中两个线程得到服务,其他均被拒绝。
得到服务的是Thread-2和Thread-3, gain tryService.
输出结果:

15:39:46.789 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> beginning tryService,current concurrent = 0
15:39:46.789 [Thread-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-4 --> beginning tryService,current concurrent = 0
15:39:46.789 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> beginning tryService,current concurrent = 0
15:39:46.789 [Thread-5] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-5 --> beginning tryService,current concurrent = 0
15:39:46.789 [Thread-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-1 --> beginning tryService,current concurrent = 0
15:39:46.791 [Thread-5] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-5 --> end tryService,current concurrent = 2
15:39:46.791 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> gain tryService,current concurrent = 2
15:39:46.791 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> gain tryService,current concurrent = 2
15:39:46.791 [Thread-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-1 --> end tryService,current concurrent = 2
15:39:46.791 [Thread-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-4 --> end tryService,current concurrent = 2
15:39:46.791 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> will sleep 415 ms
15:39:46.791 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> will sleep 461 ms
15:39:47.206 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> end tryService,current concurrent = 1
15:39:47.252 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> end tryService,current concurrent = 0

ThroughPutService不能服务的情况有一种:就是最大并发量。
但是对于客户端而言,ThroughPutService不能服务还包括一种情况:ThroughPutService本身的服务质量--响应时间超时。

HystrixCommand封装ThroughPutService

我们首先识别出ThroughPutService服务不可用的情况,就是:1.超过服务的最大并发量 2.服务本身超时
在下面的Command的封装中,run会调用service,但是service可能返回false,因此,在返回false的情况下,我们应当抛出异常,表明服务不可用,迫使Hystrix转向getFallback()调用。
此外,即使service正常可用,但是服务时间超时,Hystrix本身会对run进行超时判断,因此,它也会转向getFallback()

package com.fulton_shaw.third_party_demo.hystrix;

public class SingleServiceCommand extends HystrixCommand<Void> {
    private static final Logger LOG = LoggerFactory.getLogger(SingleServiceCommand.class);
    private ThroughPutService service;


    protected SingleServiceCommand(ThroughPutService service,long time) {
        super(HystrixCommand.Setter.
                withGroupKey(HystrixCommandGroupKey.Factory.asKey(SingleServiceCommand.class.getSimpleName()))
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutEnabled(true).withExecutionTimeoutInMilliseconds((int)time))
        );
        this.service = service;
    }

    @Override
    protected Void run() throws Exception {
        boolean succeed = service.tryService();
        LOG.info("{} --> tryService succeed? {} ",ThreadUtils.getCurrentThreadName(),succeed);
        if(!succeed){
            throw  new RuntimeException(String.format("%s --> try service rejected", ThreadUtils.getCurrentThreadName()));
        }
        return null;
    }

    @Override
    protected Void getFallback() {
        LOG.info("{} --> getting Fallback",Thread.currentThread().getName());
        return null;
    }
}

测试封装的HystrixCommand

package com.fulton_shaw.third_party_demo.hystrix;

public class SingleServiceDemo {
    private static final Logger LOG = LoggerFactory.getLogger(SingleServiceDemo.class);

    public static void main(String[] args) {

        final ThroughPutService service = ThroughPutService.makeService(new RandomLongGetter(1000, 3000), 2);


        final AtomicInteger denied = new AtomicInteger(0);
        service.setOnDeny(new Runnable() {
            @Override
            public void run() {
                denied.incrementAndGet();
            }
        });

        int timeout = 2000;
        int commandCount = 4;
        final SingleServiceCommand[] commands = new SingleServiceCommand[commandCount];
        for (int i = 0; i < commandCount; i++) {
            commands[i] = new SingleServiceCommand(service,timeout);
        }

        ThreadUtils.startThreadForTimesAndWait(4, new ProcessMethod<Integer>() {
            @Nullable
            @Override
            public void apply(@Nullable Integer i) {
                commands[i].execute();
            }
        });

        LOG.info("concurrent denied = {}", denied.get());
    }
}

在上面的程序中,我们新建了一个服务,随机返回时间是1000-3000ms之间,我们将服务的超时时间设置为2000ms,然后使用denied变量记录服务被拒绝的数量。然后,我们新建了4个对该服务的调用实例,由Hystrix管理。
从下面的输出结果中,我们可以看出4和3获得服务执行,其中4超时,3没有超时,1和2均被拒绝服务。
由于1,2被拒绝服务,因此他们抛出异常之后,Hystrix转向了Fallback,我们可以看到的时,抛出异常之后,getFallback仍然处于同一线程之中。
但是4转向Fallback之后,却是由HystrixTimer-1来调用的,而原来的线程仍然继续运行,我们可以看到4最后也输出了 end tryService.
输出结果:

16:22:04.243 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> beginning tryService,current concurrent = 0
16:22:04.244 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> gain tryService,current concurrent = 1
16:22:04.243 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> beginning tryService,current concurrent = 0
16:22:04.244 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> will sleep 2665 ms
16:22:04.244 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> gain tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> will sleep 1284 ms
16:22:04.244 [hystrix-SingleServiceCommand-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-1 --> beginning tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-1 --> end tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-1] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-1 --> tryService succeed? false
16:22:04.244 [hystrix-SingleServiceCommand-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-2 --> beginning tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-2 --> end tryService,current concurrent = 2
16:22:04.244 [hystrix-SingleServiceCommand-2] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-2 --> tryService succeed? false
16:22:04.245 [hystrix-SingleServiceCommand-1] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
java.lang.RuntimeException: hystrix-SingleServiceCommand-1 --> try service rejected
at com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand.run(SingleServiceCommand.java:34) ~[classes/:?]
...
16:22:04.245 [hystrix-SingleServiceCommand-2] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
java.lang.RuntimeException: hystrix-SingleServiceCommand-2 --> try service rejected
at com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand.run(SingleServiceCommand.java:34) ~[classes/:?]
...
16:22:04.254 [hystrix-SingleServiceCommand-2] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-2 --> getting Fallback
16:22:04.257 [hystrix-SingleServiceCommand-1] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-1 --> getting Fallback
16:22:05.528 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> end tryService,current concurrent = 1
16:22:05.528 [hystrix-SingleServiceCommand-3] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-3 --> tryService succeed? true
16:22:06.231 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> end tryService,current concurrent = 0
16:22:06.231 [hystrix-SingleServiceCommand-4] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-4 --> tryService succeed? true
16:22:06.232 [HystrixTimer-1] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - HystrixTimer-1 --> getting Fallback
16:22:06.233 [main] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceDemo - concurrent denied = 2

多服务和服务存在依赖的场景测试

2.服务之间存在依赖的熔断:服务A正常接受5个并发访问,服务B接受3个并发访问,服务C接受2个并发访问,服务A依赖B,B依赖C。
只需要将服务

参考

https://www.jianshu.com/p/fc19f6ed6d0d

https://www.jianshu.com/p/05f3e75b3739

Hystrix中的设计哲学

Hystrix使用总结

execute方法会开启新的线程执行,当前线程阻塞等待那个线程。
所谓信号量隔离,是指当服务并发量大于信号量设定的值时,自动降级
熔断是指降级的终极情况,

  • 假设线路内的容量(请求QPS)达到一定阈值(通过 HystrixCommandProperties.circuitBreakerRequestVolumeThreshold() 配置)
  • 同时,假设线路内的错误率达到一定阈值(通过 HystrixCommandProperties.circuitBreakerErrorThresholdPercentage() 配置)
  • 熔断器将从『闭路』转换成『开路』
  • 若此时是『开路』状态,熔断器将短路后续所有经过该熔断器的请求,这些请求直接走『失败回退逻辑』
  • 经过一定时间(即『休眠窗口』,通过 HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds() 配置),后续第一个请求将会被允许通过熔断器(此时熔断器处于『半开』状态),若该请求失败,熔断器将又进入『开路』状态,且在休眠窗口内保持此状态;若该请求成功,熔断器将进入『闭路』状态,回到逻辑1循环往复。

其他特性

缓存
组合请求
Dashboard监控

相关文章

网友评论

      本文标题:Hystrix

      本文链接:https://www.haomeiwen.com/subject/rjfpxqtx.html