美文网首页
应用层限流:令牌桶算法与RateLimiter

应用层限流:令牌桶算法与RateLimiter

作者: lois想当大佬 | 来源:发表于2020-05-04 16:51 被阅读0次

一、高并发处理思想
建议先看看高并发的处理思想,脑子里先有个全局概念。
高并发处理的5个思想

二、令牌桶算法
以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。令牌桶允许请求某种程度的突发传输。这是令牌桶区别于漏桶的地方。

令牌桶算法

三、代码示例
pom.xml依赖

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>28.1-jre</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

1、请求速率限制注解类

package com.hello;

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RequestRateLimitAnnotation {
    /**
     * 固定令牌个数
     * @return
     */
    double limitNum();
    /**
     * 获取令牌超时时间
     * @return
     */
    long timeout();
    /**
     * 单位-默认毫秒
     * @return
     */
    TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
    /**
     * 无法获取令牌时的错误信息
     * @return
     */
    String errMsg() default "请求太频繁!";
}

2、aop拦截器

package com.hello;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Objects;

@Slf4j
@Aspect
@Component
public class RequestRateLimitAspect {
    /**
     * 使用url做为key,存放令牌桶 防止每次重新创建令牌桶
     */
    private Map<String, RateLimiter> limitMap = Maps.newConcurrentMap();

    // 拦截带RequestRateLimitAnnotation注解的接口         
  @Pointcut("@annotation(com.hello.RequestRateLimitAnnotation)")
    public void execMethod() {}

    @Around("execMethod()")
    public Object doAround(ProceedingJoinPoint joinPoint) throws Throwable {
        //获取请求uri
        HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        String reqUrl=request.getRequestURI();

        // 获取令牌
        RequestRateLimitAnnotation rateLimiter = this.getRequestRateLimiter(joinPoint);
        if (Objects.nonNull(rateLimiter)) {
            RateLimiter limiter = getRateLimiter(reqUrl, rateLimiter);
            // 请求获取令牌,参数为等待超时时间
            boolean acquire = limiter.tryAcquire(rateLimiter.timeout(), rateLimiter.timeUnit());
            if (!acquire) {
                return new Response(200, reqUrl.concat(rateLimiter.errMsg())).toString();
            }
        }

        //获得令牌,继续执行
        return joinPoint.proceed();
    }


    /**
     * 获取RateLimiter
     * @return
     */
    private RateLimiter getRateLimiter(String reqUrl, RequestRateLimitAnnotation rateLimiter) {
        RateLimiter limiter = limitMap.get(reqUrl);
        if (Objects.isNull(limiter)) {
            synchronized (this) {
                limiter = limitMap.get(reqUrl);
                if (Objects.isNull(limiter)) {
                    // 创建一个限流器,参数代表每秒生成的令牌数
                    limiter = RateLimiter.create(rateLimiter.limitNum());
                    limitMap.put(reqUrl, limiter);
                    log.info("RequestRateLimitAspect请求{},创建令牌桶,容量{} 成功!!!", reqUrl, rateLimiter.limitNum());
                }
            }
        }
        return limiter;
    }

    /**
     * 获取注解对象
     * @param joinPoint 对象
     * @return ten LogAnnotation
     */
    private RequestRateLimitAnnotation getRequestRateLimiter(final JoinPoint joinPoint) {
        Method[] methods = joinPoint.getTarget().getClass().getDeclaredMethods();
        String name = joinPoint.getSignature().getName();
        if (!StringUtils.isEmpty(name)) {
            for (Method method : methods) {
                RequestRateLimitAnnotation annotation = method.getAnnotation(RequestRateLimitAnnotation.class);
                if (!Objects.isNull(annotation) && name.equals(method.getName())) {
                    return annotation;
                }
            }
        }
        return null;
    }
}

3、控制器类

package com.hello;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/test")
public class TestController {

    @RequestRateLimitAnnotation(limitNum = 2, timeout = 10)
    @GetMapping("/rate_limit")
    public String testRateLimit() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        /**
         * 测试代码
         */
        return "success";
    }
}

4、测试类

package com.hello;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AccessClient {

    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

    /**
     * get请求
     * @param realUrl
     * @return
     */
    public static String sendGet(URL realUrl) {
        String result = "";
        BufferedReader in = null;
        try {
            // 打开和URL之间的连接
            URLConnection connection = realUrl.openConnection();
            // 设置通用的请求属性
            connection.setRequestProperty("accept", "*/*");
            connection.setRequestProperty("connection", "Keep-Alive");
            connection.setRequestProperty("user-agent",
                    "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1;SV1)");
            // 建立实际的连接
            connection.connect();

            // 定义 BufferedReader输入流来读取URL的响应
            in = new BufferedReader(new InputStreamReader(
                    connection.getInputStream()));
            String line;
            while ((line = in.readLine()) != null) {
                result += line;
            }
        } catch (Exception e) {
            System.out.println("发送GET请求出现异常!" + e);
            e.printStackTrace();
        }
        // 使用finally块来关闭输入流
        finally {
            try {
                if (in != null) {
                    in.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
        return result;
    }

    public void access() throws Exception{
        final URL url = new URL("http://localhost:8080/test/rate_limit");

        for(int i=0;i<4;i++) {
            fixedThreadPool.submit(new Runnable() {
                public void run() {
                    System.out.println(sendGet(url));
                }
            });
        }

        fixedThreadPool.shutdown();
        fixedThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception{
        AccessClient accessClient = new AccessClient();
        accessClient.access();
    }

}

测试结果:

Response(code=200, msg=/test/rate_limit请求太频繁!)
success
success
success

结果分析:
  部分请求由于获取的令牌可以成功执行,其余请求没有拿到令牌,我们可以根据实际业务来做区分处理。还有一点要注意,我们通过RateLimiter.create(2.0)配置的是每一秒2枚令牌,但是限流的时候发出的是3枚,改用其他值验证,也是实际的比配置的大1。

四、RateLimiter源码解析
待补充。

五、qps多大合适?
一直再说高并发,多少QPS才算高并发?
Web开发中,什么级别才算是高并发
总结我比较关心的几点:
1、如果某个系统的日pv在千万级别以上,他就可能是一个高并发的系统。
2、PV和QPS
比如微博每天1亿多pv的系统一般也就1500QPS,5000QPS峰值。
比如有人说:
2C4G机器单机一般1000QPS。
8C8G机器单机可承受7000QPS。
3、具体多少QPS跟业务强相关,只读接口读缓存,将压力给到缓存单机3000+没问题,写请求1000+也正常,也复杂些可能也就几百+QPS。
所以QPS和业务场景和设计相关性很大,比如可以通过浏览器本地缓存,用缓存做热点数据查询,写事务MQ异步处理等方式提升QPS。

参考资料

https://www.cnblogs.com/itfly8/p/12589212.html

相关文章

网友评论

      本文标题:应用层限流:令牌桶算法与RateLimiter

      本文链接:https://www.haomeiwen.com/subject/wfkeghtx.html