美文网首页
任务限流工具

任务限流工具

作者: 蓝笔头 | 来源:发表于2021-09-16 17:40 被阅读0次

一段时间间隔类只允许执行某一个task一次

限流工具类

import com.google.common.annotations.VisibleForTesting;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

import java.net.InetAddress;
import java.util.concurrent.TimeUnit;

@Service
@RequiredArgsConstructor
public class TaskExecuteRateLimiter {
    public static final String KEY_PREFIX = "an:itask:";

    @Qualifier("redissonSingleClient")
    private final RedissonClient redissonClient;

    private boolean setIfNotExist(String taskKey, int expireSeconds) {
        String key = KEY_PREFIX + taskKey;
        RBucket<String> bucket = redissonClient.getBucket(key);
        return bucket.trySet(getDefaultValue(), expireSeconds, TimeUnit.SECONDS);
    }

    private void set(String taskKey, int expireSeconds) {
        String key = KEY_PREFIX + taskKey;
        RBucket<String> bucket = redissonClient.getBucket(key);
        bucket.set(getDefaultValue(), expireSeconds, TimeUnit.SECONDS);
    }

    @SneakyThrows
    private String getDefaultValue() {
        return InetAddress.getLocalHost().getHostName() + "-" + Thread.currentThread().getName();
    }

    private boolean exists(String taskKey) {
        String key = KEY_PREFIX + taskKey;
        RBucket<String> bucket = redissonClient.getBucket(key);
        return bucket.isExists();
    }

    public boolean cannotExecute(String taskKey) {
        return exists(taskKey);
    }

    public boolean execute(Runnable task, String taskKey, int expireSeconds, boolean forceExecute) {
        if (forceExecute) {
            forceExecuteTask(task, taskKey, expireSeconds);
            return true;
        }

        return executeTaskOnlyOnceInInterval(task, taskKey, expireSeconds);
    }


    /**
     * 在同一个时间间隔内,不能重复执行 task
     *
     * @param task  需要执行的任务
     * @param taskKey 任务的唯一 key
     * @param expireSeconds 间隔多少秒可以执行一次 task
     *
     * @return true: 成功执行,false:被限制执行(因为之前已经执行过了)
     */
    public boolean executeTaskOnlyOnceInInterval(Runnable task, String taskKey, int expireSeconds) {
        if (setIfNotExist(taskKey, expireSeconds)) {
            task.run();
            return true;
        }
        return false;
    }

    /**
     * 强制执行 task,并执行过期时间到 taskKey 中
     */
    @VisibleForTesting
    void forceExecuteTask(Runnable task, String taskKey, int expireSeconds) {
        set(taskKey, expireSeconds);
        task.run();
    }
}

单元测试

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

public class TaskExecuteRateLimiterTest extends AbstractITTest {
    @Autowired
    private TaskExecuteRateLimiter taskExecuteRateLimiter;
    @Qualifier("redissonSingleClient")
    @Autowired
    private RedissonClient redissonClient;

    private int count = 0;
    private String taskKey = "taskKey";

    @Before
    public void setUp() {
        count = 0;
        redissonClient.getBucket(TaskExecuteRateLimiter.KEY_PREFIX + taskKey).delete();
    }

    @Test
    public void executeTaskOnlyOnceInInterval() {
        Assert.assertEquals(0, count);

        int innerCount = 0;
        for (int i = 0; i < 10; ++i) {
            taskExecuteRateLimiter.executeTaskOnlyOnceInInterval(() -> count++, taskKey, 10);
            innerCount++;
        }
        Assert.assertEquals(1, count);
        Assert.assertEquals(10, innerCount);
    }

    @Test
    public void forceExecuteTask() {
        Assert.assertEquals(0, count);

        taskExecuteRateLimiter.forceExecuteTask(() -> count++, taskKey, 10);
        taskExecuteRateLimiter.forceExecuteTask(() -> count++, taskKey, 10);
        Assert.assertEquals(2, count);
    }

    @Test
    public void execute() throws InterruptedException {
        Assert.assertEquals(0, count);

        // forceExecute=false 需要间隔 1s(expireSeconds=1) 才能执行
        taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, false);
        taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, false);
        Assert.assertEquals(1, count);

        // forceExecute=true 可以立即执行
        taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, true);
        taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, true);
        Assert.assertEquals(3, count);

        // forceExecute=false 需要间隔 1s(expireSeconds=1) 才能执行
        taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, false);
        Assert.assertEquals(3, count);

        Thread.sleep(1200);
        taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, false);
        Assert.assertEquals(4, count);
    }

    @Test
    public void cannotExecute() {
        Assert.assertFalse(taskExecuteRateLimiter.cannotExecute(taskKey));

        taskExecuteRateLimiter.execute(() -> count++, taskKey, 1, false);
        Assert.assertTrue(taskExecuteRateLimiter.cannotExecute(taskKey));
    }
}

相关文章

  • 任务限流工具

    一段时间间隔类只允许执行某一个task一次 限流工具类 单元测试

  • redis+lua进行限流

    lua脚本,确保原子性 限流注解 限流处理器 redis配置 ip工具类

  • 限流工具RateLimter

    guava限流工具RateLimiter RateLimiter和java并发工具Semaphore不同的是,Ra...

  • 2.安装部署

    源码编译 MySQL编译依赖 必备的包和工具 功能需要的包 功能定制 MySQL限流SQL限流并行复制Thread...

  • RateLimiter

    com.google.common.util.concurrent.RateLimiter 谷歌下的限流工具,采用...

  • Guava RateLimiter 学习

    Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法(Token Bucke...

  • Spring Cloud Gateway 结合配置中心限流

    前言 假设你领导给你安排了一个任务,具体需求如下: 针对具体的接口做限流 不同接口限流的力度可以不同 可以动态调整...

  • RateLimiter源码

    Google开源工具包Guava提供了限流工具类RateLimiter,该类基于令牌桶算法实现流量限制,使用十分方...

  • 谷歌Guava限流工具RateLimiter

    基于guava-29.0版本。 RateLimiter是一个基于令牌桶算法实现的限流器,常用于控制网站的QPS。与...

  • 服务流量限制

    性能测试 写个简单的web服务,再用工具进行压测。 使用wrk 工具进行压测: 常见限流手段 流量限制的手段有很多...

网友评论

      本文标题:任务限流工具

      本文链接:https://www.haomeiwen.com/subject/okxdgltx.html