流量控制也称为限流。
不具备限流功能的系统,是非常危险和脆弱的,很可能由于瞬间的压力激增,引起“雪崩效应”,导致系统的各个部分都同时崩溃,停止服务。就像没有保险丝的保护下,电压突然变高,导致所有的电器都会被损坏一样,“限流”就是系统的“保险丝”。
因此限流是保护高并发系统的三把利器之一(另外两个是缓存和降级)。限流在很多场景中用来限制并发和请求量,比如说秒杀抢购,保护自身系统和下游系统不被巨型流量冲垮等。
限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务或进行流量整形。
常用的限流方式和场景有:限制总并发数(如数据库连接池、线程池)、限制瞬时并发数(如 nginx 的 limitconn 模块,用来限制瞬时并发连接数,Java 的 Semaphore 也可以实现)、限制时间窗口内的平均速率(如 Guava 的 RateLimiter、nginx 的 limitreq 模块,限制每秒的平均速率);其他还有如限制远程接口调用速率、限制 MQ 的消费速率。另外还可以根据网络连接数、网络流量、CPU或内存负载等来限流。
比如说,我们需要限制方法被调用的并发数不能超过100(同一时间并发数),则我们可以用信号量 Semaphore 实现。可如果我们要限制方法在一段时间内平均被调用次数不超过100,则需要使用 RateLimiter。
一、常用的限流算法
常用的限流算法有两种:漏桶算法 和 令牌桶算法。
漏桶算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,漏桶算法能强行限制数据的传输速率。
漏桶算法令牌桶算法则是一个存放固定数量令牌的桶。每个请求都必须从桶中取出令牌:如果能够获取到令牌,则对持有令牌的请求进行处理,处理完成后把令牌归还;如果获取不到令牌,该请求就要被限流,要么直接丢弃,要么在缓冲区等待。
令牌桶算法令牌桶的好处是可以方便的改变速度。一旦需要提高速率,则按需提高放入桶中的令牌的速率。并且对于瞬时的大流量,只要没有超过令牌的数量,是可以并行处理,因此它适合于具有突发特性的流量。
二、限流工具类 RateLimiter
RateLimiter 是 Google 开源工具包 Guava 提供的限流工具类,该类基于令牌桶算法来完成限流,易于使用。
例如限制 QPS 为 5,可以使用如下方法:
public void testRateLimiter() {
// 创建一个稳定输出令牌的RateLimiter,保证平均每秒不超过{5}个请求
final RateLimiter r = RateLimiter.create(5);
while (true) {
// 获取一个令牌。如果没有令牌则一直等待,返回等待的时间,若有令牌则直接返回
double waitTime = r.acquire();
System.out.println("get a tokens: " + waitTime + "s");
}
}
/**
* 输出基本上都是0.2s执行一次,符合每秒发放5个令牌的设定。
* get a tokens: 0.0s
* get a tokens: 0.182014s
* get a tokens: 0.188464s
* get a tokens: 0.198072s
* get a tokens: 0.196048s
*/
可以看出 RateLimiter 是平均时间内的 QPS 不能超过指定值,1 秒内不会给超过 5 个令牌,并且以固定速率进行放置,达到平滑输出的效果。
若 1 秒内,有 5 个请求并发,第 1 个请求不会等待,其余 4 个都需要等前一个执行完 0.2s 后才能执行。例如,使用线程池,并发 5 个请求,虽然 QPS 是也 5,但这 5 个请求不是并行执行:
public void testRateLimiter() {
// 创建RateLimiter,平均每秒不超过{5}个请求
final RateLimiter r = RateLimiter.create(5);
// 创建线程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 创建5个线程,同时去获取令牌
for (int i = 0; i < 5; i++) {
executorService.execute(() -> {
double waitTime = r.acquire();
System.out.println(Thread.currentThread().getName() + " get a tokens: " + waitTime + "s");
});
}
executorService.shutdown();
}
/**
* pool-1-thread-2 get a tokens: 0.0s
* pool-1-thread-5 get a tokens: 0.124674s
* pool-1-thread-4 get a tokens: 0.324648s
* pool-1-thread-3 get a tokens: 0.52463s
* pool-1-thread-1 get a tokens: 0.724603s
*/
从打印结果可以看出,第一个线程没有等待,第二个线程等待了 0.12s,第三个线程等待 0.32s,这 5 个线程全部执行完使用了 0.72s。可以看出 RateLimiter 是平均时间的 QPS,而不是瞬时值。
限流时,我们不需要一直等待被限制的请求,而是在超过指定 QPS 时,直接丢弃,这就可以使用 tryAcquire(long timeout, TimeUnit unit)
尝试获取令牌,并指定超时时间,该方法返回 是否获取到令牌,若没获取到令牌,则直接丢弃。
public void testRateLimiter() {
// 创建一个稳定输出令牌的RateLimiter,保证平均每秒不超过{5}个请求
final RateLimiter r = RateLimiter.create(5);
// 线程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 创建 10 个线程,同时去获取令牌
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
// 尝试获取令牌,指定超时时间,若超时时间内获取到令牌返回true,否则返回false
boolean acquire = r.tryAcquire(1, TimeUnit.SECONDS);
if (acquire) {
System.out.println("获取许可证,执行业务逻辑。");
} else {
System.err.println("未获取到许可证,请求可以丢弃。");
}
});
}
executorService.shutdown();
}
/**
* 未获取到许可证,请求可以丢弃。
* 未获取到许可证,请求可以丢弃。
* 未获取到许可证,请求可以丢弃。
* 未获取到许可证,请求可以丢弃。
* 获取许可证,执行业务逻辑。
* 获取许可证,执行业务逻辑。
* 获取许可证,执行业务逻辑。
* 获取许可证,执行业务逻辑。
* 获取许可证,执行业务逻辑。
* 获取许可证,执行业务逻辑。
*/
三、Semaphore 信号量
RateLimiter 是限制平均时间内的 QPS,无法限制并发量。如果需要限制并发量则可以使用 Semaphore。
Semaphore 是计数信号量。Semaphore 管理一系列许可。acquire()
从 Semaphore 中获取令牌,若获取不到令牌,则阻塞当前线程,直到获取到令牌;每个 release()
方法释放一个令牌到 Semaphore 中。
比如:停车场入口的显示屏,每有一辆车进入停车场,显示屏的剩余车位减 1,每有一辆车离开停车场,显示屏上剩余车辆就会加 1;当剩余车位为 0 时,入口的栏杆就不会打开,车辆就无法进入停车场了,直到有一辆车离开停车场为止。
Semaphore 构造方法,包含 令牌数 与 是否使用公平锁:
// 创建具有给定的令牌数和非公平的公平设置的 Semaphore
public Semaphore(int permits);
// 创建具有给定的令牌数和给定的公平设置的 Semaphore
public Semaphore(int permits, boolean fair);
公平锁 与 非公平锁:
公平锁:加锁前先查看是否有排队等待的线程,有的话优先处理排在前面的线程,先来先得。
非公平所:线程加锁时直接尝试获取锁,不管是否有其他等待的线程,谁抢到锁,谁就先处理。
非公平锁性能高于公平锁性能。公平锁需要在多核情况下维护一个队列,如果当前线程不是队列的第一个无法获取锁,增加了线程切换次数。并且,非公平锁能更充分的利用 CPU 的时间片,减少 CPU 空闲的状态时间。
Semaphore 基本的功能可以使用 acquire()
和 release()
实现,其他方法也需要了解一下:
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞,或者线程已被中断。
acquire(int permits)
// 释放给定数目的许可,将其返回到信号量。这个是对应于上面的方法
release(int permits)
// 返回此信号量中当前可用的许可数
availablePermits()
// 根据指定的缩减量减小可用许可的数目。
reducePermits(int reduction)
// 如果在给定的等待时间内此信号量有可用的所有许可,并且当前线程未被中断,则从此信号量获取给定数目的许可。
tryAcquire(int permits, long timeout, TimeUnit unit)
// 从此信号量获取给定数目的许可,在提供这些许可前一直将线程阻塞。
acquireUninterruptibly(int permits)
例如下面代码模拟了,总请求数为 10,但每次只能有 3 个线程同时访问。
public void testSemaphore (String[] args) {
// 并发数为3的Semaphore
Semaphore semaphore = new Semaphore(3);
// 线程池
ExecutorService executorService = Executors.newCachedThreadPool();
// 创建 10 个线程,同时去获取令牌
for (int i = 0; i < 10; i++) {
executorService.execute(() -> {
try {
// 获取令牌
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " get permit");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放令牌
semaphore.release();
System.out.println(Thread.currentThread().getName() + " release");
}
});
}
executorService.shutdown();
}
/**
pool-1-thread-1 get permit
pool-1-thread-3 get permit
pool-1-thread-2 get permit
pool-1-thread-1 release
pool-1-thread-3 release
pool-1-thread-9 get permit
pool-1-thread-5 get permit
pool-1-thread-5 release
pool-1-thread-10 get permit
pool-1-thread-4 get permit
pool-1-thread-2 release
pool-1-thread-8 get permit
pool-1-thread-4 release
pool-1-thread-7 get permit
pool-1-thread-7 release
pool-1-thread-10 release
pool-1-thread-6 get permit
pool-1-thread-6 release
pool-1-thread-9 release
pool-1-thread-8 release
*/
可以看出,Semaphore 同时允许 3 个线程并发访问,当其中一个执行完毕,新的线程会立即补上。而 RateLimiter 则只允许每秒的平均请求为 3。
四、集群限流
上述介绍的都是单节点限流方法,但线上是集群系统,布署了多个节点。虽然对单个节点能做到将 QPS 限制在一定范围,但是多节点条件下,若单个节点均是 100/s,那么集群的总请求就是节点数 n x 100/s,限流效果失效。
实现集群限流的关键是,需要把限流实现为原子化,解决方案可以使用 Redis + Lua 来实现。因为Redis是单线程模型,能确保限流服务是线程安全的。
具体思路:
- 每次请求时将 Key 写入到 Redis 中;
- Key 超时时间设置为 1 秒;
- 每次请求时,Redis 将该 Key 的值进行自增;
- 当达到阈值时返回错误。
Lua 脚本如下:
local times = redis.call('incr', KEYS[1]) --设置key(KEY[1])并加1
if times == 1 then --第一次请求
redis.call('expire', KEYS[1], 1) --设置超时时间
end
if times > tonumber(ARGV[1]) then --请求次数超过规定次数
return 0 -- 限流
end
return 1
Java 中的调用逻辑:
public class DistributedLimit {
// 从resources中读取lua脚本
private final ResourceScriptSource resourceScriptSource = new ResourceScriptSource(new ClassPathResource("limit.lua"));
/**
* 获取许可
* @return 0-无许可;1-许可
*/
public Long acquire() throws IOException {
final String luaScript = resourceScriptSource.getScriptAsString();
final Jedis jedis = new Jedis("localhost", 6379);
// 若全局限流,key可设为固定值,若对ip维度限流,key可设置为ip
String key = "key";
String limit = "2"; //限流大小
return (Long)jedis.eval(luaScript, Arrays.asList(key), Arrays.asList(limit));
}
}
然后只需要在需要限流的地方调用该方法对返回值进行判断即可达到限流的目的。
当然这只是利用 Redis 做了一个简单的计数器,如果想实现类似于上文中的令牌桶算法可以基于 Lua 自行实现。
网友评论