在分布式系统中,缓存, 限流, 熔断, 消息队列解耦是处理并发的利器。
#一些原则:
客户端熔断,服务端限流。
1.限流
1.1 限流的基本概念
限流的常用处理手段有:计数器、滑动窗口、漏桶、令牌。
1.1.1 计数器
计数器是一种比较简单的限流算法,用途比较广泛,在接口层面,很多地方使用这种方式限流。
在一段时间内,进行计数,与阀值进行比较,到了时间临界点,将计数器清0。
这里需要注意的是,存在一个时间临界点的问题。
举个栗子,在12:01:00到12:01:58这段时间内没有用户请求,
然后在12:01:59这一瞬时发出100个请求,OK,然后在12:02:00这一瞬时又发出了100个请求。
这里你应该能感受到,在这个临界点可能会承受恶意用户的大量请求,甚至超出系统预期的承受。
基于计数器的流控方案.png
1.1.2 滑动窗口
由于计数器存在临界点缺陷,后来出现了滑动窗口算法来解决。
滑动窗口的意思是说把固定时间片,进行划分,并且随着时间的流逝,
进行移动,这样就巧妙的避开了计数器的临界点问题。
也就是说这些固定数量的可以移动的格子,将会进行计数判断阀值,
因此格子的数量影响着滑动窗口算法的精度。
基于滑动窗口的流控方案.png
1.1.3 漏桶
虽然滑动窗口有效避免了时间临界点的问题,但是依然有时间片的概念,
而漏桶算法在这方面比滑动窗口而言,更加先进。
有一个固定的桶,进水的速率是不确定的,但是出水的速率是恒定的,当水满的时候是会溢出的。
漏桶算法实现限流可以解决突刺现象,当请求进来时,相当于水倒入漏斗,然后从下端小口慢慢匀速的流出。
不管上面流量多大,下面流出的速度始终保持不变。如果桶满了,那么新进来的请求就丢弃。
#算法实现
可以准备一个队列,用来保存请求,另外通过一个线程池(ScheduledExecutorService)
来定期从队列中获取请求并执行,可以一次性获取多个并发执行。
#弊端
无法应对短时间的突发流量。
基于漏桶的流控方案.png
1.1.4 令牌桶
从某种意义上讲,令牌桶算法是对漏桶算法的一种改进, 漏桶算法能够限制请求调用的速率,
而令牌桶算法能够在限制调用的平均速率的同时还允许一定程度的突发调用。
#算法实现思路
令牌桶算法,存在一个桶,用来存放固定数量的令牌。
算法中存在一种机制,以一定的速率往桶中放令牌。
每次请求调用需要先获取令牌,只有拿到令牌,才有机会继续执行,
否则选择选择等待可用的令牌、或者直接拒绝。
#算法实现过程
放令牌这个动作是持续不断的进行,如果桶中令牌数达到上限,就丢弃令牌,
所以就存在这种情况,桶中一直有大量的可用令牌,这时进来的请求就可以直接拿到令牌执行,
比如设置qps为100,那么限流器初始化完成一秒后,桶中就已经有100个令牌了,
等启动完成对外提供服务时,该限流器可以抵挡瞬时的100个请求。
所以,只有桶中没有令牌时,请求才会进行等待,最后相当于以一定的速率执行。
#算法实现
可以准备一个队列,用来保存令牌,另外通过一个线程池定期生成令牌放到队列中,
每来一个请求,就从队列中获取一个令牌,并继续执行。
#开源实现
Google开源的guava包
基于令牌桶的流控方案.png
1.4 常见实现方案
1.4.1 基于 guava 的 RateLimiter 实现 (令牌桶算法)
Guava RateLimiter基于令牌桶算法,
我们只需要告诉RateLimiter系统限制的QPS是多少,那么RateLimiter将以这个速度往桶里面放入令牌,
然后请求的时候,通过tryAcquire()方法向RateLimiter获取许可(令牌)。
// 下述示例, 自行引入 guava 的依赖即可.
SpringBoot中基本结构.png
流控工具类--基于 guava 的 RateLimiter
package com.zy.eureka.limit;
import com.google.common.util.concurrent.RateLimiter;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
public class FlowControlUtils {
private FlowControlUtils() {
throw new RuntimeException("FlowControlUtils can not be Instantiated.");
}
private static ConcurrentMap<String, RateLimiter> map = new ConcurrentHashMap<>();
public static void createResourceLimiter(String resource, double qps) {
if (!map.containsKey(resource)) {
map.putIfAbsent(resource, RateLimiter.create(qps));
}
}
public static boolean tryAcquire(String resource, long timeout) {
return map.get(resource).tryAcquire(timeout, TimeUnit.MILLISECONDS);
}
}
FlowControlAnno注解
package com.zy.eureka.limit;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface FlowControlAnno {
/**
* 是否开启流控功能
* @return
*/
boolean enabled() default false;
/**
* 流控的 qps
* @return
*/
int qps() default 50;
/**
* 资源名称
* @return
*/
String resource() default "";
/**
* 超时时间
* @return
*/
long timeout() default 0L;
}
@FlowControlAnno注解解析
package com.zy.eureka.limit;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Aspect
@Component
@Order(Ordered.LOWEST_PRECEDENCE - 1)
@Slf4j
public class FlowControlAspect {
@Around(value = "@annotation(flowControlAnno)")
public Object process(ProceedingJoinPoint point, FlowControlAnno flowControlAnno) throws Throwable {
FlowControlAnno flowControl = flowControlAnno != null ? flowControlAnno : point.getTarget().getClass().getAnnotation(FlowControlAnno.class);
if (Objects.isNull(flowControl)) {
log.warn("@FlowControlAnno is null.");
return point.proceed();
}
// 如果没有流控, 则直接放行
if (!flowControl.enabled()) {
return point.proceed();
}
// 如果需要进行流控, 则尝试获取资源
String resource = flowControl.resource();
FlowControlUtils.createResourceLimiter(resource, flowControl.qps());
if (FlowControlUtils.tryAcquire(resource, flowControl.timeout())) {
return point.proceed();
}
throw new RuntimeException("流控啦!");
}
}
业务中需被流控的方法/资源
package com.zy.eureka.limit;
import org.springframework.stereotype.Service;
@Service
public class MyServiceImpl {
@FlowControlAnno(enabled = true, qps = 10, timeout = 100L, resource = "hello")
public String hello(int i) {
return "...." + i;
}
}
单测
package com.zy.eureka.limit;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyServiceImplTest {
@Autowired
private MyServiceImpl myService;
@Test
public void fn01() {
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> System.out.println(myService.hello(finalI))).start();
}
}
}
1.4.2 基于计数器实现
下述工具类, 可参考 本文 1.4.1 步骤, 搞一个 AOP, 来简化开发过程
流控工具类--基于计数器
package com.zy.eureka.limit.counter;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 基于计数器的流控处于方案
* 也可以搞个注解, 使用 AOP 来简化其开发过程
*/
public class FlowControlCounterUtils {
private static ConcurrentMap<String, Counter> map = new ConcurrentHashMap<>();
/**
* @param resource 资源, 可以是一个接口, 也可以是某个方法的调用情况
* @param permits 单位时间内, 1000 ms 内, 允许的最大请求数
* @return 是否允许请求, 即是否处于流控中
*/
public static synchronized boolean tryAcquire(String resource, long permits) {
if (Objects.isNull(map.get(resource))) {
map.putIfAbsent(resource, new Counter());
}
Counter counter = map.get(resource);
long now = System.currentTimeMillis();
long amount = counter.getCounter();
if (now - counter.getStartTime() > Counter.INTERNAL_TIME) {
counter.setStartTime(now);
counter.setCounter(0);
return false;
} else {
if (amount < permits) {
counter.setCounter(++amount);
return true;
}
return false;
}
}
@Data
@NoArgsConstructor
private static class Counter {
private static final long INTERNAL_TIME = 1000;
private long startTime = System.currentTimeMillis();
private long counter;
}
}
测试方法
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(() -> {
if (FlowControlCounterUtils.tryAcquire("order", 5)) {
System.out.println(Thread.currentThread().getName() + "success");
} else {
System.out.println(Thread.currentThread().getName() + "流控啦");
}
}, "thread#" + i).start();
}
}
2. 分布式限流方案
https://blog.51cto.com/sflyq/2405521 (示例)
#需要说明的是:
>> 上述的流控是单机版的流控, 即限制的是单台服务器单位时间内接收请求的数量.
>> 如果下层是DB, 则需要考虑如果部署了多台服务器集群时, 限流后流量是否会超过DB的耐受范围(DB不超过2000).
#分布式限流方案
即让系统的流量,先到队列中排队、限流,不要让流量直接打到系统上。
1.从负载均衡层(nginx + lua进行流控)
2.reids + lua.
比如需要限制某个用户访问/query接口的次数,只需要拼接用户id和接口名生成redis的key,
每次该用户访问此接口时,只需要对这个key执行incr命令,在这个key带上过期时间,就可以实现指定时间的访问频率。
2.1 应用场景
#场景1
留言功能限制,30秒 内只能评论 10次,超出次数不让能再评论,
并提示:过于频繁
#场景2
点赞功能限制,10秒 内只能点赞 10次,超出次数后不能再点赞,并禁止操作 1个小时,
并提示:过于频繁,被禁止操作1小时
#场景3
上传记录功能,限制一天只能上传 100次,超出次数不让能再上传,
并提示:超出今日上限
限制频率流程图.png
https://www.jianshu.com/p/7112a8d3d869 (dubbo限流)
https://www.jianshu.com/p/9edebaa446d3 (常用限流算法)
https://www.jianshu.com/p/88ff90519ab1 (常用限流算法)
https://blog.csdn.net/tianyaleixiaowu/article/details/74942405 (限流算法)
https://www.cnblogs.com/mushroom/archive/2015/07/21/4659200.html (WebApiThrottle 手册)
https://github.com/springside/springside4/wiki/Rate-Limiter (WebApiThrottle 限流)
https://en.wikipedia.org/wiki/Token_bucket (令牌同限流)
https://en.wikipedia.org/wiki/Leaky_bucket (漏桶限流)
https://www.jianshu.com/p/dab0da3db173 (流控)
网友评论