Hystrix

作者: Deeglose | 来源:发表于2018-12-02 14:51 被阅读12次

    配置

        <properties>
            <!-- 依赖版本 -->
            <hystrix.version>1.3.16</hystrix.version>
            <hystrix-metrics-event-stream.version>1.1.2</hystrix-metrics-event-stream.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.netflix.hystrix</groupId>
                <artifactId>hystrix-core</artifactId>
                <version>${hystrix.version}</version>
            </dependency>
            <dependency>
                <groupId>com.netflix.hystrix</groupId>
                <artifactId>hystrix-metrics-event-stream</artifactId>
                <version>${hystrix-metrics-event-stream.version}</version>
            </dependency>
        </dependencies>
    

    使用

    超时降级

    package com.fulton_shaw;
    
    import com.fulton_shaw.common.util.concurrent.ThreadUtils;
    import com.netflix.hystrix.HystrixCommand;
    import com.netflix.hystrix.HystrixCommandGroupKey;
    import com.netflix.hystrix.HystrixCommandProperties;
    
    /**
     * @author xiaohuadong
     * @date 2018/11/06
     */
    public class FallbackCommand extends HystrixCommand<String> {
        public FallbackCommand(String name){
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(name))
            .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionIsolationThreadTimeoutInMilliseconds(500)));
    
        }
    
        @Override
        protected String getFallback() {
            return "FALLBACK";
        }
    
        protected String run() throws Exception {
            ThreadUtils.sleepRandom(300,700);
            return "SUCCEED RUN";
        }
    }
    
    package com.fulton_shaw;
    
    import com.fulton_shaw.common.util.concurrent.ExpectUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.Callable;
    
    /**
     * @author xiaohuadong
     * @date 2018/11/06
     */
    public class FallbackMain {
        private static final Logger LOG = LoggerFactory.getLogger(FallbackMain.class);
        public static void main(String[] args) {
            ExpectUtils.untilTrueCount(20, new Callable<Boolean>() {
                public Boolean call() throws Exception {
                    FallbackCommand command = new FallbackCommand("will-you-fallback");
    
                    String result = command.execute();
                    LOG.info("{}", result);
                    return "FALLBACK".equals(result);
                }
            });
    
            LOG.info("end");
    
            System.exit(0);
    
        }
    }
    

    基本设计

    接口+Factory+intern构成一个Hystrix的全局注册中心
    同时,将UnitTest放到类的后面,以便发布时,将测试包含在内。

    很大量是依赖RxJava库,其中,Observable,Operator非常经典。
    HystrixCommand运行时,是处于线程池中的。

    服务场景

    1.单个服务熔断:一个服务正常能够接受2个并发访问,测试4个并发访问,带有超时的场景

    2.服务之间存在依赖的熔断:服务A正常接受5个并发访问,服务B接受3个并发访问,服务C接受2个并发访问,服务A依赖B,B依赖C。

    单服务模式下Hystrix的使用测试

    构造ThroughputService代表一般的服务,该服务能够在一定时间内返回,同时,有一个支持的最大并发量上限

    package com.fulton_shaw.common.lang.testing;
    
    
    import com.fulton_shaw.common.lang.FixedValueGetter;
    import com.fulton_shaw.common.lang.ValueGetter;
    import com.fulton_shaw.common.util.concurrent.ThreadUtils;
    import com.google.common.base.Function;
    import com.google.common.base.Preconditions;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.annotation.Nullable;
    import javax.annotation.concurrent.ThreadSafe;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author xiaohuadong
     * @date 2018/11/08
     */
    @ThreadSafe
    public class ThroughPutService implements Service {
        private static final Logger LOG = LoggerFactory.getLogger(ThroughPutService.class);
    
    
        /* 服务时间 , 单位:ms  小于等于0时,无需等待 */
        private ValueGetter<Long> serviceTimeStrategy;
    
        /* 最大并发数 , 小于等于0时,无限制*/
        private final int maxConcurrent;
    
        /* 服务调用 */
        private Runnable beforeService;
        private Runnable afterService;
    
    
        /* 服务拒绝时调用 */
        private Runnable onDeny;
    
    
        /* 服务异常时调用 */
        private Function<Exception,Void> onException;
    
        private AtomicInteger currentConcurrent;
    
    
      /**
         * 调用服务
         *
         * @return 服务是否成功
         */
        public boolean tryService() {
            boolean canService = false;
            LOG.debug("{} --> beginning tryService,current concurrent = {}",ThreadUtils.getCurrentThreadName(),currentConcurrent.get());
            if (maxConcurrent <= 0) {
                currentConcurrent.incrementAndGet();
                canService = true;
            } else {
                // CAS
                while (true) {
                    int old = currentConcurrent.get();
                    if(old >= maxConcurrent){
                        break;
                    }else if(currentConcurrent.compareAndSet(old, old + 1)){
                        canService = true;
                        break;
                    }
                }
            }
    
    
            try {
                if (canService) {
                    LOG.debug("{} --> gain tryService,current concurrent = {}",ThreadUtils.getCurrentThreadName(),currentConcurrent.get());
                    beforeService.run();
                    Long sleep = serviceTimeStrategy.get();
                    LOG.debug("{} --> will sleep {} ms",ThreadUtils.getCurrentThreadName(),sleep);
                    ThreadUtils.sleep(sleep);
                    currentConcurrent.decrementAndGet();
                    afterService.run();
                } else {
                    onDeny.run();
                }
            }catch (Exception e){
                try {
                    onException.apply(e);
                }catch (Exception ae){
                    LOG.error("error happened while handing {}",e.toString(),ae);
                }
            }
    
    
            LOG.debug("{} --> end tryService,current concurrent = {}",ThreadUtils.getCurrentThreadName(),currentConcurrent.get());
    
            return canService;
        }
    
    }
    

    ThroughPutService类的tryService方法尝试调用一个服务,如果当前并发量尚未达到上限,则服务,否则,调用onDeny。若有错误发生,则调用onException。
    下面是对该Service的一个测试

    public class ThroughPutServiceMain {
        public static void main(String[] args) {
            final ThroughPutService service = ThroughPutService.makeService(new RandomLongGetter(300, 500), 2);
            ThreadUtils.startThreadForTimesAndWait(5, new ProcessMethod<Integer>() {
                @Nullable
                @Override
                public void apply(@Nullable Integer input) {
                    service.tryService();
                }
            });
        }
    
    }
    

    serivce的服务时间在300~500毫秒之间,随机返回。最大允许的并发量是2。测试程序开启了5个线程来消费这个服务,下面的结果显示,只有其中两个线程得到服务,其他均被拒绝。
    得到服务的是Thread-2和Thread-3, gain tryService.
    输出结果:

    15:39:46.789 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> beginning tryService,current concurrent = 0
    15:39:46.789 [Thread-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-4 --> beginning tryService,current concurrent = 0
    15:39:46.789 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> beginning tryService,current concurrent = 0
    15:39:46.789 [Thread-5] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-5 --> beginning tryService,current concurrent = 0
    15:39:46.789 [Thread-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-1 --> beginning tryService,current concurrent = 0
    15:39:46.791 [Thread-5] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-5 --> end tryService,current concurrent = 2
    15:39:46.791 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> gain tryService,current concurrent = 2
    15:39:46.791 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> gain tryService,current concurrent = 2
    15:39:46.791 [Thread-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-1 --> end tryService,current concurrent = 2
    15:39:46.791 [Thread-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-4 --> end tryService,current concurrent = 2
    15:39:46.791 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> will sleep 415 ms
    15:39:46.791 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> will sleep 461 ms
    15:39:47.206 [Thread-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-2 --> end tryService,current concurrent = 1
    15:39:47.252 [Thread-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - Thread-3 --> end tryService,current concurrent = 0
    

    ThroughPutService不能服务的情况有一种:就是最大并发量。
    但是对于客户端而言,ThroughPutService不能服务还包括一种情况:ThroughPutService本身的服务质量--响应时间超时。

    HystrixCommand封装ThroughPutService

    我们首先识别出ThroughPutService服务不可用的情况,就是:1.超过服务的最大并发量 2.服务本身超时
    在下面的Command的封装中,run会调用service,但是service可能返回false,因此,在返回false的情况下,我们应当抛出异常,表明服务不可用,迫使Hystrix转向getFallback()调用。
    此外,即使service正常可用,但是服务时间超时,Hystrix本身会对run进行超时判断,因此,它也会转向getFallback()

    package com.fulton_shaw.third_party_demo.hystrix;
    
    public class SingleServiceCommand extends HystrixCommand<Void> {
        private static final Logger LOG = LoggerFactory.getLogger(SingleServiceCommand.class);
        private ThroughPutService service;
    
    
        protected SingleServiceCommand(ThroughPutService service,long time) {
            super(HystrixCommand.Setter.
                    withGroupKey(HystrixCommandGroupKey.Factory.asKey(SingleServiceCommand.class.getSimpleName()))
                    .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutEnabled(true).withExecutionTimeoutInMilliseconds((int)time))
            );
            this.service = service;
        }
    
        @Override
        protected Void run() throws Exception {
            boolean succeed = service.tryService();
            LOG.info("{} --> tryService succeed? {} ",ThreadUtils.getCurrentThreadName(),succeed);
            if(!succeed){
                throw  new RuntimeException(String.format("%s --> try service rejected", ThreadUtils.getCurrentThreadName()));
            }
            return null;
        }
    
        @Override
        protected Void getFallback() {
            LOG.info("{} --> getting Fallback",Thread.currentThread().getName());
            return null;
        }
    }
    

    测试封装的HystrixCommand

    package com.fulton_shaw.third_party_demo.hystrix;
    
    public class SingleServiceDemo {
        private static final Logger LOG = LoggerFactory.getLogger(SingleServiceDemo.class);
    
        public static void main(String[] args) {
    
            final ThroughPutService service = ThroughPutService.makeService(new RandomLongGetter(1000, 3000), 2);
    
    
            final AtomicInteger denied = new AtomicInteger(0);
            service.setOnDeny(new Runnable() {
                @Override
                public void run() {
                    denied.incrementAndGet();
                }
            });
    
            int timeout = 2000;
            int commandCount = 4;
            final SingleServiceCommand[] commands = new SingleServiceCommand[commandCount];
            for (int i = 0; i < commandCount; i++) {
                commands[i] = new SingleServiceCommand(service,timeout);
            }
    
            ThreadUtils.startThreadForTimesAndWait(4, new ProcessMethod<Integer>() {
                @Nullable
                @Override
                public void apply(@Nullable Integer i) {
                    commands[i].execute();
                }
            });
    
            LOG.info("concurrent denied = {}", denied.get());
        }
    }
    

    在上面的程序中,我们新建了一个服务,随机返回时间是1000-3000ms之间,我们将服务的超时时间设置为2000ms,然后使用denied变量记录服务被拒绝的数量。然后,我们新建了4个对该服务的调用实例,由Hystrix管理。
    从下面的输出结果中,我们可以看出4和3获得服务执行,其中4超时,3没有超时,1和2均被拒绝服务。
    由于1,2被拒绝服务,因此他们抛出异常之后,Hystrix转向了Fallback,我们可以看到的时,抛出异常之后,getFallback仍然处于同一线程之中。
    但是4转向Fallback之后,却是由HystrixTimer-1来调用的,而原来的线程仍然继续运行,我们可以看到4最后也输出了 end tryService.
    输出结果:

    16:22:04.243 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> beginning tryService,current concurrent = 0
    16:22:04.244 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> gain tryService,current concurrent = 1
    16:22:04.243 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> beginning tryService,current concurrent = 0
    16:22:04.244 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> will sleep 2665 ms
    16:22:04.244 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> gain tryService,current concurrent = 2
    16:22:04.244 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> will sleep 1284 ms
    16:22:04.244 [hystrix-SingleServiceCommand-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-1 --> beginning tryService,current concurrent = 2
    16:22:04.244 [hystrix-SingleServiceCommand-1] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-1 --> end tryService,current concurrent = 2
    16:22:04.244 [hystrix-SingleServiceCommand-1] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-1 --> tryService succeed? false
    16:22:04.244 [hystrix-SingleServiceCommand-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-2 --> beginning tryService,current concurrent = 2
    16:22:04.244 [hystrix-SingleServiceCommand-2] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-2 --> end tryService,current concurrent = 2
    16:22:04.244 [hystrix-SingleServiceCommand-2] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-2 --> tryService succeed? false
    16:22:04.245 [hystrix-SingleServiceCommand-1] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
    java.lang.RuntimeException: hystrix-SingleServiceCommand-1 --> try service rejected
    at com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand.run(SingleServiceCommand.java:34) ~[classes/:?]
    ...
    16:22:04.245 [hystrix-SingleServiceCommand-2] DEBUG com.netflix.hystrix.AbstractCommand - Error executing HystrixCommand.run(). Proceeding to fallback logic ...
    java.lang.RuntimeException: hystrix-SingleServiceCommand-2 --> try service rejected
    at com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand.run(SingleServiceCommand.java:34) ~[classes/:?]
    ...
    16:22:04.254 [hystrix-SingleServiceCommand-2] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-2 --> getting Fallback
    16:22:04.257 [hystrix-SingleServiceCommand-1] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-1 --> getting Fallback
    16:22:05.528 [hystrix-SingleServiceCommand-3] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-3 --> end tryService,current concurrent = 1
    16:22:05.528 [hystrix-SingleServiceCommand-3] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-3 --> tryService succeed? true
    16:22:06.231 [hystrix-SingleServiceCommand-4] DEBUG com.fulton_shaw.common.lang.testing.ThroughPutService - hystrix-SingleServiceCommand-4 --> end tryService,current concurrent = 0
    16:22:06.231 [hystrix-SingleServiceCommand-4] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - hystrix-SingleServiceCommand-4 --> tryService succeed? true
    16:22:06.232 [HystrixTimer-1] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceCommand - HystrixTimer-1 --> getting Fallback
    16:22:06.233 [main] INFO  com.fulton_shaw.third_party_demo.hystrix.SingleServiceDemo - concurrent denied = 2
    

    多服务和服务存在依赖的场景测试

    2.服务之间存在依赖的熔断:服务A正常接受5个并发访问,服务B接受3个并发访问,服务C接受2个并发访问,服务A依赖B,B依赖C。
    只需要将服务

    参考

    https://www.jianshu.com/p/fc19f6ed6d0d

    https://www.jianshu.com/p/05f3e75b3739

    Hystrix中的设计哲学

    Hystrix使用总结

    execute方法会开启新的线程执行,当前线程阻塞等待那个线程。
    所谓信号量隔离,是指当服务并发量大于信号量设定的值时,自动降级
    熔断是指降级的终极情况,

    • 假设线路内的容量(请求QPS)达到一定阈值(通过 HystrixCommandProperties.circuitBreakerRequestVolumeThreshold() 配置)
    • 同时,假设线路内的错误率达到一定阈值(通过 HystrixCommandProperties.circuitBreakerErrorThresholdPercentage() 配置)
    • 熔断器将从『闭路』转换成『开路』
    • 若此时是『开路』状态,熔断器将短路后续所有经过该熔断器的请求,这些请求直接走『失败回退逻辑』
    • 经过一定时间(即『休眠窗口』,通过 HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds() 配置),后续第一个请求将会被允许通过熔断器(此时熔断器处于『半开』状态),若该请求失败,熔断器将又进入『开路』状态,且在休眠窗口内保持此状态;若该请求成功,熔断器将进入『闭路』状态,回到逻辑1循环往复。

    其他特性

    缓存
    组合请求
    Dashboard监控

    相关文章

      网友评论

          本文标题:Hystrix

          本文链接:https://www.haomeiwen.com/subject/rjfpxqtx.html