Hystrix exception analysis: "could not be queued for execution"


Author: 蓝笔头 | Published: 2021-07-13 14:15

Exception log

Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: EnterpriseWechatClient#getEnterpriseOperator(String) could not be queued for execution and no fallback available.
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822)
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807)
    ...
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472)
    at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397)
    ...
    at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$3.onError(AbstractCommand.java:1194)
    ...
    at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:378)
    at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:344)
    at feign.hystrix.HystrixInvocationHandler.invoke(HystrixInvocationHandler.java:170)
    ... 115 more
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@29c53765 rejected from java.util.concurrent.ThreadPoolExecutor@3b866373[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 6788739]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker.schedule(HystrixContextScheduler.java:172)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:106)
    at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:50)
    at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    ... 170 more

Reproducing the exception

1) Add the Maven dependency

        <!-- https://mvnrepository.com/artifact/com.netflix.hystrix/hystrix-core -->
        <dependency>
            <groupId>com.netflix.hystrix</groupId>
            <artifactId>hystrix-core</artifactId>
            <version>1.5.18</version>
        </dependency>

2) Example:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;

public class TestHystrix {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            final int j = i;
            System.out.println("submitting:" + j);
            HystrixCommand command = new HystrixCommand(getHystrixCommandSetter()) {
                @Override
                protected Object run() throws Exception {
                    System.out.println("running: " + j);
                    Thread.sleep(Integer.MAX_VALUE);
                    return null;
                }
            };

            command.observe().subscribe(item -> {
                System.out.println("[onNext - " + j + " ] item : " + item);
            });
        }

        Thread.sleep(3000);
    }

    private static HystrixCommand.Setter getHystrixCommandSetter() {
        HystrixThreadPoolProperties.Setter threadPoolPropertiesSetter = HystrixThreadPoolProperties.Setter()
            .withCoreSize(5);

        return HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test"))
            .andThreadPoolPropertiesDefaults(threadPoolPropertiesSetter)
            .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(10000));
    }
}

3) Console output:

submitting:0
submitting:1
submitting:2
submitting:3
submitting:4
submitting:5
running: 1
running: 3
running: 0
running: 2
running: 4
Exception in thread "main" rx.exceptions.OnErrorNotImplementedException: TestHystrix$1 could not be queued for execution and no fallback available.
    ...
    at com.example.springdemo.TestHystrix.main(TestHystrix.java:22)
Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: TestHystrix$1 could not be queued for execution and no fallback available.
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822)
    at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807)
    ...
    at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472)
    at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397)
    ...
    at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$3.onError(AbstractCommand.java:1194)
    ...
    at com.netflix.hystrix.AbstractCommand.observe(AbstractCommand.java:328)
    at com.netflix.hystrix.HystrixCommand.observe(HystrixCommand.java:47)
    ... 1 more
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@16e7dcfd[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@222114ba[Wrapped task = null]] rejected from java.util.concurrent.ThreadPoolExecutor@3d121db3[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2057)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:827)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1357)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker.schedule(HystrixContextScheduler.java:172)
    at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:106)
    at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
    at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
    ... 43 more

Source code analysis:

HystrixThreadPoolProperties

package com.netflix.hystrix;

// Properties of a HystrixThreadPool instance
public abstract class HystrixThreadPoolProperties {

    /* defaults */
    static int default_coreSize = 10;            // core size of thread pool
    static int default_maximumSize = 10;         // maximum size of thread pool
    static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive
    static int default_maxQueueSize = -1;        // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
                                                 // -1 turns it off and makes us use SynchronousQueue
    static boolean default_allow_maximum_size_to_diverge_from_core_size = false; //should the maximumSize config value get read and used in configuring the threadPool
                                                                                 //turning this on should be a conscious decision by the user, so we default it to false

    static int default_queueSizeRejectionThreshold = 5; // number of items in queue
    static int default_threadPoolRollingNumberStatisticalWindow = 10000; // milliseconds for rolling number
    static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10; // number of buckets in rolling number (10 1-second buckets)


    protected HystrixThreadPoolProperties(HystrixThreadPoolKey key, Setter builder, String propertyPrefix) {
        // Configure corePoolSize
        this.corePoolSize = getProperty(propertyPrefix, key, "coreSize", builder.getCoreSize(), default_coreSize);
        
        // If maxQueueSize is not configured, default_maxQueueSize is used, i.e. -1
        this.maxQueueSize = getProperty(propertyPrefix, key, "maxQueueSize", builder.getMaxQueueSize(), default_maxQueueSize);
    }

}

HystrixConcurrencyStrategy

package com.netflix.hystrix.strategy.concurrency;

// Hystrix concurrency strategy
public abstract class HystrixConcurrencyStrategy {
    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        // Read the configured coreSize
        final int dynamicCoreSize = threadPoolProperties.coreSize().get();
        // Read the configured maxQueueSize
        final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
        // Choose the BlockingQueue implementation based on maxQueueSize
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);

        // Both corePoolSize and maximumPoolSize of the pool are set to the configured coreSize
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }
    
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        if (maxQueueSize <= 0) {
            // If maxQueueSize <= 0, return a SynchronousQueue
            return new SynchronousQueue<Runnable>();
        } else {
            return new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }
    }
}
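
The effect of this configuration can be reproduced without Hystrix. The following is a minimal JDK-only sketch, assuming a pool built the same way as in getThreadPool() above (core size equal to maximum size, a SynchronousQueue, and the default AbortPolicy). The class name SynchronousQueuePoolDemo and the helper saturate() are hypothetical names introduced for illustration; they are not part of Hystrix.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SynchronousQueuePoolDemo {

    /**
     * Hypothetical helper: submits coreSize + 1 blocking tasks and returns the
     * rejection raised by the pool, or null if every task was accepted.
     */
    static RejectedExecutionException saturate(int coreSize) {
        // Same shape as the Hystrix pool: core == max, SynchronousQueue, AbortPolicy.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, coreSize, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < coreSize + 1; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy, like Thread.sleep in the example
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return null;
        } catch (RejectedExecutionException e) {
            return e;
        } finally {
            release.countDown(); // let the workers finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        RejectedExecutionException rejection = saturate(5);
        System.out.println(rejection == null ? "all tasks accepted" : rejection.getMessage());
    }
}
```

The first coreSize submissions each start a new worker thread. The next submission finds every worker busy and no queue capacity, so the pool rejects it, which matches the "pool size = 5, active threads = 5, queued tasks = 0" state in the log above.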

ThreadPoolExecutor's execute() method

package java.util.concurrent;

public class ThreadPoolExecutor extends AbstractExecutorService {    
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // Proceeds in 3 steps
        int c = ctl.get();
        // 1. If fewer than corePoolSize threads are running, start a new thread to run command
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        
        // 2. If the task can be queued successfully, queue it
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 3. If the task cannot be queued, try to add a new thread.
        // If adding the thread fails, the task is rejected; the default AbortPolicy throws RejectedExecutionException
        else if (!addWorker(command, false))
            reject(command);
    }
}
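
Step 2 depends on how workQueue.offer() behaves for a SynchronousQueue. The following is a small JDK-only sketch of that behaviour; the class and method names are hypothetical and chosen only for this illustration.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

public class SynchronousQueueOfferDemo {

    /** offer() with no consumer waiting: there is no buffer, so it fails at once. */
    static boolean offerWithoutConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        return queue.offer("task");
    }

    /** offer() is retried until a consumer thread is waiting in take(). */
    static boolean offerWithWaitingConsumer() {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                queue.take(); // plays the role of an idle worker thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        // The hand-off can only succeed once the consumer has arrived in take().
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (System.nanoTime() < deadline) {
            if (queue.offer("task")) {
                return true;
            }
            Thread.yield();
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("no consumer:      " + offerWithoutConsumer());
        System.out.println("waiting consumer: " + offerWithWaitingConsumer());
    }
}
```

In the thread pool, the "consumer" is an idle worker. When all workers are busy running commands, offer() returns false and execute() falls through to step 3.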

AbstractCommand

package com.netflix.hystrix;

/* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    // Used for asynchronous execution of the command with a callback, by subscribing to the Observable.
    public Observable<R> observe() {
        // us a ReplaySubject to buffer the eagerly subscribed-to Observable
        ReplaySubject<R> subject = ReplaySubject.create();
        // eagerly kick off subscription
        final Subscription sourceSubscription = toObservable().subscribe(subject);
        // return the subject that can be subscribed to later while the execution has already started
        return subject.doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                sourceSubscription.unsubscribe();
            }
        });
    }
    
    // Converts the AbstractCommand into an Observable
    public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;

        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };

        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);
                return hystrixObservable
                        .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                        .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
    }
    
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
       return executeCommandAndObserve(_cmd)
                .doOnError(markExceptionThrown)
                .doOnTerminate(singleSemaphoreRelease)
                .doOnUnsubscribe(singleSemaphoreRelease);
    }
    
    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();


        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    // If the exception is a RejectedExecutionException, call handleThreadPoolRejectionViaFallback()
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };

        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback) // errors from upstream are handled by handleFallback
                .doOnEach(setRequestContext);
    }

    private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
        eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
        threadPool.markThreadRejection();
        // use a fallback instead (or throw exception if not implemented)
        return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", underlying);
    }
}
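
The "use a fallback, or throw if none is implemented" decision can be illustrated with a simplified JDK-only analogue. This is a sketch under stated assumptions, not Hystrix's implementation: executeOrFallback() and callOnSaturatedPool() are hypothetical helpers, and IllegalStateException stands in for HystrixRuntimeException. In Hystrix itself the fallback is supplied by overriding getFallback() on the command.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class RejectionFallbackSketch {

    /** Hypothetical helper: run the task, or use the fallback if the pool rejects it. */
    static <T> T executeOrFallback(ExecutorService pool, Callable<T> task, Supplier<T> fallback) {
        Future<T> future;
        try {
            future = pool.submit(task);
        } catch (RejectedExecutionException e) {
            if (fallback == null) {
                // Stand-in for the HystrixRuntimeException seen in the log.
                throw new IllegalStateException(
                        "could not be queued for execution and no fallback available.", e);
            }
            return fallback.get();
        }
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    /** Occupies a single-thread pool, then submits one more task through the helper. */
    static String callOnSaturatedPool(Supplier<String> fallback) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            pool.execute(() -> awaitQuietly(release)); // the only worker is now busy
            return executeOrFallback(pool, () -> "real result", fallback);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(callOnSaturatedPool(() -> "fallback result"));
        try {
            callOnSaturatedPool(null);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + " cause: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

With a fallback, the caller receives a degraded result instead of an exception. Without one, the rejection surfaces to the caller with the underlying RejectedExecutionException as its cause, as in the stack traces above.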

Root cause

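  • 1) maxQueueSize is not configured, so the thread pool's task queue is a SynchronousQueue.
    • A SynchronousQueue is a BlockingQueue with no internal buffer: a producer's put() must wait for a consumer's take(), and vice versa. ThreadPoolExecutor.execute() uses the non-blocking offer(), which returns false immediately unless an idle worker thread is already waiting for a task.
  • 2) The number of concurrent calls exceeds the configured coreSize (both corePoolSize and maximumPoolSize of the ThreadPoolExecutor are set to the configured coreSize), so addWorker(command, false) in ThreadPoolExecutor.execute() returns false and a RejectedExecutionException is thrown.
  • 3) Hystrix handles the RejectedExecutionException in handleThreadPoolRejectionViaFallback(). Because the command defines no fallback, the rejection is rethrown as a HystrixRuntimeException with the message "could not be queued for execution and no fallback available".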
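
One way to absorb short bursts is to give the pool a bounded queue. The following is a minimal JDK-only sketch, assuming the same queue selection as getBlockingQueue() shown earlier; the class name and the helpers blockingQueueFor() and countAccepted() are hypothetical. In Hystrix the corresponding settings are withMaxQueueSize(...) and withQueueSizeRejectionThreshold(...) on HystrixThreadPoolProperties.Setter. Since default_queueSizeRejectionThreshold is 5, the threshold probably needs to be raised together with maxQueueSize for a larger queue to take effect.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueuePoolDemo {

    /** Same selection rule as the getBlockingQueue() listing above. */
    static BlockingQueue<Runnable> blockingQueueFor(int maxQueueSize) {
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<Runnable>();
        }
        return new LinkedBlockingQueue<Runnable>(maxQueueSize);
    }

    /** Submits blocking tasks and returns how many the pool accepted. */
    static int countAccepted(int coreSize, int maxQueueSize, int submissions) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, coreSize, 1, TimeUnit.MINUTES, blockingQueueFor(maxQueueSize));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            for (int i = 0; i < submissions; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // tasks stay blocked while we count
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    accepted++;
                } catch (RejectedExecutionException e) {
                    // pool and queue are both full
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("maxQueueSize = -1: " + countAccepted(5, -1, 10));
        System.out.println("maxQueueSize =  5: " + countAccepted(5, 5, 10));
    }
}
```

With maxQueueSize = -1 only the first 5 of 10 submissions are accepted. With a queue of 5, all 10 are accepted: 5 run and 5 wait. A queue only delays rejection under sustained overload, so a larger coreSize or a fallback may still be needed.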

Article link: https://www.haomeiwen.com/subject/hhtspltx.html