问题分析
场景
场景如下:我们有10w台设备,这10w台设备会不定时向服务端上报数据,如果消息的并发量比较大的话,服务端很容易处理不过来,为此,我们需要对单台设备的消息上报速率进行控制,比如,一台设备每秒钟最多发送
10条消息,如果超过10条,就不再处理此设备的后续消息。
分析
这里我们对消息进行限流的原因其实是我们处理不过来,服务没有能力在某一段时间内处理大量的消息。问题的本质是:服务端在每秒钟内对单台设备的处理能力。如果服务端在1秒钟内最多处理10条消息,那就是说服务端在每秒钟内对单台设备的处理能力是10,可见,这里受限的是 单台设备的处理能力。这时,我们就可以把单台设备的处理能力,抽象为一种资源池,处理能力的大小即为资源池的大小。对单台设备的消息限流,也就转化为对有限资源池的使用。这时,我们就可以利用Semopher来构建一个资源池,来达到对资源合理使用。
semaphore介绍
Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。
Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。
代码实现
利用Semaphore实现限流器
限流器接口
public interface IRateLimiter {
boolean tryAcquire();
void stop();
}
限流器实现类
public class SimpleRateLimiter implements IRateLimiter {
private static final Logger infoLogger = HerculesLoggerFactory.getServerInfoLogger(SimpleRateLimiter.class);
private static final Logger errorLogger= HerculesLoggerFactory.getServerErrorLogger(SimpleRateLimiter.class);
private Semaphore semaphore;
private int maxPermits;
private long timePeriod;
private ScheduledExecutorService scheduler;
/**
* 创建一个限流器
*
*
* @param permits
* @param timePeriod
* @return
*/
public static SimpleRateLimiter create(int permits, long timePeriod){
SimpleRateLimiter limiter = new SimpleRateLimiter(permits,timePeriod);
limiter.schedulePermitReplenishment();
return limiter;
}
public SimpleRateLimiter(int maxPermits, long timePeriod) {
//资源
Semaphore semaphoreInde = new Semaphore(maxPermits);
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
this.semaphore = semaphoreInde;
this.maxPermits = maxPermits;
this.timePeriod = timePeriod;
this.scheduler = scheduledExecutorService;
}
public SimpleRateLimiter(Semaphore semaphore, int maxPermits, long timePeriod, ScheduledExecutorService scheduler) {
this.semaphore = semaphore;
this.maxPermits = maxPermits;
this.timePeriod = timePeriod;
this.scheduler = scheduler;
}
/**
* 没过一段时间释放一次锁
*/
@Override
public boolean tryAcquire() {
return semaphore.tryAcquire();
}
@Override
public void stop() {
//关闭
if(scheduler != null){
scheduler.shutdown();//拒绝接收新任务
try {
if(!scheduler.awaitTermination(500,TimeUnit.MILLISECONDS)){
scheduler.shutdownNow();//暴力关闭
//再次检查是否关闭
if(!scheduler.awaitTermination(200,TimeUnit.MILLISECONDS)){
errorLogger.error("【限流器调度线程池关闭失败!】");
}
}
}catch (InterruptedException ex){
errorLogger.error("【限流器关闭时遭遇中断异常:{} !】",ex.getCause());
//强行关闭
scheduler.shutdownNow();
//恢复中断状态
Thread.currentThread().interrupt();
}
}
//
if(semaphore != null){
semaphore = null;
}
if(scheduler != null){
scheduler = null;
}
}
private void schedulePermitReplenishment(){
if(scheduler != null){
scheduler.scheduleAtFixedRate(()->{
//每过一段时间,释放一次
//把已经占用的资源释放掉
semaphore.release(maxPermits - semaphore.availablePermits());
},1,timePeriod,TimeUnit.SECONDS);
}
}
}
使用限流器,完成消息限流
public class TestRateLimit {
private static final Logger log = HerculesLoggerFactory.getServerInfoLogger(TestRateLimit.class);
private static final Logger errorLogger= HerculesLoggerFactory.getServerErrorLogger(TestRateLimit.class);
private static final ConcurrentHashMap<String, SimpleRateLimiter> deviceRateLimitTable = new ConcurrentHashMap<>(1024);
private static final int DEVICE_MSG_MAX_LIMIT_PER_SECOND = 10;
/**
* 消息到来时,以设备为单位对消息进行限流。单台设备的每秒钟的消息量的上限为:10
*
* @param topic
* @param message
* @throws Exception
*/
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("Received message topic:" + topic);
try{
MqttMessageReceiveCommon receiveCommon = JSONObject.parseObject(message.getPayload(), MqttMessageReceiveCommon.class);
if(ObjectUtils.isEmpty(receiveCommon.getDeviceId())){
return;
}
SimpleRateLimiter rateLimiter = deviceRateLimit(receiveCommon.getDeviceId());
//判断该设备的消息速率有没有达到上限
if(!rateLimiter.tryAcquire()){
//如果此设备的消息已达到上限,则忽略此消息
return;
}else {
//如果没有达到上限,则执行业务逻辑处理
//TODO handle the message
}
}catch (Exception e){
errorLogger.error("【消息接收时,发送异常: {},消息是: {}】",e.getMessage(),message.toString());
}
}
private SimpleRateLimiter deviceRateLimit(String deviceId) {
SimpleRateLimiter rateLimiter = deviceRateLimitTable.get(deviceId);
if(rateLimiter == null){
rateLimiter = SimpleRateLimiter.create(DEVICE_MSG_MAX_LIMIT_PER_SECOND, 1);
//放入
SimpleRateLimiter oldLimiter = deviceRateLimitTable.putIfAbsent(deviceId, rateLimiter);
if(oldLimiter != null){
//多创建的,必须要销毁
rateLimiter.stop();
rateLimiter = null;
return oldLimiter;
}
}
return rateLimiter;
}
}
网友评论