单机限流 - 限流算法及隔离策略

作者: 右耳菌 | 来源:发表于2022-06-24 00:14 被阅读0次

限流算法 - 计数器

计数器是一种比较简单的限流算法,用途比较广泛,在接口层面,很多地方使用这种方式限流。在一段时间内,进行计数,与阀值进行比较,到了时间临界点,将计数器清0。

  • 优点
    实现简单
  • 问题
    临界问题,
    例子: 在12:01:00到12:01:58这段时间内没有用户请求,然后在12:01:59这一瞬时发出100个请求,OK,然后在12:02:00这一瞬时又发出了100个请求。 会导致短时间内接受了 200 次请求,可能会超出系统的承受能力。
/**
 * 用一个 long型的count变量做计数器:
 * 请求前计数器+1,如超过阈值并且与第一个请求的间隔还在1分钟内,则限流。
 */
public class CounterLimiter {
    private static long timeStamp = System.currentTimeMillis();
    // 限制为1s内限制在100个请求
    private static long limitCount = 100;
    // 时间间隔(毫秒)
    private static long interval = 1000;
    // 请求数
    private static long reqCount = 0;

    public static boolean grant() {
        long now = System.currentTimeMillis();
        if (now < timeStamp + interval) {
            if (reqCount < limitCount) {
                ++reqCount;
                return true;
            } else {
                return false;
            }
        } else {
            timeStamp = System.currentTimeMillis();
            reqCount = 0;
            return false;
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 500; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (grant()) {
                        System.out.println("执行业务逻辑");
                    } else {
                        System.out.println("限流");
                    }
                }
            }).start();
        }
    }

}

限流算法 - 滑动窗口

  • 将时间划分为多个区间;
  • 在每个区间内每有一次请求就将计数器加一维持一个时间窗口,占据多个区间;·每经过一个区间的时间,则抛弃最老的一个区间,并纳入最新的一个区间;
  • 如果当前窗口内区间的请求计数总和超过了限制数量,则本窗口内所有的请求都被丢弃。
  • 优点
    滑动窗口计数器是通过将窗口再细分,并且按照时间"滑动",这种算法避免了固定窗口计数器带来的双倍突发请求
  • 问题
    时间区间的精度越高,算法所需的空间容量就越大。

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 滑动窗口。该窗口同样的key,都是单线程计算。
 *
 * 代码实现思路就是定义好分片数量,每个分片都有一个独立的计数器,所有的分片合计为一个数组。
 * 当请求来时,按照分片规则,判断请求应该划分到哪个分片中去。要判断是否超过阈值,就将前N个统计值相加,对比定义的阈值即可。
 */
public class SlidingWindowLimiter {
    /**
     * 循环队列,就是装多个窗口用,该数量是windowSize的2倍
     */
    private AtomicInteger[] timeSlices;
    /**
     * 队列的总长度
     */
    private int timeSliceSize;
    /**
     * 每个时间片的时长,以毫秒为单位
     */
    private int timeMillisPerSlice;
    /**
     * 共有多少个时间片(即窗口长度)
     */
    private int windowSize;
    /**
     * 在一个完整窗口期内允许通过的最大阈值
     */
    private int threshold;
    /**
     * 该滑窗的起始创建时间,也就是第一个数据
     */
    private long beginTimestamp;
    /**
     * 最后一个数据的时间戳
     */
    private long lastAddTimestamp;


    public SlidingWindowLimiter(int timeMillisPerSlice, int windowSize, int threshold) {
        this.timeMillisPerSlice = timeMillisPerSlice;
        this.windowSize = windowSize;
        this.threshold = threshold;
        // 保证存储在至少两个window
        this.timeSliceSize = windowSize * 2;

        reset();
    }

