美文网首页
Hystrix 初识

Hystrix 初识

作者: binge1024 | 来源:发表于2017-04-20 11:59 被阅读0次

    Netflix公司开源的Hystrix框架,对延迟和故障可以提供强大的容错能力,在分布式系统中对请求远程系统、服务或者第三方库产生的错误,通过熔断、线程池隔离等手段,可以及时停止系统中的级联错误从而起到自适应调节的作用。

    Hystrix 工作流程图

    工作流程分析

    从以上的流程图可以看出,首先我们的创建一个HystrixCommand或者HystrixObservableCommand实例,来代表向其他的的组件发出请求(指令),然后通过相关的方法来操作指令。主要有四个方法。HystrixCommand中有execute()[堵塞方法返回单个结果或者抛出异常]和queue()[异步方法,返回一个Future对象,可以从中取出单个结果。或者抛出异常]。HystrixObservableCommand中有observe()和toObservable(),这两个方法都返回Observable对象,代表(多个)操作结果,注意在observe()方法开始执行的时候就调用了相应的指令,而toObservable()方法相当于observe()方法的lazy加载,当我们去 subscribe的时候,它才去执行相应的指令。
    execute()和queue() 源码

     public R execute() {
            try {
                return queue().get(); //调用了queue().get()方法
            } catch (Exception e) {
                throw Exceptions.sneakyThrow(decomposeException(e));
            }
        }
    public Future<R> queue() {
            /*
             * The Future returned by Observable.toBlocking().toFuture() does not implement the
             * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
             * thus, to comply with the contract of Future, we must wrap around it.
             */
            final Future<R> delegate = toObservable().toBlocking().toFuture();
    
            final Future<R> f = new Future<R>() {
    
                @Override
                public boolean cancel(boolean mayInterruptIfRunning) {
                    if (delegate.isCancelled()) {
                        return false;
                    }
    
                    if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                        /*
                         * The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
                         * (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
                         * issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
                         * The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
                         * than that interruption request cannot be taken back.
                         */
                        interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
                 }
    
                    final boolean res = delegate.cancel(interruptOnFutureCancel.get());
    
                    if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                        final Thread t = executionThread.get();
                        if (t != null && !t.equals(Thread.currentThread())) {
                            t.interrupt();
                        }
                    }
    
                    return res;
                 }
    
                @Override
                public boolean isCancelled() {
                    return delegate.isCancelled();
                 }
    
                @Override
                public boolean isDone() {
                    return delegate.isDone();
                 }
    
                @Override
                public R get() throws InterruptedException, ExecutionException {
                    return delegate.get();
                }
    
                @Override
                public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
                    return delegate.get(timeout, unit);
                }
    
            };
    
            /* special handling of error states that throw immediately */
            if (f.isDone()) {
                try {
                    f.get();
                    return f;
                } catch (Exception e) {
                    Throwable t = decomposeException(e);
                    if (t instanceof HystrixBadRequestException) {
                        return f;
                    } else if (t instanceof HystrixRuntimeException) {
                        HystrixRuntimeException hre = (HystrixRuntimeException) t;
                        switch (hre.getFailureType()) {
                         case COMMAND_EXCEPTION:
                         case TIMEOUT:
                              // we don't throw these types from queue() only from queue().get() as they are execution errors
                              return f;
                         default:
                              // these are errors we throw from queue() as they as rejection type errors
                              throw hre;
                         }
                    } else {
                        throw Exceptions.sneakyThrow(t);
                    }
                }
            }
    
            return f;
        }
    
    
    //HystrixObservableCommand extends AbstractCommand{observe(),toObservable()}
    
    public Observable<R> observe() {
            // us a ReplaySubject to buffer the eagerly subscribed-to Observable
            ReplaySubject<R> subject = ReplaySubject.create();
            // eagerly kick off subscription
            final Subscription sourceSubscription = toObservable().subscribe(subject);
            // return the subject that can be subscribed to later while the execution has already started
            return subject.doOnUnsubscribe(new Action0() {
                @Override
                public void call() {
                    sourceSubscription.unsubscribe();
                }
            });
        }
    
     public Observable<R> toObservable() {
            final AbstractCommand<R> _cmd = this;
    
            //doOnCompleted handler already did all of the SUCCESS work
            //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
            final Action0 terminateCommandCleanup = new Action0() {
    
                @Override
                public void call() {
                    if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                        handleCommandEnd(false); //user code never ran
                    } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                        handleCommandEnd(true); //user code did run
                    }
                }
            };
    
            //mark the command as CANCELLED and store the latency (in addition to standard cleanup)
            final Action0 unsubscribeCommandCleanup = new Action0() {
                @Override
                public void call() {
                    if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                        if (!_cmd.executionResult.containsTerminalEvent()) {
                            _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                            try {
                                executionHook.onUnsubscribe(_cmd);
                            } catch (Throwable hookEx) {
                                logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
                            }
                            _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                                    .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                        }
                        handleCommandEnd(false); //user code never ran
                    } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                        if (!_cmd.executionResult.containsTerminalEvent()) {
                            _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                            try {
                                executionHook.onUnsubscribe(_cmd);
                            } catch (Throwable hookEx) {
                                logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe", hookEx);
                            }
                            _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                                    .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                        }
                        handleCommandEnd(true); //user code did run
                    }
                }
            };
    
            final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                    if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                        return Observable.never();
                    }
                    return applyHystrixSemantics(_cmd);
                }
            };
    
            final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
                @Override
                public R call(R r) {
                    R afterFirstApplication = r;
    
                    try {
                        afterFirstApplication = executionHook.onComplete(_cmd, r);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
                    }
    
                    try {
                        return executionHook.onEmit(_cmd, afterFirstApplication);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
                        return afterFirstApplication;
                    }
                }
            };
    
            final Action0 fireOnCompletedHook = new Action0() {
                @Override
                public void call() {
                    try {
                        executionHook.onSuccess(_cmd);
                    } catch (Throwable hookEx) {
                        logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
                    }
                }
            };
    
            return Observable.defer(new Func0<Observable<R>>() {
                @Override
                public Observable<R> call() {
                     /* this is a stateful object so can only be used once */
                    if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                        IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
                        //TODO make a new error type for this
                        throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted.", ex, null);
                    }
    
                    commandStartTimestamp = System.currentTimeMillis();
    
                    if (properties.requestLogEnabled().get()) {
                        // log this command execution regardless of what happened
                        if (currentRequestLog != null) {
                            currentRequestLog.addExecutedCommand(_cmd);
                        }
                    }
    
                    final boolean requestCacheEnabled = isRequestCachingEnabled();
                    final String cacheKey = getCacheKey();
    
                    /* try from cache first */
                    if (requestCacheEnabled) {
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                        if (fromCache != null) {
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        }
                    }
    
                    Observable<R> hystrixObservable =
                            Observable.defer(applyHystrixSemantics)
                                    .map(wrapWithAllOnNextHooks);
    
                    Observable<R> afterCache;
    
                    // put in cache
                    if (requestCacheEnabled && cacheKey != null) {
                        // wrap it for caching
                        HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                        HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                        if (fromCache != null) {
                            // another thread beat us so we'll use the cached value instead
                            toCache.unsubscribe();
                            isResponseFromCache = true;
                            return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                        } else {
                            // we just created an ObservableCommand so we cast and return it
                            afterCache = toCache.toObservable();
                        }
                    } else {
                        afterCache = hystrixObservable;
                    }
    
                    return afterCache
                            .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                            .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                            .doOnCompleted(fireOnCompletedHook);
                }
            });
        }
    
    

    执行操作指令时,Hystrix首先会检查缓存内是否有对应指令的结果,如果有的话,将缓存的结果直接以Observable对象的形式返回。如果没有对应的缓存,Hystrix会检查Circuit Breaker的状态。如果Circuit Breaker的状态为开启状态,Hystrix将不会执行对应指令,而是直接进入失败处理状态。如果Circuit Breaker的状态为关闭状态,Hystrix会继续进行线程池、任务队列、信号量的检查,确认是否有足够的资源执行操作指令。如果资源满,Hystrix同样将不会执行对应指令并且直接进入失败处理状态。
    如果资源充足,Hystrix将会执行操作指令。操作指令的调用最终都会到这两个方法:

    /**
         * Implement this method with code to be executed when {@link #execute()} or {@link #queue()} are invoked.
         *
         * @return R response type
         * @throws Exception
         *             if command execution fails
         */
        protected abstract R run() throws Exception;
    
    /**
         * Implement this method with code to be executed when {@link #observe()} or {@link #toObservable()} are invoked.
         *
         * @return R response type
         */
        protected abstract Observable<R> construct();
    
    

    如果执行指令的时间超时,执行线程会抛出TimeoutException异常。Hystrix会抛弃结果并直接进入失败处理状态。如果执行指令成功,Hystrix会进行一系列的数据记录,然后返回执行的结果。
    同时,Hystrix会根据记录的数据来计算失败比率,一旦失败比率达到某一阈值将自动开启Circuit Breaker。
    最后我们再来看一下Hystrix是如何处理失败的。如果我们在Command中实现了HystrixCommand.getFallback()方法(或HystrixObservableCommand.resumeWithFallback()方法,Hystrix会返回对应方法的结果。如果没有实现这些方法的话,从底层看Hystrix将会返回一个空的Observable对象,并且可以通过onError来终止并处理错误。从上层看:
    1.execute方法将会抛出异常。
    2.queue方法将会返回一个失败状态的Future对象。
    3.observe()和toObservable()方法都会返回上述的Observable对象。

    参考

    1.https://github.com/Netflix/Hystrix/wiki/How-it-Works#flow3
    2.http://mp.weixin.qq.com/s/tKO8wmPyNHMh8Yotmm4L4w

    相关文章

      网友评论

          本文标题:Hystrix 初识

          本文链接:https://www.haomeiwen.com/subject/sbokzttx.html