美文网首页
限流器的简单实现

限流器的简单实现

作者: 大风过岗 | 来源:发表于2020-11-04 16:36 被阅读0次

问题分析

场景

场景如下:我们有10w台设备,这10w台设备会不定时向服务端上报数据,如果消息的并发量比较大的话,服务端很容易处理不过来,为此,我们需要对单台设备的消息上报速率进行控制,比如,一台设备每秒钟最多发送
10条消息,如果超过10条,就不再处理此设备的后续消息。

分析

这里我们对消息进行限流的原因其实是我们处理不过来,服务没有能力在某一段时间内处理大量的消息。问题的本质是:服务端在每秒钟内对单台设备的处理能力。如果服务端在1秒钟内最多处理10条消息,那就是说服务端在每秒钟内对单台设备的处理能力是10,可见,这里受限的是 单台设备的处理能力。这时,我们就可以把单台设备的处理能力,抽象为一种资源池,处理能力的大小即为资源池的大小。对单台设备的消息限流,也就转化为对有限资源池的使用。这时,我们就可以利用Semopher来构建一个资源池,来达到对资源合理使用。

semaphore介绍

Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。
Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。

代码实现

利用Semaphore实现限流器

限流器接口

public interface IRateLimiter {


    boolean tryAcquire();

    void stop();

}

限流器实现类

public class SimpleRateLimiter implements IRateLimiter {


   private static  final Logger infoLogger = HerculesLoggerFactory.getServerInfoLogger(SimpleRateLimiter.class);
   private static  final Logger errorLogger= HerculesLoggerFactory.getServerErrorLogger(SimpleRateLimiter.class);

   private Semaphore semaphore;
   private int maxPermits;
   private long timePeriod;
   private ScheduledExecutorService scheduler;


   /**
    * 创建一个限流器
    *
    *
    * @param permits
    * @param timePeriod
    * @return
    */
   public static  SimpleRateLimiter create(int permits, long timePeriod){
       SimpleRateLimiter limiter = new SimpleRateLimiter(permits,timePeriod);
       limiter.schedulePermitReplenishment();
       return limiter;
   }

   public SimpleRateLimiter(int maxPermits, long timePeriod) {
       //资源
       Semaphore semaphoreInde = new Semaphore(maxPermits);
       ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
       this.semaphore = semaphoreInde;
       this.maxPermits = maxPermits;
       this.timePeriod = timePeriod;
       this.scheduler = scheduledExecutorService;
   }

   public SimpleRateLimiter(Semaphore semaphore, int maxPermits, long  timePeriod, ScheduledExecutorService scheduler) {
       this.semaphore = semaphore;
       this.maxPermits = maxPermits;
       this.timePeriod = timePeriod;
       this.scheduler = scheduler;
   }


   /**
    * 没过一段时间释放一次锁
    */


   @Override
   public boolean tryAcquire() {
       return semaphore.tryAcquire();
   }

   @Override
   public void stop() {
       //关闭
       if(scheduler != null){
           scheduler.shutdown();//拒绝接收新任务
           try {
               if(!scheduler.awaitTermination(500,TimeUnit.MILLISECONDS)){
                   scheduler.shutdownNow();//暴力关闭
                   //再次检查是否关闭
                   if(!scheduler.awaitTermination(200,TimeUnit.MILLISECONDS)){
                       errorLogger.error("【限流器调度线程池关闭失败!】");
                   }
               }
           }catch (InterruptedException ex){
               errorLogger.error("【限流器关闭时遭遇中断异常:{} !】",ex.getCause());
               //强行关闭
               scheduler.shutdownNow();
               //恢复中断状态
               Thread.currentThread().interrupt();
           }
       }
       //
       if(semaphore != null){
           semaphore = null;
       }
       if(scheduler != null){
           scheduler = null;
       }
   }


   private void  schedulePermitReplenishment(){
       if(scheduler != null){
           scheduler.scheduleAtFixedRate(()->{
               //每过一段时间,释放一次
               //把已经占用的资源释放掉
               semaphore.release(maxPermits - semaphore.availablePermits());
           },1,timePeriod,TimeUnit.SECONDS);
       }
   }
}

使用限流器,完成消息限流

