美文网首页
2022-07-14

2022-07-14

作者: 音乐与咖啡Bean | 来源:发表于2022-07-13 15:15 被阅读0次

    Quartz redis 与 分布式 锁

    背景

    前两天研究的利用数据库锁实现Quartz分布式调度一文中提到几点问题,利用数据库行锁机制和唯一性约束,不仅无法解决单点问题,而且频繁访问数据库,造成db性能降低。那么最近就研究了一下redis缓存服务,通过redis的哨兵和复制功能(不知道这两个功能的,请自行百度)可以实现redis集群部署和redis分布式锁,并且数据是缓存在内存中的,所以性能要比数据库锁提高很多。

    思路

    1.既然是基于Redis的分布式锁,那么首先编写redis的相关配置,包括redisTemplate,cacheManager和redis分布式锁,我这里使用springboot集成redis缓存,网上有很多教程,有兴趣的可以自行研究一下。

    2.配置好redis后,我使用TriggerListener接口的特性在执行任务调度时,处自动触发实现了TriggerListener接口的类,在vetoJobExecution方法总完成获取锁的操作,在triggerComplete方法中完成释放锁的操纵。

    具体实现

    redis相关配置

    首先配置redsiTemplate,它用于直接操作redis缓存的值,过期时间等等

    @Configuration
    @EnableCaching
    public class RedisConfig {
    
        /**
         *  定义 StringRedisTemplate ,指定序列化和反序列化的处理类
         * @param factory
         * @return
         */
        @Bean
        public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
            StringRedisTemplate template = new StringRedisTemplate(factory);
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(
                    Object.class);
            ObjectMapper om = new ObjectMapper();
            om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
            om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(om);
            //序列化 值时使用此序列化方法
            template.setValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
            return template;
        }
    
        @Bean
        public CacheManager cacheManager(RedisTemplate<String,String> redisTemplate) {
            RedisCacheManager rcm = new RedisCacheManager(redisTemplate);
            //使用前缀
            rcm.setUsePrefix(true);
            //缓存分割符 默认为 ":"
    //        rcm.setCachePrefix(new DefaultRedisCachePrefix(":"));
            //设置缓存过期时间
            //rcm.setDefaultExpiration(60);//秒
            return rcm;
        }
    }
    

    然后配置redis的分布式锁,由于redis的特性支持分布式锁,所以这里编写获取锁(lock)和释放锁(unlock)两个方法。

    /**
     * redis分布式锁
     * @author hww
     */
    @Component
    public class RedisDistributedLock {
    
        @Autowired
        StringRedisTemplate redisTemplate;
    
        /**
         * redis锁默认超时时间
         */
        private final long DEFAULT_TIMEOUT_LOCK = 60*1000;
    
        public boolean lock(String key) throws Exception{
            return lock(key, DEFAULT_TIMEOUT_LOCK);
        }
    
        /**
         * 获取锁,并设置超时时间
         * @param key
         * @param timeout
         * @return
         * @throws Exception 
         */
        public boolean lock(final String key,long timeout) throws Exception{
            if(StringUtils.isEmpty(key)){
                throw new Exception("key must be not null!");
            }
            //设置key,并返回操作结果
            Boolean execute = redisTemplate.execute(new RedisCallback<Boolean>() {
                @Override
                public Boolean doInRedis(RedisConnection connection)
                        throws DataAccessException {
                    //等同于redisTemplate.opsForValue().set(key, value),只不过此方法会返回一个操作结果
                    return connection.setNX(key.getBytes(), key.getBytes());
                }
            });
            //如果为true,设置超时时间
            if(execute){
                System.out.println("获取到锁,设置超时时间");
                Boolean expire = redisTemplate.expire(key, timeout, TimeUnit.SECONDS);
            }
            return execute;
        }
    
        /**
         * 释放锁
         * @param key
         * @throws Exception
         */
        public void unlock(String key) throws Exception{
            if(StringUtils.isEmpty(key)){
                throw new Exception("key must be not null!");
            }
            redisTemplate.delete(key);
            System.out.println("释放锁");
        }
    
    }
    

    编写TriggerListener实现类

    TriggerListener的javaDoc介绍:

    The interface to be implemented by classes that want to be informed when a Trigger fires. In general, applications that use a Scheduler will not have use for this mechanism

    意思是说实现了该接口的实现类将会在绑定的trigger被触发时通知其执行相关动作。

    public class RedisDistributedTriggerListener implements TriggerListener{
    
        RedisDistributedLock redisDistributedLock = SpringUtils.getBean(RedisDistributedLock.class);
    
        @Override
        public String getName() {
            return "RedisDistributedTriggerListener";
        }
    
        @Override
        public void triggerFired(Trigger trigger, JobExecutionContext context) {
        }
    
        /**
         * 当一个scheduler被调起时,将会唤起vetoJobExecution方法
         * 返回true,则取消任务执行,否则继续执行
         * @author hww
         */
        public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
            try {
                String lockKey = trigger.getJobKey().getGroup() + ":" + trigger.getJobKey().getName();
                //获取到锁,true代表获取到锁,返回false放行, false代表获取锁失败,返回true停止执行
                return !redisDistributedLock.lock(lockKey);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
        @Override
        public void triggerMisfired(Trigger trigger) {
        }
    
        /**
         * 在任务调度执行完成时,调用此方法
         */
        @Override
        public void triggerComplete(Trigger trigger, JobExecutionContext context,
                CompletedExecutionInstruction triggerInstructionCode) {
            String lockKey = trigger.getJobKey().getGroup() + ":" + trigger.getJobKey().getName();
            try {
                redisDistributedLock.unlock(lockKey);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    
    

    在这个类中有几点需要注意:

    1.getName方法的返回值需要自己定义,默认为null,否则会抛出异常。

    2.vetoJobExecution方法在任务调起时执行,当返回false时,继续执行任务,否则停止任务。

    3.在vetoJobExecution方法中获取到锁,则继续执行任务,没有获取到则停止。

    4.triggerComplete方法在在任务调度执行完成时,调用此方法来释放锁。

    TriggerListener绑定Scheduler

    SpringBoot集成Quartz,动态创建,更新,暂停,唤醒,删除任务调度一文中实现了动态创建Job。

    @SuppressWarnings("unchecked")
        public void addTimerJob(SchedulerJob job) {
            try {
                JobDetail jobDetail = JobBuilder
                        .newJob((Class<? extends Job>) Class.forName(job.getClassname()))
                        // 指定执行类
                        .withIdentity(job.getJobname(), job.getJobgroup())
                        // 指定name和group
                        .requestRecovery().withDescription(job.getDescription())
                        .build();
                // 创建表达式调度构建器
                CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder
                        .cronSchedule(job.getCronexpression());
                // 创建触发器
                CronTrigger cronTrigger = TriggerBuilder.newTrigger()
                        .withIdentity(job.getTriggername(), job.getTriggergroup())
                        .withSchedule(cronScheduleBuilder).build();
                //注册triggerListener
                scheduler.getListenerManager().addTriggerListener(new RedisDistributedTriggerListener());
                scheduler.scheduleJob(jobDetail, cronTrigger);
                scheduler.start();// 触发器并不会立刻触发
                System.out
                        .println("==================================创建Job成功!==================================");
            } catch (ClassNotFoundException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            } catch (SchedulerException e) {
                e.printStackTrace();
            }
        }
    

    总结

    至此就完成了实现Quartz分布式调度的所有配置,当执行任意的Job任务时,都会触发注册到Scheduler上的TriggerListener的实现类,并在vetoJobExecution方法中获取到锁,此时只能有一个线程获取该锁,并继续执行,并在执行完毕后,在triggerComplete方法中释放锁。

    相关文章

      网友评论

          本文标题:2022-07-14

          本文链接:https://www.haomeiwen.com/subject/cexfirtx.html