限流令牌桶实现

作者: Cc_7397 | 来源:发表于2019-11-14 15:27 被阅读0次

    限流令牌桶实现

    我们redis最开始的限流只是用Semaphore信号量来限流,一个请求先acquire 然后在release

    但是这样的方法没有时间的概念,限流情况并不好。

    比如Semaphore容量为1000,一个请求耗时100ms,那么理论的1s的最大流量应该是

    1000/10 * 1000 **一秒一个并发可以走10个请求,最多1000并发 **

    具体的qps是和请求执行时间有关的。

    令牌桶

    令牌桶可以保证最大qps为固定值,原来为先有一个固定容量的桶来存令牌。每个请求要先从桶中拿到令牌才能进行。然后拿不到就堵塞。

    一个单独的线程以一定频率向桶中放令牌,这个发入得流量就是限流的最大流量

    实现

    先定义一个接口

    public interface Limit {
    
        void acquire() throws InterruptedException;
    
        void release();
    }
    
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.locks.LockSupport;
    
    public class TokenBucket implements Limit {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(TokenBucket.class);
        //线程index
        private static final AtomicInteger INTEGER = new AtomicInteger();
        //信号量
        private Semaphore semaphore;
    
        private int putSize;
        private long time;
        private TimeUnit timeUnit;
        //名称
        private String name;
        //put令牌的线程
        private Thread putThread;
        //标志位
        private volatile boolean isStop = true;
    
        public TokenBucket(int initSize, int putSize, long time, TimeUnit timeUnit) {
            this(TokenBucket.class.getName(), initSize, putSize, time, timeUnit);
        }
    
        public TokenBucket(String name, int initSize, int putSize, long time, TimeUnit timeUnit) {
            this.name = name;
            this.semaphore = new Semaphore(initSize);
            this.putSize = putSize;
            this.time = time;
            this.timeUnit = timeUnit;
            start();
        }
    
        //获取令牌
        @Override
        public void acquire() throws InterruptedException {
            if (!isStop) {
                semaphore.acquire(1);
            }
        }
        //释放令牌 (什么也不做)
        @Override
        public void release() {
            //do nothing
        }
    
        private void start() {
            isStop = false;
            Thread thread = new Thread(() -> {
                while (true) {
                    if (Thread.currentThread().isInterrupted()) {
                        if (isStop) {
                            LOGGER.info("TokenBucket " + name + " shutdown ");
                            return;
                        }
                    }
                    put(putSize, time, timeUnit);
                }
            }, "TokenBucket_thread_" + name + "_" + INTEGER.getAndIncrement());
            this.putThread = thread;
            thread.start();
        }
    
        private void put(int putSize, long time, TimeUnit timeUnit) {
            semaphore.release(putSize);
            //挂起线程,可响应中断
            LockSupport.parkNanos(name, timeUnit.toNanos(time));
        }
    
        //停止限流
        public void shutdown() {
            isStop = true;
            //中断 put线程
            putThread.interrupt();
        }
    
    }
    
    

    上面的代码可以严格保证最大qps。

    如果想让流量的曲线更平滑可以增加put的频率,减小每次put的大小

    问题

    如果下游系统出了问题,响应时间非常长,但是令牌桶在发令牌时是不去考虑下游系统的。

    最好可以配合熔断和快速失败来做。

    相关文章

      网友评论

        本文标题:限流令牌桶实现

        本文链接:https://www.haomeiwen.com/subject/ayprictx.html