美文网首页
Hystrix源码分析(三)-信号量

Hystrix源码分析(三)-信号量

作者: leiwingqueen | 来源:发表于2019-06-15 19:27 被阅读0次

    一、概要

    hystrix的执行隔离策略有两种。一种是线程池的模式,另外一种是信号量的模式。hystrix默认的策略是线程池的模式。

    • 线程池
      对于每个command的执行会启用一个新的线程,同一个command key会使用同一个线程池,不同的command key会使用不同的线程池就进行隔离。
      优点:由于command的执行隔离在不同的线程中,因此某一个command线程执行异常不会影响到其它command的执行。如果希望使用到异步的方式执行,只能选择这种模式。
      缺点:由于多了线程的介入,相当于增加了线程间的上下文切换的成本。另外如果系统的command key很多,可能会影响到系统的整体性能。
    • 信号量
      个人理解跟信号量其实没有太大的关联性。本质上是跟主线程共用一个线程,只是这里增加了一个最大并发数的限制(信号量)。
      优点:不涉及线程上下文切换。在大部分业务场景下其实是能满足的(eg.tomcat单请求单线程的业务,command key之间没有异步执行的需求)
      缺点:某一个command执行过程中导致线程崩溃会影响到整个流程的执行(共享线程)。

    二、源码分析

    1. 信号量的接口定义
    /* package */static interface TryableSemaphore {
    
            /**
             * Use like this:
             * <p>
             * 
             * <pre>
             * if (s.tryAcquire()) {
             * try {
             * // do work that is protected by 's'
             * } finally {
             * s.release();
             * }
             * }
             * </pre>
             * 
             * @return boolean
             */
            public abstract boolean tryAcquire();
    
            /**
             * ONLY call release if tryAcquire returned true.
             * <p>
             * 
             * <pre>
             * if (s.tryAcquire()) {
             * try {
             * // do work that is protected by 's'
             * } finally {
             * s.release();
             * }
             * }
             * </pre>
             */
            public abstract void release();
    
            public abstract int getNumberOfPermitsUsed();
    
        }
    

    接口的定义很简单,上层调用会先tryAcquire获得信号量,然后通过release方式释放信号量。

    1. 实现类
      实现类有两种。TryableSemaphoreActual和TryableSemaphoreNoOp。
    • TryableSemaphoreActual
      信号量的实现类。
    • TryableSemaphoreNoOp
      这个实现类相当于不限制,线程池模式的实现类采用的方式就是这种。
    /* package */static class TryableSemaphoreActual implements TryableSemaphore {
            protected final HystrixProperty<Integer> numberOfPermits;
            private final AtomicInteger count = new AtomicInteger(0);
    
            public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
                this.numberOfPermits = numberOfPermits;
            }
    
            @Override
            public boolean tryAcquire() {
                int currentCount = count.incrementAndGet();
                if (currentCount > numberOfPermits.get()) {
                    count.decrementAndGet();
                    return false;
                } else {
                    return true;
                }
            }
    
            @Override
            public void release() {
                count.decrementAndGet();
            }
    
            @Override
            public int getNumberOfPermitsUsed() {
                return count.get();
            }
    
        }
    

    这个实现类就不多加解释了,主要是用了AtomicInteger做加减的操作。
    3.调用层

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
           ...
    
            /* determine if we're allowed to execute */
            if (circuitBreaker.allowRequest()) {
                final TryableSemaphore executionSemaphore = getExecutionSemaphore();
                final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
                final Action0 singleSemaphoreRelease = new Action0() {
                    @Override
                    public void call() {
                        if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
    -- 释放信号量的操作
                            executionSemaphore.release();
                        }
                    }
                };
    
                final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                    @Override
                    public void call(Throwable t) {
                        eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                    }
                };
    -- 获取信号量
                if (executionSemaphore.tryAcquire()) {
                    try {
                        /* used to track userThreadExecutionTime */
                        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                        return executeCommandAndObserve(_cmd)
                                .doOnError(markExceptionThrown)
                                .doOnTerminate(singleSemaphoreRelease)
                                .doOnUnsubscribe(singleSemaphoreRelease);
                    } catch (RuntimeException e) {
                        return Observable.error(e);
                    }
                } else {
                    return handleSemaphoreRejectionViaFallback();
                }
            } else {
                return handleShortCircuitViaFallback();
            }
        }
    

    三、总结

    在实际的工作中对信号量和线程池的选择还是要慎重,事实上对于一般场景的单请求单线程的业务,信号量是能够满足需求的。
    对于需要异步(command并行执行)的业务需要使用线程池的方式来实现。

    相关文章

      网友评论

          本文标题:Hystrix源码分析(三)-信号量

          本文链接:https://www.haomeiwen.com/subject/gfqifctx.html