美文网首页技术分享
redisson使用阻塞队列引发的异常解决

redisson使用阻塞队列引发的异常解决

作者: Java及SpringBoot | 来源:发表于2021-02-05 13:07 被阅读0次

背景

  • 升级redisson版本后解决了redisson的队列丢消息问题
<dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.15.0</version>
</dependency>
  • 随之又出现了其它异常问题,报错信息有如下几种:
  1. PingConnectionHandler类报异常,之前分析过这个类,这个异常报得特别多 一晚上将近10w条
    
image-20210204140409138.png
  1. RedisExecutor类异常,堆栈如下
        at org.redisson.command.RedisExecutor$2.run(RedisExecutor.java:205) ~[redisson-3.15.0.jar!/:3.15.0]
        at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:672) ~[netty-common-4.1.45.Final.jar!/:4.1.45.Final]
        at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:747) ~[netty-common-4.1.45.Final.jar!/:4.1.45.Final]
        at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:472) ~[netty-common-4.1.45.Final.jar!/:4.1.45.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar!/:4.1.45.Final]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

异常信息如下:

image-20210204140659888.png
  1. Caused by: org.redisson.client.WriteRedisConnectionException: Channel has been closed!
    
    image-20210204141100695.png

分析结果

上面的报错信息redisson在最新的版本报错信息已经明确提示了"Avoid to use blocking commands in Async/JavaRx/Reactive handlers即不要用阻塞命令,否则会有问题。

经过分析之后,上面几种异常发生的原因都是因为发送PING命令引发的,原理就是代码使用了阻塞方法,由netty-threads执行的监听器或者带延迟时间的监听器导致了Redis的请求响应处理错误。

解决方案

  1. 不用redisson的阻塞相关方法,改用非阻塞方式
  2. 使用redisson的异步非阻塞方式实现代码,此种方式实现起来,代码更简洁,直观,并且节省了资源。下面是官方给的使用示例
  3. 禁用redisson ping即可解决使用阻塞方式引发的报错
官方示例地址:
https://github.com/redisson/redisson/wiki/3.-Operations-execution#31-async-way
image-20210204170547157.png

结合我们自身业务代码伪代码如下:

  • redisQueueExecutorPool是一个自定义的线程池
public <T extends AbstractRedisQueueJob, R> void doTask(RedisQueueJobListener<T, R> taskEventListener, RBlockingQueue<T> blockingFairQueue) {
    //使用非阻塞方式
        blockingFairQueue.pollAsync().whenCompleteAsync((res, throwable) -> {
            try {
                if (Objects.nonNull(res)) {
                    log.info(res);
                    RedisQueueJobQueueResult<R> result = taskEventListener.invoke(res);
                    //判断是否进行重试 true:结束循环  false:继续循环
                    if (taskEventListener.postProcessor(res, result.getResult()) != RedisQueueJobListener.Result.PROCESSING_COMPLETE) {
                        if (res.getIsDelayQueue()) {
                            RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
                            delayedQueue.offer(res, res.getDelay(), TimeUnit.SECONDS);
                        } else {
                            blockingFairQueue.add(res);
                        }
                    } else {
                        log.info("redis队列任务执行完成,任务对象 {} 结果对象 {}", res, result);
                    }
                }
                if (Objects.nonNull(throwable)) {
                    log.error(throwable.getMessage(), throwable);
                }
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            } finally {
                //在这里休眠一小会儿
                //注意这里不是递归,不会产生递归的方法栈溢出问题,这里是重新调用而已不会堆栈
                doTask(taskEventListener, blockingFairQueue);
            }
        }, redisQueueExecutorPool);
    }

关于解决方案我已经提交了issues给官方作者,最近就会有官方回复,读者可以先关注下我提交给官方的问题:https://github.com/redisson/redisson/issues/3405https://github.com/redisson/redisson/issues/3412

相关文章

网友评论

    本文标题:redisson使用阻塞队列引发的异常解决

    本文链接:https://www.haomeiwen.com/subject/hkebtltx.html