美文网首页
Spring分布式任务高效流程

Spring分布式任务高效流程

作者: 十毛tenmao | 来源:发表于2021-06-21 23:59 被阅读0次

    项目中有很多定时任务,而且他们的执行模式非常类似,抽象整理如下

    定时任务特点

    • 比较多:项目中的定时任务很多,大概有20个左右
    • 任务保存在Redis中:为了减少对数据库的压力,定时任务大部分都是使用Redis的ZSET进行存储
    • 幂等:任务是幂等的
    • 集群分布式执行
    • 分布式锁: 虽然执行是幂等的,但是为了提高效率,还是使用分布式锁,保证一个任务只会被执行一次

    执行流程

    • 获取任务的批量数据
    • 尝试获取数据对应的锁,如果没有获取就结束
    • 再次在任务源中判断该数据是否待处理,如果是就处理,否则就跳过
    • 执行该数据的任务(需要幂等)
    • 从数据源中修改状态为已处理或删除
    • 释放数据锁

    代码示例

    @Slf4j
    @RequiredArgsConstructor
    @AllArgsConstructor
    public class RedisSortedSetTaskWorker {
        private final RedissonClient redissonClient;
        private final StringRedisTemplate stringRedisTemplate;
    
        private final String redisKey;
        private final Function<String, Boolean> task;
        private Predicate<String> directRemoveFilter;
        private boolean continueOnError = true;
    
        public void work(double beginScore, double endScore, @Nullable Integer limit) {
            //获取符合条件的所有子任务
            Set<String> subTasks = limit == null
                    ? stringRedisTemplate.opsForZSet().rangeByScore(redisKey, beginScore, endScore)
                    : stringRedisTemplate.opsForZSet().rangeByScore(redisKey, beginScore, endScore, 0, limit);
            log.info("[{}]: there are [{}] sub tasks to try to work", redisKey, subTasks == null ? 0 : subTasks.size());
            if (subTasks == null) {
                return;
            }
    
            //删除子任务
            if (directRemoveFilter != null) {
                Set<String> toRemove = subTasks.stream().filter(directRemoveFilter).collect(Collectors.toSet());
                Object[] directToRemove = toRemove.toArray();
                if (directToRemove.length > 0) {
                    stringRedisTemplate.opsForZSet().remove(redisKey, directToRemove);
                    subTasks.removeAll(toRemove);
                }
            }
    
            log.info("there are [{}] sub tasks to work after filter", subTasks.size());
            for (String subTask : subTasks) {
                try {
                    String lockKey = redisKey + ":lock:" + subTask;
                    trySubmit(lockKey, () -> {
                        //如果已经不存在了,就直接返回,说明可能被其他节点执行了
                        Double score = stringRedisTemplate.opsForZSet().score(redisKey, subTask);
                        if (score == null) {
                            return;
                        }
    
                        //如果执行成功就删除该子任务(subTask)
                        boolean success = task.apply(subTask);
                        if (success) {
                            stringRedisTemplate.opsForZSet().remove(redisKey, subTask);
                        }
                    });
                } catch (RuntimeException e) {
                    log.warn("fail to work one sub task: redisKey[{}], subTask[{}]", redisKey, subTask, e);
    
                    //遇到异常是否继续
                    if (continueOnError) {
                        continue;
                    }
                    throw e;
                }
            }
        }
    
        /**
         * 尝试分布式环境下互斥执行任务.
         *
         * @param lockName 分布式锁
         * @param work     任务
         * @return true锁成功,false锁失败
         */
        private boolean trySubmit(String lockName, Runnable work) {
            RLock lock = redissonClient.getLock(lockName);
            //获取分布式锁
            if (!lock.tryLock()) {
                log.info("fail to lock [{}]", lockName);
                return false;
            }
            log.info("lock [{}] successfully", lockName);
            try {
                work.run();
                return true;
            } catch (Exception e) {
                log.warn("fail to trySubmit", e);
                throw e;
            } finally {
                lock.unlock();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:Spring分布式任务高效流程

          本文链接:https://www.haomeiwen.com/subject/ntxpyltx.html