美文网首页SpringBoot极简教程 · Spring Boot 程序员
使用AOP和自定义注解实现限流策略

使用AOP和自定义注解实现限流策略

作者: 固安李庆海 | 来源:发表于2018-09-28 09:23 被阅读13次

    1 背景

    业务背景

    在大数据量高并发访问时,经常会出现服务或接口面对暴涨的请求而不可用的情况,甚至引发连锁反映导致整个系统崩溃。此时需要使用的技术手段之一就是限流:当请求达到一定的并发数或速率,就进行等待、排队、降级、拒绝服务等。在限流时,常见的算法是计数器算法和令牌桶算法。

    技术背景

    SpringBoot2.X
    JDK1.8
    guava 23.6-jre
    aop

    算法简介

    令牌桶算法

    令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。 当桶满时,新添加的令牌被丢弃或拒绝。

    计数器算法

    计数器限流算法主要用来限制总并发数,比如数据库连接池大小、线程池大小、程序访问并发数等都是使用计数器算法。

    2 技术实现

    为达到复用、简便、代码零污染等目的,使用AOP+自定义注解技术进行实现。

    创建一个SpringBoot Starter工程

    具体步骤可参考使用STS创建Spring Boot 项目。然后将pom.xml文件清理成下面这个样子。

    <groupId>cn.com.yd.commons</groupId>
    <artifactId>currentlimiter</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>
    
    <name>currentlimiter</name>
    <description>基于 spring aop 限流器</description>
    
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
    </properties>
    
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.0.2.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
    
        <!-- 项目单独需要的 jar -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>23.6-jre</version>
        </dependency>
    </dependencies>
    

    自定义注解

    令牌桶算法注解

    import java.lang.annotation.*;
    
    @Documented
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    
    /**
     *
     * 令牌桶限流注解,默认流量1000
     * @author 李庆海
     *
     */
    public @interface TbLimiter {
        int value() default 1000;
    }
    

    计数器算法注解

    import java.lang.annotation.*;
    
    @Documented
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    
    /**
     *
     * 限流注解,默认流量1000
     * @author 李庆海
     *
     */
    public @interface ShLimiter {
        int value() default 1000;
    }
    

    基于AOP技术实现的拦截器

    令牌桶算法拦截器

    import cn.com.yd.commons.currentlimiter.annotations.TbLimiter;
    import com.google.common.util.concurrent.RateLimiter;
    import org.aspectj.lang.JoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Pointcut;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cglib.core.ReflectUtils;
    import org.springframework.stereotype.Component;
    
    import java.lang.reflect.Method;
    import java.util.Arrays;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * 令牌桶算法限流拦截器
     *
     * @author 李庆海、张蕴
     */
    @Aspect
    @Component
    public class TokenBucketLimiterInterceptor {
        private final Map<String, RateLimiter> rateLimiters = new ConcurrentHashMap<String, RateLimiter>();
        private final static Logger LOG = LoggerFactory.getLogger(TokenBucketLimiterInterceptor.class);
    
     @Pointcut("@annotation(cn.com.yd.commons.currentlimiter.annotations.TbLimiter)")
        public void aspect() {
        }
    
        @Around(value = "aspect()")
        public void around(JoinPoint point) throws Throwable {
            LOG.debug("进入限流器");
            // 返回目标对象
            Object target = point.getTarget();
            String targetName = target.getClass().getName();
            // 返回当前连接点签名
            String methodName = point.getSignature().getName();
            // 获得参数列表
            Object[] arguments = point.getArgs();
    
            Class<?> targetClass = Class.forName(targetName);
            // 获取参数类型数组
            Class<?>[] argTypes = ReflectUtils.getClasses(arguments);
            // 获取目标method,考虑方法的重载等问题
            Method method = targetClass.getDeclaredMethod(methodName, argTypes);
            // 获取目标method上的限流注解@Limiter
            TbLimiter limiter = method.getAnnotation(TbLimiter.class);
            RateLimiter rateLimiter = null;
            if (null != limiter) {
                // 以 class + method + parameters为key,避免重载、重写带来的混乱
                String key = targetName + "." + methodName + Arrays.toString(argTypes);
                rateLimiter = rateLimiters.get(key);
                if (null == rateLimiter) {
                    // 获取限定的流量
                    // 为了防止并发
                    rateLimiters.putIfAbsent(key, RateLimiter.create(limiter.value()));
                    rateLimiter = rateLimiters.get(key);
                }
                // 消耗一个令牌
                rateLimiter.acquire();
                point.proceed();
            } else {
                point.proceed();
            }
            LOG.debug("退出限流器");
        }
    }
    

    计数器算法拦截器

    import cn.com.yd.commons.currentlimiter.annotations.ShLimiter;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Pointcut;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.cglib.core.ReflectUtils;
    import org.springframework.stereotype.Component;
    
    import java.lang.reflect.Method;
    import java.util.Arrays;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Semaphore;
    
    /**
     * 计数器算法限流拦截器,基于Semaphore技术实现
     *
     * @author 李庆海、张蕴
     */
    @Aspect
    @Component
    public class SemaphoreLimiterInterceptor {
        private final Map<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
        private final static Logger LOG = LoggerFactory.getLogger(SemaphoreLimiterInterceptor.class);
    
     @Pointcut("@annotation(cn.com.yd.commons.currentlimiter.annotations.ShLimiter)")
        public void aspect() {
    
        }
    
        @Around(value = "aspect()")
        public void around(ProceedingJoinPoint point) throws Throwable {
            LOG.debug("进入“限流拦截器”");
            // 返回目标对象
            Object target = point.getTarget();
            String targetName = target.getClass().getName();
            // 返回当前连接点签名
            String methodName = point.getSignature().getName();
            // 获得参数列表
            Object[] arguments = point.getArgs();
    
            Class<?> targetClass = Class.forName(targetName);
            // 获取参数类型数组
            Class<?>[] argTypes = ReflectUtils.getClasses(arguments);
            // 获取目标method,考虑方法的重载等问题
            Method method = targetClass.getDeclaredMethod(methodName, argTypes);
            // 获取目标method上的限流注解@Limiter
            ShLimiter limiter = method.getAnnotation(ShLimiter.class);
    
            if (null != limiter) {
                // 以 class + method + parameters为key,避免重载、重写带来的混乱
                String key = targetName + "." + methodName + Arrays.toString(argTypes);
                // 获取限定的流量
                Semaphore semaphore = semaphores.get(key);
                if (null == semaphore) {
                    //为了预防并发
                    semaphores.putIfAbsent(key, new Semaphore(limiter.value()));
                    semaphore = semaphores.get(key);
                }
    
                try {
                    // 消耗一个令牌
                    semaphore.acquire();
                    // 调用被代理方法
                    point.proceed();
                } finally {
                    // 释放令牌
                    if (null != semaphore) {
                        semaphore.release();
                    }
                }
            } else {
                point.proceed();
            }
            LOG.debug("退出“限流拦截器”");
        }
    }
    

    让自定义拦截器具备自动注入的能力

    在resources目录下新建名为META-INF的文件夹,然后新建一个名为spring.factories的文件,在文件中增加下面的内容:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
      cn.com.yd.commons.currentlimiter.interceptors.SemaphoreLimiterInterceptor,\
      cn.com.yd.commons.currentlimiter.interceptors.TokenBucketLimiterInterceptor
    

    至此,本工程中的代码已经具备了开箱即用的能力。使用Maven将本工程打包,即可在其他项目中引入使用。

    3 在项目中使用

    需要Maven依赖

    <dependency>
        <groupId>cn.com.yd.commons</groupId>
        <artifactId>currentlimiter</artifactId>
        <version>1.0.0</version>
    </dependency>
    

    在需要限流的方法、接口上添加限流注解

    @TbLimiter和@ShLimiter两个注解都可以达到限流的目的,任选其中一个添加到需要限流的方法、接口上即可。注解的参数value默认值为1000。使用那个注解启用那个相关的拦截器。

    /**
     * 登录,1秒钟的时间内只允许1000个登录请求
     * @param loginName 登录帐号
     * @param password 登录密码
     * @return
     * @throws Exception
     */
    @TbLimiter
    public Object login(String loginName,String password)throws Exception;
    
    /**
     * 登录,1秒钟的时间内只允许100个退出请求
     * @param loginName 登录帐号
     * @return
     * @throws Exception
     */
    @TbLimiter(100)
    public Object logout(String loginName)throws Exception;
    

    相关文章

      网友评论

        本文标题:使用AOP和自定义注解实现限流策略

        本文链接:https://www.haomeiwen.com/subject/uheuoftx.html