美文网首页
Hystrix源码分析(三)-信号量

Hystrix源码分析(三)-信号量

作者: leiwingqueen | 来源:发表于2019-06-15 19:27 被阅读0次

一、概要

hystrix的执行隔离策略有两种。一种是线程池的模式,另外一种是信号量的模式。hystrix默认的策略是线程池的模式。

  • 线程池
    对于每个command的执行会启用一个新的线程,同一个command key会使用同一个线程池,不同的command key会使用不同的线程池就进行隔离。
    优点:由于command的执行隔离在不同的线程中,因此某一个command线程执行异常不会影响到其它command的执行。如果希望使用到异步的方式执行,只能选择这种模式。
    缺点:由于多了线程的介入,相当于增加了线程间的上下文切换的成本。另外如果系统的command key很多,可能会影响到系统的整体性能。
  • 信号量
    个人理解跟信号量其实没有太大的关联性。本质上是跟主线程共用一个线程,只是这里增加了一个最大并发数的限制(信号量)。
    优点:不涉及线程上下文切换。在大部分业务场景下其实是能满足的(eg.tomcat单请求单线程的业务,command key之间没有异步执行的需求)
    缺点:某一个command执行过程中导致线程崩溃会影响到整个流程的执行(共享线程)。

二、源码分析

  1. 信号量的接口定义
/* package */static interface TryableSemaphore {

        /**
         * Use like this:
         * <p>
         * 
         * <pre>
         * if (s.tryAcquire()) {
         * try {
         * // do work that is protected by 's'
         * } finally {
         * s.release();
         * }
         * }
         * </pre>
         * 
         * @return boolean
         */
        public abstract boolean tryAcquire();

        /**
         * ONLY call release if tryAcquire returned true.
         * <p>
         * 
         * <pre>
         * if (s.tryAcquire()) {
         * try {
         * // do work that is protected by 's'
         * } finally {
         * s.release();
         * }
         * }
         * </pre>
         */
        public abstract void release();

        public abstract int getNumberOfPermitsUsed();

    }

接口的定义很简单,上层调用会先tryAcquire获得信号量,然后通过release方式释放信号量。

  1. 实现类
    实现类有两种。TryableSemaphoreActual和TryableSemaphoreNoOp。
  • TryableSemaphoreActual
    信号量的实现类。
  • TryableSemaphoreNoOp
    这个实现类相当于不限制,线程池模式的实现类采用的方式就是这种。
/* package */static class TryableSemaphoreActual implements TryableSemaphore {
        protected final HystrixProperty<Integer> numberOfPermits;
        private final AtomicInteger count = new AtomicInteger(0);

        public TryableSemaphoreActual(HystrixProperty<Integer> numberOfPermits) {
            this.numberOfPermits = numberOfPermits;
        }

        @Override
        public boolean tryAcquire() {
            int currentCount = count.incrementAndGet();
            if (currentCount > numberOfPermits.get()) {
                count.decrementAndGet();
                return false;
            } else {
                return true;
            }
        }

        @Override
        public void release() {
            count.decrementAndGet();
        }

        @Override
        public int getNumberOfPermitsUsed() {
            return count.get();
        }

    }

这个实现类就不多加解释了,主要是用了AtomicInteger做加减的操作。
3.调用层

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
       ...

        /* determine if we're allowed to execute */
        if (circuitBreaker.allowRequest()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
-- 释放信号量的操作
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };
-- 获取信号量
            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }

三、总结

在实际的工作中对信号量和线程池的选择还是要慎重,事实上对于一般场景的单请求单线程的业务,信号量是能够满足需求的。
对于需要异步(command并行执行)的业务需要使用线程池的方式来实现。

相关文章

网友评论

      本文标题:Hystrix源码分析(三)-信号量

      本文链接:https://www.haomeiwen.com/subject/gfqifctx.html