public class TestRateLimit {

    private static  final Logger log = HerculesLoggerFactory.getServerInfoLogger(TestRateLimit.class);
    private static  final Logger errorLogger= HerculesLoggerFactory.getServerErrorLogger(TestRateLimit.class);

    private static final ConcurrentHashMap<String, SimpleRateLimiter> deviceRateLimitTable = new ConcurrentHashMap<>(1024);

    private static final int  DEVICE_MSG_MAX_LIMIT_PER_SECOND = 10;


    /**
     * 消息到来时,以设备为单位对消息进行限流。单台设备的每秒钟的消息量的上限为:10
     *
     * @param topic
     * @param message
     * @throws Exception
     */
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("Received message topic:" + topic);
        try{
            MqttMessageReceiveCommon receiveCommon = JSONObject.parseObject(message.getPayload(), MqttMessageReceiveCommon.class);
            if(ObjectUtils.isEmpty(receiveCommon.getDeviceId())){
                return;
            }
            SimpleRateLimiter rateLimiter =  deviceRateLimit(receiveCommon.getDeviceId());
            //判断该设备的消息速率有没有达到上限
            if(!rateLimiter.tryAcquire()){
                //如果此设备的消息已达到上限,则忽略此消息
                return;
            }else {
                //如果没有达到上限,则执行业务逻辑处理
                //TODO handle the message
            }
        }catch (Exception e){
            errorLogger.error("【消息接收时,发送异常: {},消息是: {}】",e.getMessage(),message.toString());
        }
    }

    private SimpleRateLimiter deviceRateLimit(String deviceId) {
        SimpleRateLimiter rateLimiter = deviceRateLimitTable.get(deviceId);
        if(rateLimiter == null){
            rateLimiter = SimpleRateLimiter.create(DEVICE_MSG_MAX_LIMIT_PER_SECOND, 1);
            //放入
            SimpleRateLimiter oldLimiter = deviceRateLimitTable.putIfAbsent(deviceId, rateLimiter);
            if(oldLimiter != null){
                //多创建的,必须要销毁
                rateLimiter.stop();
                rateLimiter = null;
                return oldLimiter;
            }
        }
        return rateLimiter;
    }
}

相关文章

  • 限流器的简单实现

    问题分析 场景 场景如下:我们有10w台设备,这10w台设备会不定时向服务端上报数据,如果消息的并发量比较大的话,...

  • 基于计数器的服务接口限流实例

    计数器限流是服务接口限流策略中最为基本和简单的方式。本实例将实现不同接口设置不同的限流方案。 首先我们需要需要定义...

  • 经典限流算法:令牌桶算法

    Guava 的限流器使用上还是很简单的,那它是如何实现的呢?Guava 采用的是令牌桶算法,其核心是要想通过限流器...

  • 限流算法实现

    并发数限流 1. 计数器并发数限流 2. 信号量(Semaphore) 其实最简单的方法就是用信号量来实现: QP...

  • 限流降级方案

    限流算法 并发数限流 计数器并发数限流:使用共享变量实现 信号量:使用java中的Semaphore QPS限流 ...

  • Guava RateLimiter的实现

    限流 高并发系统有三大利器:缓存 、限流 、降级。对于限流的实现,有多种算法:计数器,漏桶法,令牌桶法。计数器法无...

  • SpringBoot基于RateLimiter+AOP动态的为不

    一 限流实现: RateLimiter是guava提供的基于令牌桶算法的实现类,可以非常简单的完成限流特技,并且根...

  • 单机限流 - 限流算法及隔离策略

    限流算法 - 计数器 计数器是一种比较简单的限流算法,用途比较广泛,在接口层面,很多地方使用这种方式限流。在一段时...

  • 简单限流器封装

    简单限流器封装 开发过程中有时候 我们会做一些简单的限流 操作,比如 告警提醒,发送验证码 等,希望在 一段时间 ...

  • 限流器实现原理

    在开发高并发系统时,有三把利器用来保护系统:缓存、降级和限流。缓存这个概念是大家共识的,没有什么异议,但在好些文章...

网友评论

      本文标题:限流器的简单实现

      本文链接:https://www.haomeiwen.com/subject/kamgvktx.html