美文网首页分布式系列
Redis分布式锁 Spring Schedule实现任务调度

Redis分布式锁 Spring Schedule实现任务调度

作者: ChaLLengerZeng | 来源:发表于2018-10-16 23:00 被阅读369次

    一看到标题就知道,这一篇博客又是总结分布式工作环境中集群产生的问题,个人觉得分布式没有那么难以理解,可能也是自己见识比较浅,对我来说,分布式只是一种后端业务演进时的一种工作方式,而真正实现这种工作方式的是集群

    关于集群是什么以及如何搭建集群环境,可以参考之前我的博文,这一片博客将着重介绍Redis分布式锁,这是一个基于SpringBoot构建的高并发电商后端服务项目,并且其中框架包括的Spring Schedule框架搭建的定时任务模块去实现定时关闭未付款的订单

    项目地址: https://github.com/challengerzsz/Mall

    在项目演进的过程中集群化就意味着复杂,复杂就意味着出问题,那么这个时候今天分享的这个问题是多进程可能在统一进程都去进行关单任务,这是不必要的,后面进行详细分析,希望看到这篇博客的初学者们能够了解到这听起来高大上的概念其实不是那么困难的,有经验的同行们欢迎指正修改

    如何去实现一个关单服务

    试想一下,自己是否有在购物客户端(web/app)中发起订单之后,本该选择支付的或者是因为什么原因,没钱了也好或者不想买了也好,订单挂在那里,但是却没有取消掉,这个例子在一个场景商城后端的项目中如何去解决呢?

    关单的逻辑在我看来可以有下面实现方式

    • 开放关单接口给商家管理端,供商家进入管理端进行手动关单

      如果这样实现的话,那么如果商家比较忙忘记了,或者因为订单太多根本来不及处理,怎么办?眼看着商品一件一件减少却不见付款,并且其余想买这件商品的买家因为订单数量为0,不能进行购买,那么你这个购物平台还有什么商家甘愿入驻

    • 开放关闭订单接口给客户端,用户主动取消发起的订单

      如果选择这样的实现,可能用户会忘记或者故意卡单不让你售卖出去,占用库存却又不付款,卖家心急,真正想买的人也着急,挂起订单的买家还有种皇上不急太监急的感觉

    • 服务端定时关单

      定时关单的逻辑就需要用到Spring Schedule的定时任务框架,通过制定合理地策略帮助卖家删除那些被遗忘的却又被发起订单了的数据,顺便提一句,之前我也写过类似的定时任务,但是是在MySQL中进行编码的,也就是说定时任务的执行我交给了数据库去进行,我们知道I/O操作是比较耗时的操作,比如上一篇分析服务器宕机的问题,如果MySQL状态正忙,这个时候再来一大堆定时任务需要删除的订单,不用说想必已经知道结果

    现在的购物平台给我的感觉这三种逻辑是都存在的,肯定也存在别的实现方式,我在这里也就是举例一下,下面来说今天的主题

    Spring Schedule实现定时关单

    今天的主题其实是上述三种分析的第三种实现,使用Spring提供的框架实现定时任务,制定自己的逻辑,下面介绍Schedule这个框架的时候,大家可以大概了解一下cron表达式,并且可以百度搜索一些cron自动生成的网站去方便开发

    Spring Schedule给我的感觉优点有下面3点

    1. 基于注解来设置调度器。
    2. 非常方便实现简单的调度
    3. 对代码不具有入侵性,非常轻量级

    什么是cron表达式

    cron表达式是一个字符串,字符串以5或6个空格隔开,分为6或7个域,每一个域代表一个含义 ,cron表达式在这里不具体介绍,具体可参考下列几图

    举一个例子

    @Component
    public class Task {
        private Logger logger = LoggerFactory.getLogger(getClass());
        
        @Scheduled(cron = "0 */1 * * * ?")
        public void testSchedule() {
            logger.info("定时任务启动");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("定时任务执行完成");
        }
    }
    

    很简单将定时任务类使用@Component声明为Spring容器的一个组件Bean,使用@Schedule并且指定cron参数,并且在执行这个方法的时候让处理定时任务的这条线程睡2s,来模拟处理逻辑的时间,这个定时方法将会在每分钟整的时候被调用,看看效果

    执行定时任务的日志

    可以看到线程1从8:20:00被调用,线程睡眠2s模拟真实处理逻辑后,8:20:02执行完成

    实现关单定时器 V 1.0

    V1.0的逻辑实现很简单,写一个定时方法,直接去处理关单

    @Scheduled(cron = "0 */1 * * * ?")
    public void closeOrderTaskV1() {
    
        logger.info("关闭订单定时任务启动");
        //这里是你自己的关单逻辑
        logger.info("关闭订单定时任务结束");
    }
    

    我的关单逻辑是这样的,定时任务触发之后,关闭掉发起订单2个小时,却未付款的订单,关单逻辑可以是可配置的,使用Spring配置类的注解,读取.yml或.xml中关单的需要关闭订单的超时时间,之后的对数据库操作在这里也不提

    大家可以想想这个V1.0版本会有什么问题,尤其是在集群环境下

    多进程同时启动定时关单

    集群环境下,Tomcat集群其实是多个进程,并且部署的项目逻辑代码完全一致,这个时候问题来了,大家也应该都想到了,多个进程都会在分钟的整数倍的时候执行这个定时任务,会有什么问题呢

    • 损耗性能

      这个逻辑其实交个其中一台应用服务器去做就好了,没有必要多进程同时委派一条线程去做同一件事,假如这个时候满足条件需要关闭的订单较多,I/O速度较慢,并且现在电商前端面临浏览高峰期, 但是却有的线程被调度去做了浪费时间的事情,那这个定时任务设计的是失败的,设计定时任务的目的给我的感觉是,使用简单的注解去执行一些不需要程序员考虑和实现的定时任务,是为了让开发和逻辑变得简单,并且开发过程中根本不会希望会因为引入一个框架而浪费性能

    • 重复还原库存

      关单的逻辑是这样,关闭掉应该关闭的订单之后,需要还原库存,那么在这个时候如果多进程执行了关单,那将意味着,多个定时任务中都会存在还原库存这一操作,忽略数据有效性,如果多条进程同事执行这个任务,那将导致还原很多条无效的库存,明明只是还原10个库存,5条进程同时执行的时候,将会至少还原10个库存

    • 数据的有效性

      试想一下,多条进程同时执行这个并发任务的话,如果是定时任务执行关单逻辑,并发的结果是一样的,都是关闭掉符合条件的订单,看似这个并发情况不会出错,但是如果SQL语句设计的不好,或者没有使用一些对数据有效性的加锁策略,都会产生影响

    • 脏读/写数据进行操作

      上面分析了,如果只是关闭订单这一步操作,可能多条进程都会因为脏读造成对同一个订单进行关单,这虽然是一个并发导致的问题,但是不至于结果出错,该被关闭的订单迟早要被关闭,只是可能重复执行关闭它的操作

      那么还原库存这一步操作呢,如果不进行必要的加锁策略,那将产生真正的并发问题

      上面已经提到,多条进程同时执行关单的时候,需要进行还原库存这一操作,那么如果不加锁,可能会出现其中一条进程正在还原库存,库存已被还原,但是在此同时,一条请求来了,想要获取商品的库存信息,那会产生什么结果,假如此时商品库存为0,定时关单执行完成后将还原10件库存,但是请求的读线程读取数据库执行SQL操作,那么将出现脏读,逻辑上应该是这个时候用户在客户端能够看见的是10件库存

      若是这个时候多进程同时执行还原库存的操作呢,可能多条进程读取到的都是未还原的库存,这样最好,还原之后结果是正确的,但是这样的假设必定造成错误,并发量大的情况下,肯定有一台应用服务器有点吃不消,进程的调度也不是那么迅速,那么这个时候很可能出现4个进程还原成功之后,剩余1条进程才开始执行并且这个时候,增加的库存数就是脏数据,那么如何去解决

    • 订单关闭后还原商品库存

      我的关单逻辑中包含一步操作,先去检查订单中的商品库存,若获取到的库存为null,则说明该商品已被删除无记录,若有数据再将订单中选购的商品数量还原到库存中

      for (Order order : orderList) {
        List<OrderItem> orderItemList = orderItemMapper.getByOrderNo(order.getOrderNo());
        for (OrderItem orderItem : orderItemList) {
        //InnoDB
        Integer stock = productMapper.selectStockByProductId(orderItem.getProductId());
        //考虑到已生成订单里的商品,被删除的情况
        if (stock == null) {
            continue;
        }
        Product product = new Product();
        product.setId(orderItem.getProductId());
        //还原库存
        product.setStock(stock + orderItem.getQuantity());
        productMapper.updateByPrimaryKeySelective(product);
          }
      
          orderMapper.closeOrderByOrderId(order.getId());
          logger.info("关闭订单orderNo {}", order.getId());
      }
      

      遍历需要关闭的订单列表中的商品信息,查询库存,库存为空不进行还原操作,库存若不为空,则将订单中选购的数量还原至库存

      • 行锁/表锁

        
            -- 悲观锁使用主键查找是行锁,若查找不是主键则是表锁
            SELECT
            stock
            FROM product
            WHERE id = #{id}
            FOR UPDATE
        

        这里也说明了,这条SQL语句采用悲观锁的策略,这样写的目的大家应该也能够体会的到,如果多条进程实现关单操作,并且还原库存,若不对数据进行加锁,可能展示的是无效数据,若select的字段为主键,则使用的是行锁的策略,如果不是主键,则对整张表进行加锁保证数据正确性,乐观锁悲观锁不是要研究的主题,继续向下看

    • 多进程采用这样的策略能保证安全吗

      如果说写一个增加访问量的场景,答案是不能的,仅仅对select语句进行for update启动事务进行行锁或者表锁这种形式的加锁方式,多进程情况下同样会造成脏读,我们的目的是获取库存并且在库存基础上进行还原操作,但是这两步是不具有原子性的,其实就和多线程情况下i++一样,同样会出现严重的并发问题,除非使用事务,将查询和更新两步操作实现原子性,实现方式不再赘述

    • 定时任务交由单进程进行处理

      这句话的意思,其实就是某一时刻集群中的应用服务器只有一台是在跑这个定时任务的,这样的话避免了多进程情况下的数据一致性问题,并且性能方面也不会浪费,如何实现这一点?

    Redis分布式锁

    我们通过实现一个Redis分布式锁,来控制某一时刻只会有一条进程进行定时任务,Redis分布式锁给我的感觉有点像重入锁的计数器,通过其中的标示来实现这个锁

    定时任务V 2.0

    @Scheduled(cron = "0 */1 * * * ?")
    public void closeOrderTaskV2 {
        logger.info("关闭订单定时任务启动");
        //redis分布式锁的上锁时间ms
        long lockTimeOut = mallProperties.getTask().getLockTimeOut();
        Boolean setIfAbsentResult =         redisUtil.setIfAbsent(Const.RedisLock.CLOSE_ORDER_TASK_LOCK,
        String.valueOf(System.currentTimeMillis() + lockTimeOut));
    
        if (setIfAbsentResult) {
            //若返回值为true则说明获取到了分布式锁,原先没有服务器占用锁
            closeOrder(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
        } else {
            logger.info("未获取到分布式锁");
        }
        logger.info("关闭订单定时任务结束");
    }
    
    private void closeOrder(String lockName) {
        
        logger.info("获取{}, ThreadName:{}", Const.RedisLock.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
        Integer hour = mallProperties.getTask().getHour();
        orderService.closeOrder(hour);
        //释放锁
        redisUtil.delete(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
        logger.info("释放{}, ThreadName", Const.RedisLock.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
    }
    
    • 如何实现Redis分布式锁

      实现Redis锁之前,我们来复习一下Redis的几个基本命令

      可以参考这篇资料 http://357029540.iteye.com/blog/2388965

      使用到的命令为

      • SETNX key value

        key 的值设为 value ,当且仅当 key 不存在。

        若给定的 key 已经存在,则 SETNX 不做任何动作。

        SETNX 是『SET if Not eXists』(如果不存在,则 SET)的简写。

      • GETSET key value

        将给定 key 的值设为 value ,并返回 key 的旧值(old value)。

        key 存在但不是字符串类型时,返回一个错误。

        上面这两种命令,是Redis原生支持在redis-cli上使用的原始命令,我操作Redis是使用的RedisTemplate,这个在之前的博客也做过介绍,在这里也不再多说

    • 定时任务V 2.0产生的问题

      V 2.0中我们使用自己的逻辑实现了Redis分布式锁,那么我们现在来说一下V2.0版本的定时任务进行说明

      • 首先通过配置类获取允许一条定时任务线程可持有锁的最大时间

      • 其次需要说明一下上面的SETNX命令,被RedisTemplate封装成了setIfAbsen方法,详细的内容可以参考源代码,这里我只是使用了RedisTemplate进行了封装以便使用自己写的工具类进行调用,并且这个方法的返回值和原始命令有一些不符

        setIfAbsent(key, value)方法,若key存在,则不进行任何操作并且返回值为false

        若key不存在,则将value保存,并且返回true

      • 我们采用我们在代码中声明的常量去作为Redis分布式锁的key,通过当前系统时间毫秒数+允许持有锁的时间毫秒数作为value进行保存

      • 接下来如果获取到了这个分布式锁,即预示着这条线程可以开始执行自己的关单任务了

      • 我实现的关单逻辑是,获取配置中的允许订单超时的最大时间,并且交由service层去处理,具体的处理逻辑上面也提到过

      • 执行完了关单逻辑之后,最重要的一步来到了,那就是释放这个锁,如何释放呢,其实直接将Redis中的锁的标志key删除即可

      • 若setIfAbsent(key, value)方法返回true,则说明,之前Redis中并不存在锁的key,这也就是说,集群中没有任何一台应用服务器获取了这把锁,并且就可以执行真正的关单逻辑

      也就是说setIfAbsent(key, value)方法其实是一个尝试获得锁的操作,在之后的逻辑中setIfAbsent(key, value)方法依然是这个作用

    • 问题发现 —"死锁"

      如果V 2.0版本的实现仅仅如此,那么可能部署到了线上集群环境中,可能会给人感觉正确,但是却存在很重要的关键隐患,并且这个隐患可能成为今后定时任务无法执行的原因

      为什么会发生死锁

      简单地来说,死锁是一种在多线程环境下,获得锁的线程因为想要获取到另一条线程持有的锁导致不能及时释放锁,再加上另一条线程也想要持有这条线程持有的锁,导致两条线程都不能释放自己的锁,这种情况就会出现线程的阻塞,并且最终造成这两条线程中的任务无法运行,形成死锁

      对于Redis分布式锁来说,V 2.0版依旧不能防止死锁,大家可以想想这样一种情况,如果正在执行定时任务的应用服务器,在执行释放锁语句之前,宕机了,这个时候大家可以想想一下发生了什么问题

      首先我们使用的setNx(key, value)方法,若Redis中不含有这个锁,也就是说当前情况没有其余线程(这里说的线程是不同进程中的线程,Tomcat通过调度线程去执行定时任务,集群中的Tomcat其实就是开启的不同进程)去获取到这个锁(即执行定时任务),此方法若没有别的进程获得锁,将会发现Redis中没有锁的key,则将key和当前调用该方法的系统时间加上超时时间作为value存入Redis

      如果发生了正在执行定时任务的应用服务器宕机,因为setNx操作如果成功set,其默认的数据超时时间为-1即永久有效,这个时候如果在释放锁之前宕机,也就意味着Redis中将永远存在这个锁的key,也就是说,下次定时任务即将调用时,所有集群中的Tomcat进程中的线程都会去使用setNx方法尝试获得这个锁,但是却无果,因为返回值都将是false

      我们设置setNx的目的为让其进行尝试获取锁,但是却因为一台服务器未及时释放锁就已经宕机,造成锁的标志一直都在,这样的话接下来的时间,所有的定时任务都将因为获取不了这个分布式锁而不会执行,这也就构成了另一种死锁现象,我们把Redis分布式锁的死锁情况就如此定义

    • 解决V 2.0定时任务死锁的方法

      • 通过在关闭Tomcat前执行删除Redis锁的操作
      /**
       * 关闭tomcat的时候删除锁避免分布式锁的死锁
       * kill命令会直接杀掉tomcat的进程不会执行此方法
       */
      @PreDestroy
      public void delLock() {
          redisUtil.delete(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
      }
      

      根据注释其实大家都能理解了,使用linux环境下的Tomcat的同学应该知道,bin下有启动使用的start.sh以及关闭时需要用到的shutdown.sh这两个脚本,并且在调用shutdown.sh这个脚本关闭应用服务器的时候,将会自动在销毁之前执行这个方法,这个方法的意图将会是从Redis中删除可能会导致的死锁的key

      • 并且在closeOrder方法中加入一条语句设置Redis锁的过期时间
      private void closeOrder(String lockName) {
      
          //设置初获取锁的时候有效期,避免永久有效 时间单位s
          redisUtil.expire(lockName, 5);
          logger.info("获取{}, ThreadName:{}", Const.RedisLock.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
          Integer hour = mallProperties.getTask().getHour();
          orderService.closeOrder(hour);
          //释放锁
          redisUtil.delete(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
          logger.info("释放{}, ThreadName", Const.RedisLock.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
      } 
      

      ​ 通过实现一个expire方法,在某条获得到Redis分布式锁的实现逻辑中,添加对Redis锁中的过期时间操作的方法,设置这个key-value在Redis中保存时间为5秒,若在执行完这条语句之后,服务器宕机,则其实不会产生死锁的情况(依赖于定时任务的开始时间)

    • 问题依旧存在

      如果应用服务器所在的物理服务器断电,或执行kill命令,那么将不会执行这个方法,若执行定时任务的进程已经持有锁,这个锁依旧会在Redis中永久保存下去,依旧会造成死锁

      如果进程在获得到锁的时候就宕机,并没有执行expire方法,那么同样会造成死锁产生不良后果

    定时任务V 3.0 搭配上述私有closeOrder方法使用

    
    @Scheduled(cron = "0 */1 * * * ?")
    public void closeOrderTask() {
    
        logger.info("关闭订单定时任务启动");
        //redis分布式锁的上锁时间ms
        long lockTimeOut = mallProperties.getTask().getLockTimeOut();
        Boolean setIfAbsentResult = redisUtil.setIfAbsent(Const.RedisLock.CLOSE_ORDER_TASK_LOCK,
                    String.valueOf(System.currentTimeMillis() + lockTimeOut));
        if (setIfAbsentResult) {
            //若返回值为true则说明获取到了分布式锁,原先没有服务器占用锁
            closeOrder(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
        } else {
            //未获取到锁 判断时间戳判断是否可以重置锁
            String lockValueStr = redisUtil.getRedisValue(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
    
            //若该当前时间超过该锁本该释放的时间,但是由于某些原因未被释放则重置该锁
            // 意外终止,还没来得及
            if (lockValueStr != null && System.currentTimeMillis() > Long.parseLong(lockValueStr)) {
    
                String getSetResult = redisUtil.getSetRedisValue(Const.RedisLock.CLOSE_ORDER_TASK_LOCK,
                            String.valueOf(System.currentTimeMillis() + lockTimeOut));
                //将获取到的值与之前的值进行比较若相同则说明原先应该释放的锁没有被释放这个时候可以重置
                //若不相同则说明在这个时间段内另一台tomcat集群已经使获取到了分布式锁这个时候只能是获取不到这个分布式锁
                if (getSetResult == null || (getSetResult != null && StringUtils.equals(getSetResult, lockValueStr))) {
                    closeOrder(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
                } else {
                    logger.info("未获取到分布式锁");
                }
            } else {
                // 此时若lockValueStr为null,就说明定时任务已经完成并清除了Redis的那个value。 
                // 若时间没到,说明定时任务正在执行。
                // 两种情况都不需要获取分布式锁,所以不进行操作。
                logger.info("未获取到分布式锁");
            }
        }
    
        logger.info("关闭订单定时任务结束");
    }
    
    private void closeOrder(String lockName) {
    
        //设置初获取锁的时候有效期,避免永久有效 时间单位s
        redisUtil.expire(lockName, 5);
        logger.info("获取{}, ThreadName:{}", Const.RedisLock.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
        Integer hour = mallProperties.getTask().getHour();
        orderService.closeOrder(hour);
        //释放锁
        redisUtil.delete(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
        logger.info("释放{}, ThreadName", Const.RedisLock.CLOSE_ORDER_TASK_LOCK, Thread.currentThread().getName());
    }   
    

    Value值发挥作用

    之前尝试锁的逻辑依旧使用setNx进行,只不过现在的未获取到锁的逻辑需要大改一下,而这种改动的实现,其实就是Redis分布式锁实现的核心方式

    • 首先,先通过普通的get方法获取到Redis中锁的value,通过判断这个value值进一步判断这个锁是否可以被重置

    • 若这个value不为空,且系统当前时间大于value值,则说明,这个锁本应该被释放,已经过了超时时间却未被释放

    • 接下来需要使用之前提到的getSet方法,先获取到最新的value值,再将当前系统时间+允许的最大超时时间set为value,接下来的这一条判断语句将会成为整个Redis锁的较难理解部分

      if (getSetResult == null || (getSetResult != null && StringUtils.equals(getSetResult, lockValueStr))) {
        closeOrder(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
      } else {
        logger.info("未获取到分布式锁");
      }
      

      通过getSetResult接收调用getSet方法的返回值,获取到最新的value值,若getSetResult为空,或getSetResult不为空且与之前获取到的Redis锁的value一致,则说明当前进程占有了锁,可以进行真正的关单逻辑

    • 什么情况下getSetResult会为空?

      • 我先把结论抛出来,细节下面再讲,当之前获取锁的进程调用了expire方法后便宕机,在锁失效之前进入了第一个if块,即lockValueStr不为空,且系统当前时间大于value值,则说明,这个锁应该被重置

      • 接下来到了这个if-else块,进行第二次校验,这一步判断是非常重要的,因为在这一步可能出现并发情况,若getSetResult值为空,则说明在if-else块之前expire的时间到了,key-value在Redis中被自动删除,这个时候我们可以正常进行我们的逻辑

      • 若getSetResult不为空,则说明最新get到的值不为空,并且和之前的lockValueStr相同的话,则说明,之前执行定时任务的进程挂了,并且没有执行expire方法,或者执行了之后还没有到过期的时间

      以上两种情况其实都是会造成死锁的原因,通过实现这种逻辑判断加上Redis分布式锁value值的类型设计,将会无瑕疵得实现Redis分布式锁

      其余的else语句被判断为未获得这一次的分布式锁大家应该都能理解了

    • getSetResult若与lockValueStr不等,能否进行关单操作?

      我们可以思考一下,若这两个值不相等,则说明什么问题,因为在这条语句之前,假设有新的进程已经获取到了Redis分布式锁,那么将会set最新的系统当前时间,这个值将会与之前的值不一致,则说明已经有了新的进程获取到了锁执行了定时任务,则其余进程这次执行的逻辑就可以放弃了

    综上这就是完整的Redis分布式锁实现的定时任务的任务调度模块,接下来还将介绍一个第三方框架帮助我们更好地实现这种逻辑

    Redisson的引入

    Redisson的github:https://github.com/redisson

    中文Wiki:https://github.com/redisson/redisson/wiki/%E7%9B%AE%E5%BD%95

    因为本项目使用了Redis集群,所以直接看到中文文档的集群模式的设置

    官方的集群设置文档

    集群模式除了适用于Redis集群环境,也适用于任何云计算服务商提供的集群模式,例如AWS ElastiCache集群版Azure Redis Cache阿里云(Aliyun)的云数据库Redis版

    程序化配置集群的用法:

    Config config = new Config();
    config.useClusterServers()
        .setScanInterval(2000) // 集群状态扫描间隔时间,单位是毫秒
        //可以用"rediss://"来启用SSL连接
        .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
        .addNodeAddress("redis://127.0.0.1:7002");
    
    RedissonClient redisson = Redisson.create(config);
    

    下面来看一下集成它的一些必要配置

    @Component
    public class RedissonManager {
    
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        private Config config = new Config();
    
        private Redisson redisson = null;
    
        /**
         * 构造器执行完了之后执行这个init方法
         */
        @PostConstruct
        private void init() {
    
            try {
                this.config.useClusterServers()
                        .setScanInterval(2000)
                        .addNodeAddress("redis://127.0.0.1:6379", "redis://127.0.0.1:6380", "redis://127.0.0.1:6381","redis://127.0.0.1:6382", "redis://127.0.0.1:6383");
                this.redisson = (Redisson) Redisson.create(config);
                logger.info("初始化Redisson成功");
            } catch (Exception e) {
                logger.error("Redisson 初始化失败", e);
            }
        }
    
        public Redisson getRedisson() {
            return redisson;
        }
    
    }
    

    有人可能觉的我addNodeAddress方法有点像硬编码,这个方法官方提供的参数是可变字符串参数,我们从配置类中加载出来其实是一个List但是不影响我们使用,这个方法其实就是添加Redis集群中的节点ip:port

    可以看到它的官方文档的注释"redis://127.0.0.1:6379"表示启用SSL连接,如果不指定这个协议前缀,我们可以使用http协议进行替换,这里有一个异常情况

    Redisson框架采用的address节点要用URI编码,如果单纯使用127.0.0.1:6379它会奇怪得抛出一个异常

    Caused by: java.net.URISyntaxException: Illegal character in scheme name at index 0: 127.0.0.1:6379
    

    这里建议使用redis://进行SSL连接,下面看一下log

    可以发现我们通过添加配置好的Redis集群节点即可,它会自动去识别主从库,并且识别每个Redis负责的扇区

    使用Redisson实现Redis分布式锁

    @Scheduled(cron = "0 */1 * * * ?")
    public void closeOrderTaskWithRedisson() {
    
        logger.info("关闭订单定时任务启动
        RLock lock = redissonManager.getRedisson().getLock(Const.RedisLock.CLOSE_ORDER_TASK_LOCK);
        boolean getLock = false;
        //尝试获取锁
        try {
            //是否获取到锁
            //如果不设置waitTime为0的话如果一个逻辑或者sql执行的非常快的情况下,就会造成另一个Tomcat进程也会获取到锁执行一遍schedule
            if (getLock = lock.tryLock(2, 5, TimeUnit.SECONDS)) {
                logger.info("Redisson 获取到分布式锁:{} ThreadName:{}", Const.RedisLock.CLOSE_ORDER_TASK_LOCK,
                            Thread.currentThread().getName());
                Integer hour = mallProperties.getTask().getHour();
                orderService.closeOrder(hour);
            } else {
                logger.info("Redisson 没有获取到分布式锁:{} ThreadName:{}", Const.RedisLock.CLOSE_ORDER_TASK_LOCK,
                            Thread.currentThread().getName());
            }
        } catch (InterruptedException e) {
            logger.error("Redisson 分布式锁获取异常");
        } finally {
            //未获取到锁的话就不需要释放锁,判断getLock
            if (!getLock) {
                return;
            }
            lock.unlock();
            logger.info("Redisson 释放分布式锁");
        }
        logger.info("关闭订单定时任务结束");
    }
    
    • 声明一个RLock实例,通过注入的RedissonManager获取Redisson实例,并且传入锁的key即锁名

    • 逻辑其实与之前原生代码实现一样,使用Redisson框架只是提供了更好的对实现Redis分布式锁的一种封装

    • 首先尝试获取锁,tryLock方法有三个参数

      boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
      

      这是源码中该方法的方法声明,第一个参数为尝试获得锁的等待时间,第二个参数为持有锁的时间,第三个参数为时间单位

      这个tryLock方法其实源码封装的逻辑和我们上述的相似,想要了解的可以去官方网站了解更多,我们设置其尝试获取锁的时间为2s,持有锁的最长时间为5s,如果返回值为true的话,其实该进程中的这条执行定时任务的线程就已经获取到了Redis分布式锁,可以执行关单逻辑了

    • 我们需要一个finally块

      这个finally块中通过对获取锁的返回布尔值进行判断,如果没有获取到这个锁,则直接结束这个方法,若获取到了这个锁,则最终释放锁

    • 会不会存在问题?

      大家可以想一下,如果服务器负担非常小的情况下,并且这个定时任务的逻辑十分简单,可能毫秒级的过程就完成了,但是我们的锁的尝试时间为2s,这也就是说,可能还是会有多条进程获得到这个分布式锁,也就是说这个定时任务在一次执行的过程中,可能还是会被调用多次

      boolean getLock = lock.tryLock(0, 5, TimeUnit.SECONDS);
      

      如果我们采用这种写法,尝试获取锁的等待时间为0s,则在执行代码的时候,能获取到这个锁就是获取到,没获取到就放弃去尝试获取锁的动作,这样的话完整的且安全的Redis分布式锁就构建好了,并且分布式工作环境下,集群之后的任务调度也就依赖于Redis分布式锁实现了

    希望这篇博文能够让大家有所收获,有不正确的地方还希望大家能够认证指正

    相关文章

      网友评论

        本文标题:Redis分布式锁 Spring Schedule实现任务调度

        本文链接:https://www.haomeiwen.com/subject/deikzftx.html