美文网首页
限流从概念到实现

限流从概念到实现

作者: Java大生 | 来源:发表于2019-01-23 11:13 被阅读0次

    并发数限流

    并发数限流限制的是同一时刻的并发数,所以不考虑线程安全的话,我们只要用一个int变量就能实现,伪代码如下:

    int maxRequest=100;

    int nowRequest=0;

    public void request(){

        if(nowRequest>=maxRequest){

            return ;

        }

        nowRequest++;

        //调用接口

        try{

            invokeXXX();   

        }finally{

            nowRequest--;

        }

    }

    显然,上述实现会有线程安全的问题,最直接的做法是加锁:

    int maxRequest=100;

    int nowRequest=0;

    public void request(){

        if(nowRequest>=maxRequest){

            return ;

        }

    synchronized(this){

            if(nowRequest>=maxRequest){

            return ;

        }

        nowRequest++;

    }

        //调用接口

        try{

            invokeXXX();   

        }finally{

            synchronized(this){

            nowRequest--;

            }

        }

    }

    当然也可以用AtomicInteger实现:

    int maxRequest=100;

    AtomicInteger nowRequest=new AtomicInteger(0);

    public void request(){

        for(;;){

            int currentReq=nowRequest.get();

            if(currentReq>=maxRequest){

                return;

            }

            if(nowRequest.compareAndSet(currentReq,currentReq+1)){

                break;

            }

        }

        //调用接口

        try{

            invokeXXX();   

        }finally{

            nowRequest.decrementAndGet();

        }

    }

    熟悉JDK并发包的同学会说干嘛这么麻烦,这不就是信号量(Semaphore)做的事情吗? 对的,其实最简单的方法就是用信号量来实现:

    int maxRequest=100;

    Semaphore reqSemaphore = new Semaphore(maxRequest);

    public void request(){

        if(!reqSemaphore.tryAcquire()){

            return ;

        }

        //调用接口

        try{

            invokeXXX();   

        }finally{

          reqSemaphore.release();

        }

    }

    条条大路通罗马,并发数限流比较简单,一般来说用信号量就好。

    QPS限流

    QPS限流限制的是一段时间内(一般指1秒)的请求个数。

    计数器法

    最简单的做法用一个int型的count变量做计数器:请求前计数器+1,如超过阈值并且与第一个请求的间隔还在1s内,则限流。

    伪代码如下:

    int maxQps=100;

    int count;

    long timeStamp=System.currentTimeMillis();

    long interval=1000;

    public synchronized boolean grant(){

    long now=System.currentTimeMillis();

        if(now<timeStamp+interval){

            count++;

            return count<maxQps;

        }else{

            timeStamp=now;

            count=1;

            return true;

        }

    }

    该种方法实现起来很简单,但其实是有临界问题的,假如在第一秒的后500ms来了100个请求,第2秒的前500ms来了100个请求,那在这1秒内其实最大QPS为200。如下图:

    计数器法会有临界问题,主要还是统计的精度太低,这点可以通过滑动窗口算法解决

    滑动窗口

    我们用一个长度为10的数组表示1秒内的QPS请求,数组每个元素对应了相应100ms内的请求数。用一个 sum 变量代码当前1s的请求数。同时每隔100ms将淘汰过期的值。

    伪代码如下:

    int maxQps=100;

    AtomicInteger[] count=new AtomicInteger[10];

    long timeStamp=System.currentTimeMillis();

    long interval=1000;

    AtomicInteger sum;

    volatile int index;

    public void init(){

        for(int i=0;i<count.length;i++){

            count[i]=new AtomicInteger(0);

        }

        sum=new AtomicInteger(0);

    }

    public synchronized boolean  grant(){

        count[index].incrementAndGet();

        return sum.incrementAndGet()<maxQps;

    }

    //每100ms执行一次

    public void run(){

        index=(index+1)%count.length;

        int val=count[index].getAndSet(0);

        sum.addAndGet(-val);

    }

    滑动窗口的窗口越小,则精度越高,相应的资源消耗也更高。

    漏桶算法

    漏桶算法思路是,有一个固定大小的桶,水(请求)忽快忽慢的进入到漏桶里,漏桶以一定的速度出水。当桶满了之后会发生溢出。

    在 维基百科 上可以看到,漏桶算法有两种实现,一种是 as a meter ,另一种是 as a queue。 网上大多数文章都没有提到其有两种实现,且对这两种概念混乱。

    As a meter

    第一种实现是和令牌桶等价的,只是表述角度不同。

    伪代码如下:

    long timeStamp=System.currentTimeMillis();//上一次调用grant的时间

    int bucketSize=100;//桶大小

    int rate=10;//每ms流出多少请求

    int count;//目前的水量

    public synchronized boolean grant(){

        long now = System.currentTimeMillis();

        if(now>timeStamp){

            count = Math.max(0,count-(now-timeStamp)*rate);

            timeStamp = now;

        }

        if(count+1<=bucketSize){

            count++;

            return true;

        }else{

            return false;

        }

    }

    该种实现允许一段时间内的突发流量,比如初始时桶中没有水,这时1ms内来了100个请求,这100个请求是不会被限流的,但之后每ms最多只能接受10个请求(比如下1ms又来了100个请求,那其中90个请求是会被限流的)。

    其达到的效果和令牌桶一样。

    As a queue

    第二种实现是用一个队列实现,当请求到来时如果队列没满则加入到队列中,否则拒绝掉新的请求。同时会以恒定的速率从队列中取出请求执行。

    伪代码如下:

    Queue<Request> queue=new LinkedBlockingQueue(100);

    int gap;

    int rate;

    public synchronized boolean grant(Request req){

    if(!queue.offer(req)){return false;}

    }

    // 单独线程执行

    void consume(){

        while(true){

            for(int i=0;i<rate;i++){

                //执行请求

                Request req=queue.poll();

                if(req==null){break;}

                req.doRequest();

            }

            Thread.sleep(gap);

        }

    }

    对于该种算法,固定的限定了请求的速度,不允许流量突发的情况。

    比如初始时桶是空的,这时1ms内来了100个请求,那只有前10个会被接受,其他的会被拒绝掉。注意与上文中 as a meter 实现的区别。

    **不过,当桶的大小等于每个ticket流出的水大小时,第二种漏桶算法和第一种漏桶算法是等价的。**也就是说, as a queue 是 as a meter 的一种特殊实现。如果你没有理解这句话,你可以再看看上面 as a meter 的伪代码,当 bucketSize==rate 时,请求速度就是恒定的,不允许突发流量。

    令牌桶算法

    令牌桶算法的思想就是,桶中最多有N个令牌,会以一定速率往桶中加令牌,每个请求都需要从令牌桶中取出相应的令牌才能放行,如果桶中没有令牌则被限流。

    令牌桶算法与上文的漏桶算法 as a meter 实现是等价的,能够在限制数据的平均传输速率的同时还允许某种程度的突发传输。伪代码:

    int token;

    int bucketSize;

    int rate;

    long timeStamp=System.currentTimeMillis();

    public synchronized boolean grant(){

    long now=System.currentTimeMillis();

        if(now>timeStamp){

            token=Math.max(bucketSize,token+(timeStamp-now)*rate);

            timeStamp=now;

        }

        if(token>0){

            token--;

        return true;

        }else{

            return false;

        }

    }

    漏桶算法两种实现和令牌桶算法的对比

    as a meter 的漏桶算法和令牌桶算法是一样的,只是思想角度有所不同。

    as a queue 的漏桶算法能强行限制数据的传输速率,而令牌桶和 as a meter 漏桶则 能够在限制数据的平均传输速率的同时还允许某种程度的突发传输。

    一般业界用的比较多的是令牌桶算法,像guava中的 RateLimiter 就是基于令牌桶算法实现的。当然不同的业务场景会有不同的需要,具体的选择还是要结合场景。

    End

    本文介绍了后端系统中常用的限流算法,对于每种算法都有对应的伪代码,结合伪代码理解起来应该不难。但伪代码中只是描述了大致思想,对于一些细节和效率问题并没有关注,所以下篇文章将会分析常用限流API:guava的 RateLimiter 的源码实现,让读者对于限流有个更清晰的认识。

    相关文章

      网友评论

          本文标题:限流从概念到实现

          本文链接:https://www.haomeiwen.com/subject/zdljjqtx.html