Exception log
Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: EnterpriseWechatClient#getEnterpriseOperator(String) could not be queued for execution and no fallback available.
at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822)
at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807)
...
at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472)
at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397)
...
at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$3.onError(AbstractCommand.java:1194)
...
at com.netflix.hystrix.HystrixCommand.queue(HystrixCommand.java:378)
at com.netflix.hystrix.HystrixCommand.execute(HystrixCommand.java:344)
at feign.hystrix.HystrixInvocationHandler.invoke(HystrixInvocationHandler.java:170)
... 115 more
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@29c53765 rejected from java.util.concurrent.ThreadPoolExecutor@3b866373[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 6788739]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker.schedule(HystrixContextScheduler.java:172)
at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:106)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:50)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
... 170 more
Reproducing the exception
1) Add the Maven dependency
<!-- https://mvnrepository.com/artifact/com.netflix.hystrix/hystrix-core -->
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>1.5.18</version>
</dependency>
2) Example:
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;

public class TestHystrix {

    public static void main(String[] args) throws InterruptedException {
        // Submit 10 commands to a thread pool that has only 5 threads and no queue.
        for (int i = 0; i < 10; i++) {
            final int j = i;
            System.out.println("submitting:" + j);
            HystrixCommand<Object> command = new HystrixCommand<Object>(getHystrixCommandSetter()) {
                @Override
                protected Object run() throws Exception {
                    System.out.println("running: " + j);
                    // Block so that the pool thread stays occupied.
                    Thread.sleep(Integer.MAX_VALUE);
                    return null;
                }
            };
            // No onError handler is supplied, so a rejection surfaces as OnErrorNotImplementedException.
            command.observe().subscribe(item -> {
                System.out.println("[onNext - " + j + " ] item : " + item);
            });
        }
        Thread.sleep(3000);
    }

    private static HystrixCommand.Setter getHystrixCommandSetter() {
        // coreSize = 5; maxQueueSize is left at its default of -1.
        HystrixThreadPoolProperties.Setter threadPoolPropertiesSetter = HystrixThreadPoolProperties.Setter()
                .withCoreSize(5);
        return HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("test"))
                .andThreadPoolPropertiesDefaults(threadPoolPropertiesSetter)
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(10000));
    }
}
3) Console output:
submitting:0
submitting:1
submitting:2
submitting:3
submitting:4
submitting:5
running: 1
running: 3
running: 0
running: 2
running: 4
Exception in thread "main" rx.exceptions.OnErrorNotImplementedException: TestHystrix$1 could not be queued for execution and no fallback available.
...
at com.example.springdemo.TestHystrix.main(TestHystrix.java:22)
Caused by: com.netflix.hystrix.exception.HystrixRuntimeException: TestHystrix$1 could not be queued for execution and no fallback available.
at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:822)
at com.netflix.hystrix.AbstractCommand$22.call(AbstractCommand.java:807)
...
at com.netflix.hystrix.AbstractCommand$DeprecatedOnFallbackHookApplication$1.onError(AbstractCommand.java:1472)
at com.netflix.hystrix.AbstractCommand$FallbackHookApplication$1.onError(AbstractCommand.java:1397)
...
at com.netflix.hystrix.AbstractCommand$HystrixObservableTimeoutOperator$3.onError(AbstractCommand.java:1194)
...
at com.netflix.hystrix.AbstractCommand.observe(AbstractCommand.java:328)
at com.netflix.hystrix.HystrixCommand.observe(HystrixCommand.java:47)
... 1 more
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@16e7dcfd[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@222114ba[Wrapped task = null]] rejected from java.util.concurrent.ThreadPoolExecutor@3d121db3[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2057)
at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:827)
at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1357)
at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$ThreadPoolWorker.schedule(HystrixContextScheduler.java:172)
at com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler$HystrixContextSchedulerWorker.schedule(HystrixContextScheduler.java:106)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:45)
at rx.internal.operators.OperatorSubscribeOn.call(OperatorSubscribeOn.java:30)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
... 43 more
Source code analysis:
HystrixThreadPoolProperties
package com.netflix.hystrix;

// Properties of a HystrixThreadPool instance
public abstract class HystrixThreadPoolProperties {
    /* defaults */
    static int default_coreSize = 10; // core size of thread pool
    static int default_maximumSize = 10; // maximum size of thread pool
    static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive
    static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject)
                                          // -1 turns it off and makes us use SynchronousQueue
    static boolean default_allow_maximum_size_to_diverge_from_core_size = false; //should the maximumSize config value get read and used in configuring the threadPool
                                                                                 //turning this on should be a conscious decision by the user, so we default it to false
    static int default_queueSizeRejectionThreshold = 5; // number of items in queue
    static int default_threadPoolRollingNumberStatisticalWindow = 10000; // milliseconds for rolling number
    static int default_threadPoolRollingNumberStatisticalWindowBuckets = 10; // number of buckets in rolling number (10 1-second buckets)

    protected HystrixThreadPoolProperties(HystrixThreadPoolKey key, Setter builder, String propertyPrefix) {
        // 1. Configure corePoolSize
        this.corePoolSize = getProperty(propertyPrefix, key, "coreSize", builder.getCoreSize(), default_coreSize);
        // If maxQueueSize is not configured, default_maxQueueSize is used, i.e. -1
        this.maxQueueSize = getProperty(propertyPrefix, key, "maxQueueSize", builder.getMaxQueueSize(), default_maxQueueSize);
    }
}
HystrixConcurrencyStrategy
package com.netflix.hystrix.strategy.concurrency;

// Hystrix concurrency strategy
public abstract class HystrixConcurrencyStrategy {
    public ThreadPoolExecutor getThreadPool(final HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties threadPoolProperties) {
        // Read the configured coreSize
        final int dynamicCoreSize = threadPoolProperties.coreSize().get();
        // Read the configured maxQueueSize
        final int maxQueueSize = threadPoolProperties.maxQueueSize().get();
        // Choose the BlockingQueue according to maxQueueSize
        final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);
        // Both corePoolSize and maximumPoolSize of the pool are set to the configured coreSize
        return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES, workQueue, threadFactory);
    }

    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        if (maxQueueSize <= 0) {
            // If maxQueueSize <= 0, return a SynchronousQueue
            return new SynchronousQueue<Runnable>();
        } else {
            return new LinkedBlockingQueue<Runnable>(maxQueueSize);
        }
    }
}
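To make the effect of the queue choice concrete, here is a minimal JDK-only sketch. The class name QueueOfferDemo and the helper offerSucceeds are hypothetical names introduced for illustration; only the branch inside getBlockingQueue() is copied from the listing above. It calls the non-blocking offer(), which is the method ThreadPoolExecutor.execute() uses, on a fresh queue that no consumer is waiting on.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

public class QueueOfferDemo {

    // Same selection logic as HystrixConcurrencyStrategy.getBlockingQueue() in the listing above.
    static BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        if (maxQueueSize <= 0) {
            return new SynchronousQueue<Runnable>();
        }
        return new LinkedBlockingQueue<Runnable>(maxQueueSize);
    }

    // Hypothetical helper: true if a non-blocking offer() is accepted
    // by a fresh queue while no consumer thread is waiting in take()/poll().
    static boolean offerSucceeds(int maxQueueSize) {
        Runnable task = () -> { };
        return getBlockingQueue(maxQueueSize).offer(task);
    }

    public static void main(String[] args) {
        System.out.println("maxQueueSize=-1 -> offer accepted: " + offerSucceeds(-1));
        System.out.println("maxQueueSize=5  -> offer accepted: " + offerSucceeds(5));
    }
}
```

With the default of -1 the offer is refused, because a SynchronousQueue has no capacity and nobody is waiting to take the task; with a positive size the LinkedBlockingQueue buffers it.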
The execute() method of ThreadPoolExecutor
package java.util.concurrent;

public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // Proceeds in 3 steps
        int c = ctl.get();
        // 1. If fewer than corePoolSize threads are running, start a new thread to run command
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2. If the task can be queued successfully, queue it
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 3. If the task cannot be queued, try to add a new thread.
        //    If that fails too, reject the task (the default AbortPolicy throws RejectedExecutionException)
        else if (!addWorker(command, false))
            reject(command);
    }
}
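The three steps can be observed without Hystrix. The following is a sketch under the assumption that the pool is built the way getThreadPool() builds it (corePoolSize equal to maximumPoolSize, SynchronousQueue, default AbortPolicy); the class name RejectionDemo and the method countRejections are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {

    // Submits `tasks` blocking tasks to a pool whose corePoolSize and maximumPoolSize
    // both equal coreSize and whose work queue is a SynchronousQueue.
    // Returns how many submissions were rejected.
    static int countRejections(int coreSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, coreSize, 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task blocks until released, so every worker stays busy.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // Step 3 of execute(): offer() failed and addWorker(command, false) failed.
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // let the running tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + countRejections(5, 6));
    }
}
```

With coreSize 5, the first five submissions each start a worker in step 1. The sixth finds no free core slot, cannot be handed off through the SynchronousQueue because no worker is idle, and cannot start a thread beyond maximumPoolSize, so it is rejected.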
AbstractCommand
package com.netflix.hystrix;

/* package */abstract class AbstractCommand<R> implements HystrixInvokableInfo<R>, HystrixObservable<R> {
    // Used for asynchronous execution of the command with a callback, by subscribing to the Observable.
    public Observable<R> observe() {
        // us a ReplaySubject to buffer the eagerly subscribed-to Observable
        ReplaySubject<R> subject = ReplaySubject.create();
        // eagerly kick off subscription
        final Subscription sourceSubscription = toObservable().subscribe(subject);
        // return the subject that can be subscribed to later while the execution has already started
        return subject.doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                sourceSubscription.unsubscribe();
            }
        });
    }

    // Converts the AbstractCommand into an Observable
    public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;
        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);
                return hystrixObservable
                        .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                        .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                        .doOnCompleted(fireOnCompletedHook);
            }
        });
    }

    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        return executeCommandAndObserve(_cmd)
                .doOnError(markExceptionThrown)
                .doOnTerminate(singleSemaphoreRelease)
                .doOnUnsubscribe(singleSemaphoreRelease);
    }

    private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    // For a RejectedExecutionException, call handleThreadPoolRejectionViaFallback()
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }
                    return handleFailureViaFallback(e);
                }
            }
        };
        return execution.doOnNext(markEmits)
                .doOnCompleted(markOnCompleted)
                .onErrorResumeNext(handleFallback) // when the upstream emits an error, handle it via handleFallback
                .doOnEach(setRequestContext);
    }

    private Observable<R> handleThreadPoolRejectionViaFallback(Exception underlying) {
        eventNotifier.markEvent(HystrixEventType.THREAD_POOL_REJECTED, commandKey);
        threadPool.markThreadRejection();
        // use a fallback instead (or throw exception if not implemented)
        return getFallbackOrThrowException(this, HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", underlying);
    }
}
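Root cause
- 1) maxQueueSize is not configured, so the thread pool's task queue is a SynchronousQueue.
  - SynchronousQueue is a BlockingQueue with no data buffer: a producer thread's put() must wait for a consumer's take(), and vice versa.
- 2) The number of concurrent calls exceeds the configured coreSize (both corePoolSize and maximumPoolSize of the ThreadPoolExecutor are set to the configured coreSize). In ThreadPoolExecutor.execute(), workQueue.offer(command) fails because no idle worker is waiting on the queue, addWorker(command, false) then returns false, and a RejectedExecutionException is thrown.
- 3) Hystrix handles the RejectedExecutionException in handleThreadPoolRejectionViaFallback(). Since the command implements no fallback, the rejection is reported as a HystrixRuntimeException with the message "could not be queued for execution and no fallback available".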
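Since the rejection follows from the queue choice, one way to see what a positive maxQueueSize would change is to rebuild the pool with each kind of queue. This is a JDK-only sketch, not Hystrix itself: the class name BoundedQueueDemo and the method countRejections are hypothetical, and the sketch does not model queueSizeRejectionThreshold (default 5 in the HystrixThreadPoolProperties listing), which Hystrix applies on top of the queue capacity.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueDemo {

    // Builds a pool the way getThreadPool() does (core == max == coreSize, queue chosen
    // from maxQueueSize), submits `tasks` blocking tasks and returns the rejection count.
    static int countRejections(int coreSize, int maxQueueSize, int tasks) {
        BlockingQueue<Runnable> workQueue = maxQueueSize <= 0
                ? new SynchronousQueue<Runnable>()
                : new LinkedBlockingQueue<Runnable>(maxQueueSize);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, coreSize, 1, TimeUnit.MINUTES, workQueue);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Tasks block until released, so workers never drain the queue early.
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown(); // running and queued tasks can now complete
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("maxQueueSize=-1, 10 tasks: " + countRejections(5, -1, 10));
        System.out.println("maxQueueSize=5,  10 tasks: " + countRejections(5, 5, 10));
    }
}
```

With five threads and no queue, five of the ten submissions are rejected, matching the reproduction above. With a queue of five, the extra five tasks wait in the queue and nothing is rejected until an eleventh task arrives. In Hystrix the corresponding settings are withMaxQueueSize and withQueueSizeRejectionThreshold on HystrixThreadPoolProperties.Setter; whether queueing or a larger coreSize is the better remedy depends on the latency of the wrapped call.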