序
本文主要研究一下spring cloud的RequestRateLimiterGatewayFilter
GatewayAutoConfiguration
@Configuration
@ConditionalOnProperty(name = "spring.cloud.gateway.enabled", matchIfMissing = true)
@EnableConfigurationProperties
@AutoConfigureBefore(HttpHandlerAutoConfiguration.class)
@AutoConfigureAfter({GatewayLoadBalancerClientAutoConfiguration.class, GatewayClassPathWarningAutoConfiguration.class})
@ConditionalOnClass(DispatcherHandler.class)
public class GatewayAutoConfiguration {
//......
@Bean(name = PrincipalNameKeyResolver.BEAN_NAME)
@ConditionalOnBean(RateLimiter.class)
public PrincipalNameKeyResolver principalNameKeyResolver() {
return new PrincipalNameKeyResolver();
}
@Bean
@ConditionalOnBean({RateLimiter.class, KeyResolver.class})
public RequestRateLimiterGatewayFilterFactory requestRateLimiterGatewayFilterFactory(RateLimiter rateLimiter, PrincipalNameKeyResolver resolver) {
return new RequestRateLimiterGatewayFilterFactory(rateLimiter, resolver);
}
//......
}
注意是要有RateLimiter及KeyResolver的bean才生效,默认KeyResolver为PrincipalNameKeyResolver,也是需要有RateLimiter才生效。
RequestRateLimiterGatewayFilterFactory
spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/filter/factory/RequestRateLimiterGatewayFilterFactory.java
/**
* User Request Rate Limiter filter. See https://stripe.com/blog/rate-limiters and
*/
public class RequestRateLimiterGatewayFilterFactory extends AbstractGatewayFilterFactory<RequestRateLimiterGatewayFilterFactory.Config> {
public static final String KEY_RESOLVER_KEY = "keyResolver";
private final RateLimiter defaultRateLimiter;
private final KeyResolver defaultKeyResolver;
public RequestRateLimiterGatewayFilterFactory(RateLimiter defaultRateLimiter,
KeyResolver defaultKeyResolver) {
super(Config.class);
this.defaultRateLimiter = defaultRateLimiter;
this.defaultKeyResolver = defaultKeyResolver;
}
public KeyResolver getDefaultKeyResolver() {
return defaultKeyResolver;
}
public RateLimiter getDefaultRateLimiter() {
return defaultRateLimiter;
}
@SuppressWarnings("unchecked")
@Override
public GatewayFilter apply(Config config) {
KeyResolver resolver = (config.keyResolver == null) ? defaultKeyResolver : config.keyResolver;
RateLimiter<Object> limiter = (config.rateLimiter == null) ? defaultRateLimiter : config.rateLimiter;
return (exchange, chain) -> {
Route route = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
return resolver.resolve(exchange).flatMap(key ->
// TODO: if key is empty?
limiter.isAllowed(route.getId(), key).flatMap(response -> {
// TODO: set some headers for rate, tokens left
if (response.isAllowed()) {
return chain.filter(exchange);
}
exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
return exchange.getResponse().setComplete();
}));
};
}
public static class Config {
private KeyResolver keyResolver;
private RateLimiter rateLimiter;
public KeyResolver getKeyResolver() {
return keyResolver;
}
public Config setKeyResolver(KeyResolver keyResolver) {
this.keyResolver = keyResolver;
return this;
}
public RateLimiter getRateLimiter() {
return rateLimiter;
}
public Config setRateLimiter(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
return this;
}
}
}
- 可以看到这个filterFactory依赖RateLimiter及KeyResolver
- 其中KeyResolver用于从request中提取限流的key
- 而RateLimiter则是相应的针对key的限流规则
实例
java配置
@Bean
public RateLimiter inMemoryRateLimiter(){
return new InMemoryRateLimiter();
}
@Bean(name = IpAddressKeyResolver.BEAN_NAME)
public KeyResolver ipAddressKeyResolver() {
return new IpAddressKeyResolver();
}
这里提供了InMemoryRateLimiter及IpAddressKeyResolver
文件配置
spring:
cloud:
gateway:
routes:
- id: rate-limit-demo
uri: http://www.baidu.com
predicates:
- Path=/rate/**
filters:
- name: RequestRateLimiter
args:
keyResolver: '#{@ipAddressKeyResolver}'
in-memory-rate-limiter:
replenish-rate: 10
burst-capacity: 20
- 这里首先指定了一个name为RequestRateLimiter的filter,然后其参数指定了keyResolver为name为ipAddressKeyResolver的bean
- args中的in-memory-rate-limiter就是InMemoryRateLimiter指定的配置属性,然后下面的两个值就是InMemoryRateLimiter.Config的属性
IpAddressKeyResolver
public class IpAddressKeyResolver implements KeyResolver {
public static final String BEAN_NAME = "ipAddressKeyResolver";
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
}
}
InMemoryRateLimiter.Config
@Validated
public static class Config {
@Min(1)
private int replenishRate;
@Min(0)
private int burstCapacity = 0;
public int getReplenishRate() {
return replenishRate;
}
public InMemoryRateLimiter.Config setReplenishRate(int replenishRate) {
this.replenishRate = replenishRate;
return this;
}
public int getBurstCapacity() {
return burstCapacity;
}
public InMemoryRateLimiter.Config setBurstCapacity(int burstCapacity) {
this.burstCapacity = burstCapacity;
return this;
}
@Override
public String toString() {
return "Config{" +
"replenishRate=" + replenishRate +
", burstCapacity=" + burstCapacity +
'}';
}
}
InMemoryRateLimiter
public class InMemoryRateLimiter extends AbstractRateLimiter<InMemoryRateLimiter.Config> {
public static final String CONFIGURATION_PROPERTY_NAME = "in-memory-rate-limiter";
private InMemoryRateLimiter.Config defaultConfig;
private final Map<String, Bucket> ipBucketMap = new ConcurrentHashMap<>();
public InMemoryRateLimiter() {
super(InMemoryRateLimiter.Config.class, CONFIGURATION_PROPERTY_NAME, null);
}
public InMemoryRateLimiter(int defaultReplenishRate, int defaultBurstCapacity) {
super(Config.class, CONFIGURATION_PROPERTY_NAME, null);
this.defaultConfig = new InMemoryRateLimiter.Config()
.setReplenishRate(defaultReplenishRate)
.setBurstCapacity(defaultBurstCapacity);
}
@Override
public Mono<Response> isAllowed(String routeId, String id) {
InMemoryRateLimiter.Config routeConfig = getConfig().get(routeId);
if (routeConfig == null) {
if (defaultConfig == null) {
throw new IllegalArgumentException("No Configuration found for route " + routeId);
}
routeConfig = defaultConfig;
}
// How many requests per second do you want a user to be allowed to do?
int replenishRate = routeConfig.getReplenishRate();
// How much bursting do you want to allow?
int burstCapacity = routeConfig.getBurstCapacity();
Bucket bucket = ipBucketMap.computeIfAbsent(id, k -> {
Refill refill = Refill.greedy(replenishRate, Duration.ofSeconds(1));
Bandwidth limit = Bandwidth.classic(burstCapacity, refill);
return Bucket4j.builder().addLimit(limit).build();
});
// tryConsume returns false immediately if no tokens available with the bucket
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) {
// the limit is not exceeded
return Mono.just(new Response(true, probe.getRemainingTokens()));
} else {
// limit is exceeded
return Mono.just(new Response(false,-1));
}
}
}
继承了AbstractRateLimiter,重写isAllowed方法
限流返回实例
curl -i http://localhost:8080/rate/
HTTP/1.1 429 Too Many Requests
X-Response-Default-Foo: Default-Bar
content-length: 0
小结
RequestRateLimiterGatewayFilter需要有RateLimiter的bean才能生效。另外还需要有个KeyResolver的bean,默认KeyResolver为PrincipalNameKeyResolver。
网友评论