    //通过修改每个时间片的时间,窗口数量,阈值,来进行测试。
    public static void main(String[] args) {
        //1秒一个时间片,窗口共5个
        SlidingWindowLimiter window = new SlidingWindowLimiter(100, 4, 8);
        for (int i = 0; i < 100; i++) {
            // 生成范围内的随机数
            Random random = new Random(); // 0,1,2,3,4
            System.out.println(window.addCount(random.nextInt(5)));

            window.print();
            System.out.println("--------------------------");
            try {
                Thread.sleep(102);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 初始化队列,由于此初始化会申请一些内容空间,为了节省空间,延迟初始化
     */
    private void reset() {
        beginTimestamp = System.currentTimeMillis();

        if (timeSlices != null) {
            return;
        }
        //窗口个数
        AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
        for (int i = 0; i < timeSliceSize; i++) {
            localTimeSlices[i] = new AtomicInteger(0);
        }
        timeSlices = localTimeSlices;
    }

    private void print() {
        for (AtomicInteger integer : timeSlices) {
            System.out.print(integer + "-");
        }
    }

    /**
     * 计算当前所在的时间片的位置
     */
    private int locationIndex() {
        long now = System.currentTimeMillis();
        //如果当前的key已经超出一整个时间片了,那么就直接初始化就行了,不用去计算了
        if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
            reset();
        }

        return (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
    }

    /**
     * 增加count个数量
     */
    public boolean addCount(int count) {
        //当前自己所在的位置,是哪个小时间窗
        int index = locationIndex();
        System.out.println("index:" + index);
        //然后清空自己前面windowSize到2*windowSize之间的数据格的数据
        //譬如1秒分4个窗口,那么数组共计8个窗口
        //当前index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来就是该窗口内的总和
        clearFromIndex(index);

        int sum = 0;
        // 在当前时间片里继续+1
        sum += timeSlices[index].addAndGet(count);
        //加上前面几个时间片
        for (int i = 1; i < windowSize; i++) {
            sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
        }
        System.out.println(sum + "---" + threshold);

        lastAddTimestamp = System.currentTimeMillis();

        return sum >= threshold;
    }

    private void clearFromIndex(int index) {
        for (int i = 1; i <= windowSize; i++) {
            int j = index + i;
            if (j >= windowSize * 2) {
                j -= windowSize * 2;
            }
            timeSlices[j].set(0);
        }
    }
}

限流算法 - 漏桶

  • 将每个请求视作"水滴"放入"漏桶"进行存储;
  • "漏桶"以固定速率向外"漏"出请求来执行如果"漏桶"空了则停止"漏水";
  • 如果"漏桶"满了则多余的"水滴"会被直接丢弃。
  • 优点
    漏桶算法多使用队列实现,服务的请求会存到队列中,服务的提供方则按照固定的速率从队列中取出请求并执行,过多的请求则放在队列中排队或直接拒绝。
  • 问题(缺点)
    漏桶算法的缺陷也很明显,当短时间内有大量的突发请求时,即便此时服务器没有任何负载,每个请求也都得在队列中等待一段时间才能被响应。

/**
 * 漏桶
 */
public class LeakyBucketLimiter {

    // 时间刻度
    private static long timeStamp = System.currentTimeMillis();
    // 桶大小
    private static int bucketSize = 10;
    // 每ms流出的请求
    private static int rate = 1;
    // 当前的水量
    private static long count = 0;

    public static boolean grant() {
        long now = System.currentTimeMillis();
        // 计算出水的数量
        long out = (now - timeStamp) * rate;
        // 先执行漏水,计算剩余水量
        count = Math.max(0, count - out);
        timeStamp = now;
        if ((count + 1) < bucketSize) {
            // 先执行漏水,计算剩余水量
            count++;
            return true;
        } else {
            // 水满,拒绝加水
            return false;
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (grant()) {
                        System.out.println("执行业务逻辑");
                    } else {
                        System.out.println("限流");
                    }
                }
            }).start();
        }
    }
}

限流算法 - 令牌桶

  • 令牌以固定速率生成。
  • 生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行。
  • 如果桶空了,那么尝试取令牌的请求会被直接丢弃。
  • 优点
    令牌桶算法既能够将所有的请求平均分布到时间区间内,又能接受服务器能够承受范围内的突发请求,因此是目前使用较为广泛的一种限流算法。
  • 问题(缺点)
    牺牲小部分流量
package com.study.current.limiting.allen;

import com.google.common.util.concurrent.RateLimiter;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * 限流神器:Guava RateLimiter
 * Guava RateLimiter基于令牌桶算法,我们只需要告诉RateLimiter系统限制的QPS是多少,
 * 那么RateLimiter将以这个速度往桶里面放入令牌,然后请求的时候,通过tryAcquire()方法向RateLimiter获取许可(令牌)。
 */
public class GuavaRateLimiter {

    public static ConcurrentHashMap<String, RateLimiter> resourceRateLimiter = new ConcurrentHashMap<>();

    static {
        createrResourceLimiter("order", 50);
    }

    private static void createrResourceLimiter(String resource, int qps) {
        if (resourceRateLimiter.contains(resource)) {
            resourceRateLimiter.get(resource).setRate(qps);
        } else {
            //每秒50个,2s后达到正常速率
            RateLimiter rateLimiter = RateLimiter.create(qps, 1000L, TimeUnit.MILLISECONDS);
            resourceRateLimiter.putIfAbsent(resource, rateLimiter);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 500; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if (resourceRateLimiter.get("order").tryAcquire()) {
                        System.out.println("执行业务逻辑");
                    } else {
                        System.out.println("限流");
                    }
                }
            }).start();
        }
    }
}

