背景
- 升级redisson版本后解决了redisson的队列丢消息问题
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.15.0</version>
</dependency>
- 随之又出现了其它异常问题,报错信息有如下几种:
PingConnectionHandler类报异常,之前分析过这个类,这个异常报得特别多 一晚上将近10w条
![](https://img.haomeiwen.com/i4639175/97640c05ca87dd93.png)
- RedisExecutor类异常,堆栈如下
at org.redisson.command.RedisExecutor$2.run(RedisExecutor.java:205) ~[redisson-3.15.0.jar!/:3.15.0]
at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672) ~[netty-common-4.1.45.Final.jar!/:4.1.45.Final]
at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747) ~[netty-common-4.1.45.Final.jar!/:4.1.45.Final]
at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472) ~[netty-common-4.1.45.Final.jar!/:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar!/:4.1.45.Final]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]
异常信息如下:
![](https://img.haomeiwen.com/i4639175/82a7ac4b9a8c5390.png)
-
Caused by: org.redisson.client.WriteRedisConnectionException: Channel has been closed!
image-20210204141100695.png
分析结果
上面的报错信息redisson在最新的版本报错信息已经明确提示了"Avoid to use blocking commands in Async/JavaRx/Reactive handlers
即不要用阻塞命令,否则会有问题。
经过分析之后,上面几种异常发生的原因都是因为发送PING命令引发的,原理就是代码使用了阻塞方法,由netty-threads执行的监听器或者带延迟时间的监听器导致了Redis的请求响应处理错误。
解决方案
- 不用redisson的阻塞相关方法,改用非阻塞方式
- 使用redisson的异步非阻塞方式实现代码,此种方式实现起来,代码更简洁,直观,并且节省了资源。下面是官方给的使用示例
- 禁用redisson ping即可解决使用阻塞方式引发的报错
官方示例地址:
https://github.com/redisson/redisson/wiki/3.-Operations-execution#31-async-way
![](https://img.haomeiwen.com/i4639175/36344c36f7b5021f.png)
结合我们自身业务代码伪代码如下:
- redisQueueExecutorPool是一个自定义的线程池
public <T extends AbstractRedisQueueJob, R> void doTask(RedisQueueJobListener<T, R> taskEventListener, RBlockingQueue<T> blockingFairQueue) {
//使用非阻塞方式
blockingFairQueue.pollAsync().whenCompleteAsync((res, throwable) -> {
try {
if (Objects.nonNull(res)) {
log.info(res);
RedisQueueJobQueueResult<R> result = taskEventListener.invoke(res);
//判断是否进行重试 true:结束循环 false:继续循环
if (taskEventListener.postProcessor(res, result.getResult()) != RedisQueueJobListener.Result.PROCESSING_COMPLETE) {
if (res.getIsDelayQueue()) {
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(res, res.getDelay(), TimeUnit.SECONDS);
} else {
blockingFairQueue.add(res);
}
} else {
log.info("redis队列任务执行完成,任务对象 {} 结果对象 {}", res, result);
}
}
if (Objects.nonNull(throwable)) {
log.error(throwable.getMessage(), throwable);
}
} catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
//在这里休眠一小会儿
//注意这里不是递归,不会产生递归的方法栈溢出问题,这里是重新调用而已不会堆栈
doTask(taskEventListener, blockingFairQueue);
}
}, redisQueueExecutorPool);
}
关于解决方案我已经提交了issues给官方作者,最近就会有官方回复,读者可以先关注下我提交给官方的问题:https://github.com/redisson/redisson/issues/3405,https://github.com/redisson/redisson/issues/3412
网友评论