美文网首页
HYSTRIX线程池

HYSTRIX线程池

作者: lxqfirst | 来源:发表于2018-02-08 14:28 被阅读0次

    线程池

    在Hystrix中Command默认运行在一个单独的线程池内,线程池的名称是根据设定的ThreadPoolKey定义的,如果没有设置那么会使用CommandGroupKey作为线程池。

    AbstractCommand初始化时会初始化线程池

        public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
            final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);
    
            final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();
            //Hystrix线程池模式下默认设置为10,可以通过配置进行修改
            final int dynamicCoreSize = threadPoolProperties.coreSize().get();
            //默认为1min
            final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();
            //Hystrix线程池模式下默认配置为-1,可以通过配置进行修改
            final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
            //当maxQueueSize<=0时,队列为SynchronousQueue,没有缓存区
            //当maxQueueSize>0时,队列为LinkedBlockingQueue,缓冲区大小即为maxQueueSize
            final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
    
            if (allowMaximumSizeToDivergeFromCoreSize) {
                final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();
                if (dynamicCoreSize > dynamicMaximumSize) {
                    logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " +
                            dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ".  Maximum size will be set to " +
                            dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value");
                    return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
                } else {
                    return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
                }
            } else {
                //创建线程池
                return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
            }
        }
    
        public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
            /*
             * We are using SynchronousQueue if maxQueueSize <= 0 (meaning a queue is not wanted).
             * <p>
             * SynchronousQueue will do a handoff from calling thread to worker thread and not allow queuing which is what we want.
             * <p>
             * Queuing results in added latency and would only occur when the thread-pool is full at which point there are latency issues
             * and rejecting is the preferred solution.
             */
            if (maxQueueSize <= 0) {
                return new SynchronousQueue<Runnable>();
            } else {
                return new LinkedBlockingQueue<Runnable>(maxQueueSize);
            }
        }
    

    以上是默认的ConcurrencyStrategy,Hystrix中可以通过Plugin配置自定义的Strategy:

    HystrixPlugins.getInstance().registerConcurrencyStrategy
    

    但这个Plugin是单例的且register方法只能调用一次,也就是无法设置多个Strategy,如果想要使用不同的Strategy只能在方法内部使用一定逻辑来完成。

    Semaphore

    还有一种不用ThreadPool的方法,是配置SEMAPHORE

    HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(10).withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
    

    这种模式会在主线程自身运行(在调用queue时就会执行)。同时可以通过withExecutionIsolationSemaphoreMaxConcurrentRequests设置并发的数量。

    Fallback方法调用
    当配置的等待队列满了的时候Hystrix会直接调用Command的fallback方法。
    下面来看下各种情况下代码是执行在哪个线程中的

    ThreadPool模式
    1、超时调用getFallback:Timer线程
    2、线程池队列满调用getFallback:主线程
    3、Command出错调用getFallback:Command线程池

    Semaphore模式
    1、超时调用getFallback:Timer线程
    2、并发数满调用getFallback:主线程
    3、Command出错调用getFallback:主线程

    相关文章

      网友评论

          本文标题:HYSTRIX线程池

          本文链接:https://www.haomeiwen.com/subject/kgfttftx.html