基于Guava的限流算法

  • 平滑突发限流(SmoothBursty)
        // 平滑突发限流顾名思义,就是允许突发的流量进入,后面再慢慢的平稳限流。
        // 创建了容量为5的桶,并且每秒新增5个令牌,即每200ms新增一个令牌
        RateLimiter limiter = RateLimiter.create(5);
        while (true) {
            // 获取令牌(可以指定一次获取的个数),获取后可以执行后续的业务逻辑
            System.out.println(limiter.acquire());
        }
        // while循环中执行的limiter.acquire(),当没有令牌时,此方法会阻塞。实际应用当中应当使用tryAcquire()方法,如果获取不到就直接执行拒绝服务。
        RateLimiter limiter = RateLimiter.create(2);
        System.out.println(limiter.acquire());
        // 中途休眠的场景
        Thread.sleep(1500L);
        while (true) {
            System.out.println(limiter.acquire());
        }
        // 当线程休眠时,会囤积令牌,以给后续的acquire()使用。但是上面的代码只能囤积1S的令牌(也就是2个),当睡眠时间超过1.5S时,执行结果还是相同的。
  • 平滑预热限流(SmoothWarmingUp)
        // permitsPerSecond表示每秒钟新增的令牌数,warmupPeriod表示从冷启动速率过渡到平均速率所需要的时间间隔
        // RateLimiter.create(double permitsPerSecond, long warmupPeriod, TimeUnit unit);
        RateLimiter limiter = RateLimiter.create(5, 1000, TimeUnit.MILLISECONDS);
        for (int i = 1; i < 5; i++) {
            System.out.println(limiter.acquire());
        }
        Thread.sleep(1000L);
        for (int i = 1; i < 50; i++) {
            System.out.println(limiter.acquire());
        }
        // 平滑预热限流的耗时是慢慢趋近平均值的

隔离策略 - 线程池

线程池用来控制实际工作的线程数量,通过线程复用的方式来减小内存开销。
线程池可同时工作的线程数量是一定的,超过该数量的线程需进入线程队列等待,直至有可用的工作线程来执行任务。


import java.util.concurrent.*;

/**
 * 线程池
 */
public class ThreadPoolTest {

    public static void main(String[] args) {

        // 创建线程池,为了更好的明白运行流程,增加了一些额外的代码
//        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
//        BlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
//        BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();

        // AbortPolicy/CallerRunsPolicy/DiscardOldestPolicy/DiscardPolicy
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 5, TimeUnit.SECONDS,
                queue, new ThreadPoolExecutor.AbortPolicy());

        // 向线程池里面扔任务
        for (int i = 0; i < 10; i++) {
            System.out.println("当前线程池大小[" + threadPool.getPoolSize() + "],当前队列大小[" + queue.size() + "]");

            threadPool.execute(new MyThread("Thread" + i));
        }
        // 关闭线程池
        threadPool.shutdown();
    }

    static class MyThread implements Runnable {
        private String name;

        public MyThread(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            // 做点事情
            try {
                Thread.sleep(1000);

                System.out.println(name + " finished job!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

隔离策略 - 信号量

信号量Semaphore是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。
如果无可用许可,那么acquire方法将一直阻塞,直到其它线程释放许可。

  • 线程池和信号量对比

    • 实际工作的线程是谁创建的?
      使用线程池,实际工作线程由线程池创建;使用Semaphore,实际工作的线程由你自己创建。

    • 实际工作的线程是谁创建的?
      ·线程池自动,Semaphore手动。


import java.util.concurrent.Semaphore;

/**
 * 信号量
 */
public class SemaphoreTest {
    public static void main(String[] args) {
        Runnable customer = new Runnable() {
            final Semaphore availableWindow = new Semaphore(5, true);
            int count = 1;

            @Override
            public void run() {
                int time = (int) (Math.random() * 10 + 3);
                int num = count++;
                try {
                    availableWindow.acquire();
                    System.out.println("正在为第【" + num + "】个客户办理业务,需要时间:" + time + "s!");
                    Thread.sleep(time * 1000);
                    if (availableWindow.hasQueuedThreads()) {
                        System.out.println("第【" + num + "】个客户已办理完业务,有请下一位!");
                    } else {
                        System.out.println("第【" + num + "】个客户已办理完业务,没有客户了,休息中!");
                    }
                    availableWindow.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };

        for (int i = 1; i < 10; i++) {
            new Thread(customer).start();
        }
    }
}

如果觉得有收获就点个赞吧,更多知识,请点击关注查看我的主页信息哦~

相关文章

网友评论

    本文标题:单机限流 - 限流算法及隔离策略

    本文链接:https://www.haomeiwen.com/subject/unqpvrtx.html