1 背景
业务背景
在大数据量高并发访问时,经常会出现服务或接口面对暴涨的请求而不可用的情况,甚至引发连锁反映导致整个系统崩溃。此时需要使用的技术手段之一就是限流:当请求达到一定的并发数或速率,就进行等待、排队、降级、拒绝服务等。在限流时,常见的算法是计数器算法和令牌桶算法。
技术背景
SpringBoot2.X
JDK1.8
guava 23.6-jre
aop
算法简介
令牌桶算法
令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。 当桶满时,新添加的令牌被丢弃或拒绝。
计数器算法
计数器限流算法主要用来限制总并发数,比如数据库连接池大小、线程池大小、程序访问并发数等都是使用计数器算法。
2 技术实现
为达到复用、简便、代码零污染等目的,使用AOP+自定义注解技术进行实现。
创建一个SpringBoot Starter工程
具体步骤可参考使用STS创建Spring Boot 项目。然后将pom.xml文件清理成下面这个样子。
<groupId>cn.com.yd.commons</groupId>
<artifactId>currentlimiter</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>currentlimiter</name>
<description>基于 spring aop 限流器</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.0.2.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<!-- 项目单独需要的 jar -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.6-jre</version>
</dependency>
</dependencies>
自定义注解
令牌桶算法注解
import java.lang.annotation.*;
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
/**
*
* 令牌桶限流注解,默认流量1000
* @author 李庆海
*
*/
public @interface TbLimiter {
int value() default 1000;
}
计数器算法注解
import java.lang.annotation.*;
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
/**
*
* 限流注解,默认流量1000
* @author 李庆海
*
*/
public @interface ShLimiter {
int value() default 1000;
}
基于AOP技术实现的拦截器
令牌桶算法拦截器
import cn.com.yd.commons.currentlimiter.annotations.TbLimiter;
import com.google.common.util.concurrent.RateLimiter;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cglib.core.ReflectUtils;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 令牌桶算法限流拦截器
*
* @author 李庆海、张蕴
*/
@Aspect
@Component
public class TokenBucketLimiterInterceptor {
private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<String, RateLimiter>();
private final static Logger LOG = LoggerFactory.getLogger(TokenBucketLimiterInterceptor.class);
@Pointcut("@annotation(cn.com.yd.commons.currentlimiter.annotations.TbLimiter)")
public void aspect() {
}
@Around(value = "aspect()")
public void around(JoinPoint point) throws Throwable {
LOG.debug("进入限流器");
// 返回目标对象
Object target = point.getTarget();
String targetName = target.getClass().getName();
// 返回当前连接点签名
String methodName = point.getSignature().getName();
// 获得参数列表
Object[] arguments = point.getArgs();
Class<?> targetClass = Class.forName(targetName);
// 获取参数类型数组
Class<?>[] argTypes = ReflectUtils.getClasses(arguments);
// 获取目标method,考虑方法的重载等问题
Method method = targetClass.getDeclaredMethod(methodName, argTypes);
// 获取目标method上的限流注解@Limiter
TbLimiter limiter = method.getAnnotation(TbLimiter.class);
RateLimiter rateLimiter = null;
if (null != limiter) {
// 以 class + method + parameters为key,避免重载、重写带来的混乱
String key = targetName + "." + methodName + Arrays.toString(argTypes);
rateLimiter = rateLimiters.get(key);
if (null == rateLimiter) {
// 获取限定的流量
// 为了防止并发
rateLimiters.putIfAbsent(key, RateLimiter.create(limiter.value()));
rateLimiter = rateLimiters.get(key);
}
// 消耗一个令牌
rateLimiter.acquire();
point.proceed();
} else {
point.proceed();
}
LOG.debug("退出限流器");
}
}
计数器算法拦截器
import cn.com.yd.commons.currentlimiter.annotations.ShLimiter;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cglib.core.ReflectUtils;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
/**
* 计数器算法限流拦截器,基于Semaphore技术实现
*
* @author 李庆海、张蕴
*/
@Aspect
@Component
public class SemaphoreLimiterInterceptor {
private final Map<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
private final static Logger LOG = LoggerFactory.getLogger(SemaphoreLimiterInterceptor.class);
@Pointcut("@annotation(cn.com.yd.commons.currentlimiter.annotations.ShLimiter)")
public void aspect() {
}
@Around(value = "aspect()")
public void around(ProceedingJoinPoint point) throws Throwable {
LOG.debug("进入“限流拦截器”");
// 返回目标对象
Object target = point.getTarget();
String targetName = target.getClass().getName();
// 返回当前连接点签名
String methodName = point.getSignature().getName();
// 获得参数列表
Object[] arguments = point.getArgs();
Class<?> targetClass = Class.forName(targetName);
// 获取参数类型数组
Class<?>[] argTypes = ReflectUtils.getClasses(arguments);
// 获取目标method,考虑方法的重载等问题
Method method = targetClass.getDeclaredMethod(methodName, argTypes);
// 获取目标method上的限流注解@Limiter
ShLimiter limiter = method.getAnnotation(ShLimiter.class);
if (null != limiter) {
// 以 class + method + parameters为key,避免重载、重写带来的混乱
String key = targetName + "." + methodName + Arrays.toString(argTypes);
// 获取限定的流量
Semaphore semaphore = semaphores.get(key);
if (null == semaphore) {
//为了预防并发
semaphores.putIfAbsent(key, new Semaphore(limiter.value()));
semaphore = semaphores.get(key);
}
try {
// 消耗一个令牌
semaphore.acquire();
// 调用被代理方法
point.proceed();
} finally {
// 释放令牌
if (null != semaphore) {
semaphore.release();
}
}
} else {
point.proceed();
}
LOG.debug("退出“限流拦截器”");
}
}
让自定义拦截器具备自动注入的能力
在resources目录下新建名为META-INF的文件夹,然后新建一个名为spring.factories的文件,在文件中增加下面的内容:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
cn.com.yd.commons.currentlimiter.interceptors.SemaphoreLimiterInterceptor,\
cn.com.yd.commons.currentlimiter.interceptors.TokenBucketLimiterInterceptor
至此,本工程中的代码已经具备了开箱即用的能力。使用Maven将本工程打包,即可在其他项目中引入使用。
3 在项目中使用
需要Maven依赖
<dependency>
<groupId>cn.com.yd.commons</groupId>
<artifactId>currentlimiter</artifactId>
<version>1.0.0</version>
</dependency>
在需要限流的方法、接口上添加限流注解
@TbLimiter和@ShLimiter两个注解都可以达到限流的目的,任选其中一个添加到需要限流的方法、接口上即可。注解的参数value默认值为1000。使用那个注解启用那个相关的拦截器。
/**
* 登录,1秒钟的时间内只允许1000个登录请求
* @param loginName 登录帐号
* @param password 登录密码
* @return
* @throws Exception
*/
@TbLimiter
public Object login(String loginName,String password)throws Exception;
/**
* 登录,1秒钟的时间内只允许100个退出请求
* @param loginName 登录帐号
* @return
* @throws Exception
*/
@TbLimiter(100)
public Object logout(String loginName)throws Exception;
网友评论