美文网首页
任务限流工具

任务限流工具

作者: 蓝笔头 | 来源:发表于2021-09-16 17:40 被阅读0次

    在一段时间间隔内,只允许某一个 task 执行一次

    限流工具类

    import com.google.common.annotations.VisibleForTesting;
    import lombok.RequiredArgsConstructor;
    import lombok.SneakyThrows;
    import org.redisson.api.RBucket;
    import org.redisson.api.RedissonClient;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Service;
    
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Redis-backed limiter that lets a task, identified by a key, run at most once per time window.
     *
     * <p>A marker bucket is stored under {@code KEY_PREFIX + taskKey} with a time-to-live. While the
     * marker exists, non-forced executions are rejected; forced executions overwrite the marker and
     * run unconditionally.
     */
    @Service
    public class TaskExecuteRateLimiter {
        /** Prefix prepended to every task key to build the Redis key. */
        public static final String KEY_PREFIX = "an:itask:";

        private final RedissonClient redissonClient;

        /**
         * Creates the limiter on top of the given Redisson client.
         *
         * <p>The qualifier is declared on the constructor parameter on purpose: a field-level
         * {@code @Qualifier} is only carried over to a Lombok-generated constructor when
         * {@code lombok.config} lists it in {@code copyableAnnotations}, so relying on it can make
         * Spring ignore the qualifier.
         *
         * @param redissonClient client used to read and write the marker buckets
         */
        public TaskExecuteRateLimiter(@Qualifier("redissonSingleClient") RedissonClient redissonClient) {
            this.redissonClient = redissonClient;
        }

        /** Returns the marker bucket that belongs to {@code taskKey}. */
        private RBucket<String> bucketFor(String taskKey) {
            return redissonClient.getBucket(KEY_PREFIX + taskKey);
        }

        /** Fails fast on a missing task so that no marker is written for work that cannot run. */
        private static void requireTask(Runnable task) {
            if (task == null) {
                throw new NullPointerException("task must not be null");
            }
        }

        /**
         * Writes the marker only when none exists yet.
         *
         * @return the result of {@code trySet}: true when this call created the marker
         */
        private boolean setIfNotExist(String taskKey, int expireSeconds) {
            return bucketFor(taskKey).trySet(getDefaultValue(), expireSeconds, TimeUnit.SECONDS);
        }

        /** Writes (or overwrites) the marker with a fresh time-to-live. */
        private void set(String taskKey, int expireSeconds) {
            bucketFor(taskKey).set(getDefaultValue(), expireSeconds, TimeUnit.SECONDS);
        }

        /**
         * Builds the value stored in the marker: {@code <host name>-<current thread name>}.
         *
         * <p>Nothing in this class reads the value back, so it only serves as a diagnostic tag.
         */
        private String getDefaultValue() {
            String host;
            try {
                host = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException ignored) {
                // The value is purely informational; an unresolvable local host name must not
                // prevent the task from being executed.
                host = "unknown-host";
            }
            return host + "-" + Thread.currentThread().getName();
        }

        /** Tells whether a marker currently exists for {@code taskKey}. */
        private boolean exists(String taskKey) {
            return bucketFor(taskKey).isExists();
        }

        /**
         * Checks whether a non-forced execution of the task would currently be rejected.
         *
         * @param taskKey unique key of the task
         * @return true when a marker for {@code taskKey} exists
         */
        public boolean cannotExecute(String taskKey) {
            return exists(taskKey);
        }

        /**
         * Runs the task either unconditionally or subject to the once-per-interval limit.
         *
         * @param task          task to run
         * @param taskKey       unique key of the task
         * @param expireSeconds time-to-live of the marker, in seconds
         * @param forceExecute  true to bypass the limit (the marker is still refreshed)
         * @return true when the task was run, false when it was rejected by the limit
         * @throws NullPointerException if {@code task} is null
         */
        public boolean execute(Runnable task, String taskKey, int expireSeconds, boolean forceExecute) {
            if (forceExecute) {
                forceExecuteTask(task, taskKey, expireSeconds);
                return true;
            }

            return executeTaskOnlyOnceInInterval(task, taskKey, expireSeconds);
        }


        /**
         * Runs the task unless it has already been run within the current interval.
         *
         * <p>The marker is written before the task runs and is not removed if the task throws, so a
         * failed run still occupies the interval.
         *
         * @param task          task to run
         * @param taskKey       unique key of the task
         * @param expireSeconds number of seconds that must pass before the task may run again
         * @return true when the task was run, false when it was rejected because a marker exists
         * @throws NullPointerException if {@code task} is null
         */
        public boolean executeTaskOnlyOnceInInterval(Runnable task, String taskKey, int expireSeconds) {
            // Validate first: a null task must not consume the interval.
            requireTask(task);
            if (setIfNotExist(taskKey, expireSeconds)) {
                task.run();
                return true;
            }
            return false;
        }

        /**
         * Runs the task unconditionally and (re)writes the marker for {@code taskKey} with the given
         * time-to-live.
         *
         * @throws NullPointerException if {@code task} is null
         */
        @VisibleForTesting
        void forceExecuteTask(Runnable task, String taskKey, int expireSeconds) {
            // Validate first: a null task must not refresh the marker.
            requireTask(task);
            set(taskKey, expireSeconds);
            task.run();
        }
    }
    

    单元测试

    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;
    import org.redisson.api.RedissonClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    
    /**
     * Tests for {@link TaskExecuteRateLimiter}, run against the injected {@link RedissonClient}.
     *
     * <p>Every test uses the same task key; its marker is deleted before each test so that the
     * tests start from a clean state.
     */
    public class TaskExecuteRateLimiterTest extends AbstractITTest {
        @Autowired
        private TaskExecuteRateLimiter taskExecuteRateLimiter;
        @Qualifier("redissonSingleClient")
        @Autowired
        private RedissonClient redissonClient;

        /** Number of times the test task has actually been run. */
        private int count = 0;
        private String taskKey = "taskKey";

        /** The task handed to the limiter: bumps the execution counter by one. */
        private void increment() {
            count++;
        }

        /** Resets the counter and removes any marker left over for the test key. */
        @Before
        public void setUp() {
            count = 0;
            String redisKey = TaskExecuteRateLimiter.KEY_PREFIX + taskKey;
            redissonClient.getBucket(redisKey).delete();
        }

        /** Ten attempts inside one 10s interval must run the task exactly once. */
        @Test
        public void executeTaskOnlyOnceInInterval() {
            Assert.assertEquals(0, count);

            int attempts = 0;
            while (attempts < 10) {
                taskExecuteRateLimiter.executeTaskOnlyOnceInInterval(this::increment, taskKey, 10);
                attempts++;
            }
            Assert.assertEquals(1, count);
            Assert.assertEquals(10, attempts);
        }

        /** Forced execution runs the task every time, regardless of the marker. */
        @Test
        public void forceExecuteTask() {
            Assert.assertEquals(0, count);

            for (int round = 0; round < 2; round++) {
                taskExecuteRateLimiter.forceExecuteTask(this::increment, taskKey, 10);
            }
            Assert.assertEquals(2, count);
        }

        /** Walks through limited, forced, and post-expiry executions with a 1s interval. */
        @Test
        public void execute() throws InterruptedException {
            final int intervalSeconds = 1;
            Assert.assertEquals(0, count);

            // forceExecute=false: only the first of two back-to-back calls runs (expireSeconds=1).
            taskExecuteRateLimiter.execute(this::increment, taskKey, intervalSeconds, false);
            taskExecuteRateLimiter.execute(this::increment, taskKey, intervalSeconds, false);
            Assert.assertEquals(1, count);

            // forceExecute=true: both calls run immediately.
            taskExecuteRateLimiter.execute(this::increment, taskKey, intervalSeconds, true);
            taskExecuteRateLimiter.execute(this::increment, taskKey, intervalSeconds, true);
            Assert.assertEquals(3, count);

            // forceExecute=false: still rejected until 1s (expireSeconds=1) has passed.
            taskExecuteRateLimiter.execute(this::increment, taskKey, intervalSeconds, false);
            Assert.assertEquals(3, count);

            // After the interval has elapsed the task may run again.
            Thread.sleep(1200);
            taskExecuteRateLimiter.execute(this::increment, taskKey, intervalSeconds, false);
            Assert.assertEquals(4, count);
        }

        /** cannotExecute is false before any run and true right after one. */
        @Test
        public void cannotExecute() {
            boolean blockedBefore = taskExecuteRateLimiter.cannotExecute(taskKey);
            Assert.assertFalse(blockedBefore);

            taskExecuteRateLimiter.execute(this::increment, taskKey, 1, false);
            boolean blockedAfter = taskExecuteRateLimiter.cannotExecute(taskKey);
            Assert.assertTrue(blockedAfter);
        }
    }
    

    相关文章

      网友评论

          本文标题:任务限流工具

          本文链接:https://www.haomeiwen.com/subject/okxdgltx.html