美文网首页
Spring分布式任务高效流程

Spring分布式任务高效流程

作者: 十毛tenmao | 来源:发表于2021-06-21 23:59 被阅读0次

项目中有很多定时任务,而且他们的执行模式非常类似,抽象整理如下

定时任务特点

  • 比较多:项目中的定时任务很多,大概有20个左右
  • 任务保存在Redis中:为了减少对数据库的压力,定时任务大部分都是使用Redis的ZSET进行存储
  • 幂等:任务是幂等的
  • 集群分布式执行
  • 分布式锁: 虽然执行是幂等的,但是为了提高效率,还是使用分布式锁,保证一个任务只会被执行一次

执行流程

  • 获取任务的批量数据
  • 尝试获取数据对应的锁,如果没有获取就结束
  • 再次在任务源中判断该数据是否待处理,如果是就处理,否则就跳过
  • 执行该数据的任务(需要幂等)
  • 从数据源中修改状态为已处理或删除
  • 释放数据锁

代码示例

@Slf4j
@RequiredArgsConstructor
@AllArgsConstructor
public class RedisSortedSetTaskWorker {
    private final RedissonClient redissonClient;
    private final StringRedisTemplate stringRedisTemplate;

    private final String redisKey;
    private final Function<String, Boolean> task;
    private Predicate<String> directRemoveFilter;
    private boolean continueOnError = true;

    public void work(double beginScore, double endScore, @Nullable Integer limit) {
        //获取符合条件的所有子任务
        Set<String> subTasks = limit == null
                ? stringRedisTemplate.opsForZSet().rangeByScore(redisKey, beginScore, endScore)
                : stringRedisTemplate.opsForZSet().rangeByScore(redisKey, beginScore, endScore, 0, limit);
        log.info("[{}]: there are [{}] sub tasks to try to work", redisKey, subTasks == null ? 0 : subTasks.size());
        if (subTasks == null) {
            return;
        }

        //删除子任务
        if (directRemoveFilter != null) {
            Set<String> toRemove = subTasks.stream().filter(directRemoveFilter).collect(Collectors.toSet());
            Object[] directToRemove = toRemove.toArray();
            if (directToRemove.length > 0) {
                stringRedisTemplate.opsForZSet().remove(redisKey, directToRemove);
                subTasks.removeAll(toRemove);
            }
        }

        log.info("there are [{}] sub tasks to work after filter", subTasks.size());
        for (String subTask : subTasks) {
            try {
                String lockKey = redisKey + ":lock:" + subTask;
                trySubmit(lockKey, () -> {
                    //如果已经不存在了,就直接返回,说明可能被其他节点执行了
                    Double score = stringRedisTemplate.opsForZSet().score(redisKey, subTask);
                    if (score == null) {
                        return;
                    }

                    //如果执行成功就删除该子任务(subTask)
                    boolean success = task.apply(subTask);
                    if (success) {
                        stringRedisTemplate.opsForZSet().remove(redisKey, subTask);
                    }
                });
            } catch (RuntimeException e) {
                log.warn("fail to work one sub task: redisKey[{}], subTask[{}]", redisKey, subTask, e);

                //遇到异常是否继续
                if (continueOnError) {
                    continue;
                }
                throw e;
            }
        }
    }

    /**
     * 尝试分布式环境下互斥执行任务.
     *
     * @param lockName 分布式锁
     * @param work     任务
     * @return true锁成功,false锁失败
     */
    private boolean trySubmit(String lockName, Runnable work) {
        RLock lock = redissonClient.getLock(lockName);
        //获取分布式锁
        if (!lock.tryLock()) {
            log.info("fail to lock [{}]", lockName);
            return false;
        }
        log.info("lock [{}] successfully", lockName);
        try {
            work.run();
            return true;
        } catch (Exception e) {
            log.warn("fail to trySubmit", e);
            throw e;
        } finally {
            lock.unlock();
        }
    }
}

相关文章

网友评论

      本文标题:Spring分布式任务高效流程

      本文链接:https://www.haomeiwen.com/subject/ntxpyltx.html