限流算法 - 计数器
计数器是一种比较简单的限流算法,用途比较广泛,在接口层面,很多地方使用这种方式限流。在一段时间内,进行计数,与阀值进行比较,到了时间临界点,将计数器清0。
- 优点
实现简单 - 问题
临界问题,
例子: 在12:01:00到12:01:58这段时间内没有用户请求,然后在12:01:59这一瞬时发出100个请求,OK,然后在12:02:00这一瞬时又发出了100个请求。 会导致短时间内接受了 200 次请求,可能会超出系统的承受能力。
/**
* 用一个 long型的count变量做计数器:
* 请求前计数器+1,如超过阈值并且与第一个请求的间隔还在1分钟内,则限流。
*/
public class CounterLimiter {
private static long timeStamp = System.currentTimeMillis();
// 限制为1s内限制在100个请求
private static long limitCount = 100;
// 时间间隔(毫秒)
private static long interval = 1000;
// 请求数
private static long reqCount = 0;
public static boolean grant() {
long now = System.currentTimeMillis();
if (now < timeStamp + interval) {
if (reqCount < limitCount) {
++reqCount;
return true;
} else {
return false;
}
} else {
timeStamp = System.currentTimeMillis();
reqCount = 0;
return false;
}
}
public static void main(String[] args) {
for (int i = 0; i < 500; i++) {
new Thread(new Runnable() {
@Override
public void run() {
if (grant()) {
System.out.println("执行业务逻辑");
} else {
System.out.println("限流");
}
}
}).start();
}
}
}
限流算法 - 滑动窗口
- 将时间划分为多个区间;
- 在每个区间内每有一次请求就将计数器加一维持一个时间窗口,占据多个区间;·每经过一个区间的时间,则抛弃最老的一个区间,并纳入最新的一个区间;
- 如果当前窗口内区间的请求计数总和超过了限制数量,则本窗口内所有的请求都被丢弃。
- 优点
滑动窗口计数器是通过将窗口再细分,并且按照时间"滑动",这种算法避免了固定窗口计数器带来的双倍突发请求 - 问题
时间区间的精度越高,算法所需的空间容量就越大。
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 滑动窗口。该窗口同样的key,都是单线程计算。
*
* 代码实现思路就是定义好分片数量,每个分片都有一个独立的计数器,所有的分片合计为一个数组。
* 当请求来时,按照分片规则,判断请求应该划分到哪个分片中去。要判断是否超过阈值,就将前N个统计值相加,对比定义的阈值即可。
*/
public class SlidingWindowLimiter {
/**
* 循环队列,就是装多个窗口用,该数量是windowSize的2倍
*/
private AtomicInteger[] timeSlices;
/**
* 队列的总长度
*/
private int timeSliceSize;
/**
* 每个时间片的时长,以毫秒为单位
*/
private int timeMillisPerSlice;
/**
* 共有多少个时间片(即窗口长度)
*/
private int windowSize;
/**
* 在一个完整窗口期内允许通过的最大阈值
*/
private int threshold;
/**
* 该滑窗的起始创建时间,也就是第一个数据
*/
private long beginTimestamp;
/**
* 最后一个数据的时间戳
*/
private long lastAddTimestamp;
public SlidingWindowLimiter(int timeMillisPerSlice, int windowSize, int threshold) {
this.timeMillisPerSlice = timeMillisPerSlice;
this.windowSize = windowSize;
this.threshold = threshold;
// 保证存储在至少两个window
this.timeSliceSize = windowSize * 2;
reset();
}
//通过修改每个时间片的时间,窗口数量,阈值,来进行测试。
public static void main(String[] args) {
//1秒一个时间片,窗口共5个
SlidingWindowLimiter window = new SlidingWindowLimiter(100, 4, 8);
for (int i = 0; i < 100; i++) {
// 生成范围内的随机数
Random random = new Random(); // 0,1,2,3,4
System.out.println(window.addCount(random.nextInt(5)));
window.print();
System.out.println("--------------------------");
try {
Thread.sleep(102);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 初始化队列,由于此初始化会申请一些内容空间,为了节省空间,延迟初始化
*/
private void reset() {
beginTimestamp = System.currentTimeMillis();
if (timeSlices != null) {
return;
}
//窗口个数
AtomicInteger[] localTimeSlices = new AtomicInteger[timeSliceSize];
for (int i = 0; i < timeSliceSize; i++) {
localTimeSlices[i] = new AtomicInteger(0);
}
timeSlices = localTimeSlices;
}
private void print() {
for (AtomicInteger integer : timeSlices) {
System.out.print(integer + "-");
}
}
/**
* 计算当前所在的时间片的位置
*/
private int locationIndex() {
long now = System.currentTimeMillis();
//如果当前的key已经超出一整个时间片了,那么就直接初始化就行了,不用去计算了
if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
reset();
}
return (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
}
/**
* 增加count个数量
*/
public boolean addCount(int count) {
//当前自己所在的位置,是哪个小时间窗
int index = locationIndex();
System.out.println("index:" + index);
//然后清空自己前面windowSize到2*windowSize之间的数据格的数据
//譬如1秒分4个窗口,那么数组共计8个窗口
//当前index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来就是该窗口内的总和
clearFromIndex(index);
int sum = 0;
// 在当前时间片里继续+1
sum += timeSlices[index].addAndGet(count);
//加上前面几个时间片
for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
System.out.println(sum + "---" + threshold);
lastAddTimestamp = System.currentTimeMillis();
return sum >= threshold;
}
private void clearFromIndex(int index) {
for (int i = 1; i <= windowSize; i++) {
int j = index + i;
if (j >= windowSize * 2) {
j -= windowSize * 2;
}
timeSlices[j].set(0);
}
}
}
限流算法 - 漏桶
- 将每个请求视作"水滴"放入"漏桶"进行存储;
- "漏桶"以固定速率向外"漏"出请求来执行如果"漏桶"空了则停止"漏水";
- 如果"漏桶"满了则多余的"水滴"会被直接丢弃。
- 优点
漏桶算法多使用队列实现,服务的请求会存到队列中,服务的提供方则按照固定的速率从队列中取出请求并执行,过多的请求则放在队列中排队或直接拒绝。 - 问题(缺点)
漏桶算法的缺陷也很明显,当短时间内有大量的突发请求时,即便此时服务器没有任何负载,每个请求也都得在队列中等待一段时间才能被响应。
/**
* 漏桶
*/
public class LeakyBucketLimiter {
// 时间刻度
private static long timeStamp = System.currentTimeMillis();
// 桶大小
private static int bucketSize = 10;
// 每ms流出的请求
private static int rate = 1;
// 当前的水量
private static long count = 0;
public static boolean grant() {
long now = System.currentTimeMillis();
// 计算出水的数量
long out = (now - timeStamp) * rate;
// 先执行漏水,计算剩余水量
count = Math.max(0, count - out);
timeStamp = now;
if ((count + 1) < bucketSize) {
// 先执行漏水,计算剩余水量
count++;
return true;
} else {
// 水满,拒绝加水
return false;
}
}
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
new Thread(new Runnable() {
@Override
public void run() {
if (grant()) {
System.out.println("执行业务逻辑");
} else {
System.out.println("限流");
}
}
}).start();
}
}
}
限流算法 - 令牌桶
- 令牌以固定速率生成。
- 生成的令牌放入令牌桶中存放,如果令牌桶满了则多余的令牌会直接丢弃,当请求到达时,会尝试从令牌桶中取令牌,取到了令牌的请求可以执行。
- 如果桶空了,那么尝试取令牌的请求会被直接丢弃。
- 优点
令牌桶算法既能够将所有的请求平均分布到时间区间内,又能接受服务器能够承受范围内的突发请求,因此是目前使用较为广泛的一种限流算法。 - 问题(缺点)
牺牲小部分流量
package com.study.current.limiting.allen;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* 限流神器:Guava RateLimiter
* Guava RateLimiter基于令牌桶算法,我们只需要告诉RateLimiter系统限制的QPS是多少,
* 那么RateLimiter将以这个速度往桶里面放入令牌,然后请求的时候,通过tryAcquire()方法向RateLimiter获取许可(令牌)。
*/
public class GuavaRateLimiter {
public static ConcurrentHashMap<String, RateLimiter> resourceRateLimiter = new ConcurrentHashMap<>();
static {
createrResourceLimiter("order", 50);
}
private static void createrResourceLimiter(String resource, int qps) {
if (resourceRateLimiter.contains(resource)) {
resourceRateLimiter.get(resource).setRate(qps);
} else {
//每秒50个,2s后达到正常速率
RateLimiter rateLimiter = RateLimiter.create(qps, 1000L, TimeUnit.MILLISECONDS);
resourceRateLimiter.putIfAbsent(resource, rateLimiter);
}
}
public static void main(String[] args) {
for (int i = 0; i < 500; i++) {
new Thread(new Runnable() {
@Override
public void run() {
if (resourceRateLimiter.get("order").tryAcquire()) {
System.out.println("执行业务逻辑");
} else {
System.out.println("限流");
}
}
}).start();
}
}
}
基于Guava的限流算法
- 平滑突发限流(SmoothBursty)
// 平滑突发限流顾名思义,就是允许突发的流量进入,后面再慢慢的平稳限流。
// 创建了容量为5的桶,并且每秒新增5个令牌,即每200ms新增一个令牌
RateLimiter limiter = RateLimiter.create(5);
while (true) {
// 获取令牌(可以指定一次获取的个数),获取后可以执行后续的业务逻辑
System.out.println(limiter.acquire());
}
// while循环中执行的limiter.acquire(),当没有令牌时,此方法会阻塞。实际应用当中应当使用tryAcquire()方法,如果获取不到就直接执行拒绝服务。
RateLimiter limiter = RateLimiter.create(2);
System.out.println(limiter.acquire());
// 中途休眠的场景
Thread.sleep(1500L);
while (true) {
System.out.println(limiter.acquire());
}
// 当线程休眠时,会囤积令牌,以给后续的acquire()使用。但是上面的代码只能囤积1S的令牌(也就是2个),当睡眠时间超过1.5S时,执行结果还是相同的。
- 平滑预热限流(SmoothWarmingUp)
// permitsPerSecond表示每秒钟新增的令牌数,warmupPeriod表示从冷启动速率过渡到平均速率所需要的时间间隔
// RateLimiter.create(double permitsPerSecond, long warmupPeriod, TimeUnit unit);
RateLimiter limiter = RateLimiter.create(5, 1000, TimeUnit.MILLISECONDS);
for (int i = 1; i < 5; i++) {
System.out.println(limiter.acquire());
}
Thread.sleep(1000L);
for (int i = 1; i < 50; i++) {
System.out.println(limiter.acquire());
}
// 平滑预热限流的耗时是慢慢趋近平均值的
隔离策略 - 线程池
线程池用来控制实际工作的线程数量,通过线程复用的方式来减小内存开销。
线程池可同时工作的线程数量是一定的,超过该数量的线程需进入线程队列等待,直至有可用的工作线程来执行任务。
import java.util.concurrent.*;
/**
* 线程池
*/
public class ThreadPoolTest {
public static void main(String[] args) {
// 创建线程池,为了更好的明白运行流程,增加了一些额外的代码
// BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(2);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
// BlockingQueue<Runnable> queue = new PriorityBlockingQueue<Runnable>();
// BlockingQueue<Runnable> queue = new SynchronousQueue<Runnable>();
// AbortPolicy/CallerRunsPolicy/DiscardOldestPolicy/DiscardPolicy
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 5, TimeUnit.SECONDS,
queue, new ThreadPoolExecutor.AbortPolicy());
// 向线程池里面扔任务
for (int i = 0; i < 10; i++) {
System.out.println("当前线程池大小[" + threadPool.getPoolSize() + "],当前队列大小[" + queue.size() + "]");
threadPool.execute(new MyThread("Thread" + i));
}
// 关闭线程池
threadPool.shutdown();
}
static class MyThread implements Runnable {
private String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
// 做点事情
try {
Thread.sleep(1000);
System.out.println(name + " finished job!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
隔离策略 - 信号量
信号量Semaphore是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。
如果无可用许可,那么acquire方法将一直阻塞,直到其它线程释放许可。
-
线程池和信号量对比
-
实际工作的线程是谁创建的?
使用线程池,实际工作线程由线程池创建;使用Semaphore,实际工作的线程由你自己创建。 -
实际工作的线程是谁创建的?
·线程池自动,Semaphore手动。
-
import java.util.concurrent.Semaphore;
/**
* 信号量
*/
public class SemaphoreTest {
public static void main(String[] args) {
Runnable customer = new Runnable() {
final Semaphore availableWindow = new Semaphore(5, true);
int count = 1;
@Override
public void run() {
int time = (int) (Math.random() * 10 + 3);
int num = count++;
try {
availableWindow.acquire();
System.out.println("正在为第【" + num + "】个客户办理业务,需要时间:" + time + "s!");
Thread.sleep(time * 1000);
if (availableWindow.hasQueuedThreads()) {
System.out.println("第【" + num + "】个客户已办理完业务,有请下一位!");
} else {
System.out.println("第【" + num + "】个客户已办理完业务,没有客户了,休息中!");
}
availableWindow.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 1; i < 10; i++) {
new Thread(customer).start();
}
}
}
如果觉得有收获就点个赞吧,更多知识,请点击关注查看我的主页信息哦~
网友评论