![](https://img.haomeiwen.com/i2145947/c96d0f5b6128cb93.png)
![](https://img.haomeiwen.com/i2145947/4b419739afdde1d7.png)
遇到的问题:
目前流行的直播行业中,产品会鼓励用户多与主播互动,而互动火热的体现,则是通过上图最右方的人气值体现。
目前常规的互动为聊天,送礼,点赞,关注,宋星星,分享。其中聊天,送礼,点赞,都是频率较高的互动。
系统要根据这些互动计算图右侧的人气值,并返回给用户,而且人气值需要持久处理。
笔者所遇到的问题是,实时计算互动,并存储时,能否降低写库的次数,以及推送IM的次数。
如果当前房间十分火热,
聊天tps500 送礼tps300 点赞tps500 我不希望我每秒会为了更新人气,更新1000多次,并且IM推送上行1000多次,IM下行是给房间里的
每一个人推送,那么次数更是不可设想。
如何减轻库的写压力,以及IM服务的压力,是这次要解决的问题。
总体图:
![](https://img.haomeiwen.com/i2145947/d2214456a6bbfc46.png)
上图是该业务的总体结构图,每个业务有自己的业务消息总线,如发言,点赞等,消息中间件采用kafka,比如我现在是需要接受消息更新人气,用户经验等,所以我定义了一个新的消费组community-esb,消费组收到各业务线的消息后,通过调用人气服务,以及经验服务来更新主播的人气值,互动发生源用户的经验值。消息body基本都是这样的,比如聊天:
{userId:<userId>,targetUserId<targetUserId>,roomId<roomId>,message<message>}
消费者会根据消息体内的详细内容进行加人气处理,比如送礼会看礼物个数,点赞会看点赞次数等,这里不再赘述。
上述规划图中,这个部分是要解决的问题
![](https://img.haomeiwen.com/i2145947/a399b357f9b24720.png)
起初想到的三种解决方式:
A,
//增加人气值,用户进入房间获取,或推送都从这里获取
Long total=Increase(AnchorPopularKey,value);
//获取锁
String timeStamp = getAnchorLockKey(anchor) 1
如果锁存在,并且未失效,则直接返回
If(value!=null && System.currentTime-value<15s){Return;}
//这里有并发问题
//锁不存在,则设置锁,并更新db 推送IM
setAnchorLockKey(anchor,random(3,12)s)
Insert DB total
sendIM total
A方式的大致思路为:缓存中存储主播的总人气值,业务需要读取时也从这里读取,人气服务计算好需要增加的人气值时,
直接调用redis increase操作,那么什么时候进行落地呢?设置另外一个ttl key,失效时间为3~10秒,当key失效时,则进行落地操作,
以及推送IM操作,如果希望前端更新快,则时间可设置短一些。
优点:这种方式网络开销相对较少,策略验证是否需要落地只用了一次网络IO
缺点:
1,因为redis存的是人气总值,而需要保证redis里的总值与数据库总值完全相同较难。而且最后落地时,是将redis的总值
直接做覆盖操作,很有可能总值之前是100,覆盖后变98
2,最后的落地操作,以及推送IM操作,存在并发问题,因为为了达到较大吞吐量,增加消费消息效率,
消费组的进程数与kafka设置的patition数量相同,当多个进程同时消费时,可能会同时进行落地操作,
以及推送IM操作,比较浪费。
B,
//增加人气值,//增加人气值,用户进入房间获取,或推送都从这里获取
Long total=Increase(AnchorPopularKey,value);
//获取锁
Boolean set = setIfAbsent(anchorLock2(anchor),System.currentTime); 1
If(set){
//获取到了锁,入库,推IM
Insert DB total;
sendIM total;
重新设置锁(3~12秒)失效
setExpire(anchorLock2(anchor),random(3,12)s);
}else{
//获取锁的设置时间
timeStamp = get(anchorLock2(anchor)); 2
//如果锁设置时间已过15秒,说明锁已经失效,之前设置失效失败,删除锁
if(System.currentTime-timeStamp>15s){
delete(anchorLock2);
}
}
方式B算是方式A的改进,为了解决并发数据落地的问题,不过这种方式,增加了一个分布式锁,多了一次网络IO消耗。
C,
TASK 60秒一次。。。或10秒一次
for(Room room:ShowingRoomList){
//获取缓存中的人气值
Popular p = popularService.get(room);
//上次推送至今,房间人气值有变化,给予推送更新
if(redis.get(room)!=p.getPopular()){
insertDb;
sendIm
redis.set(room)
}
}
C方式与A B方式完全不同,C是通过另起一个进程跑task,然后异步将人气值落地,以及推送IM,但是
还是需要消费者做redis.increase操作,只是消费者不再负责数据落地,以及推送了。
结论:
1,起初设置一个缓存总人气值是为了顺便抗读压力,不过上述几种方式经过反复推演,首先难以解决的问题就是,如果你在缓存中存储一个总值,那么你用数据库覆盖缓存可以,如果用缓存覆盖数据库,将十分危险,而且难以保证准确性。
2,A,B方式均存在很多不必要的网络开销或数据库开销,需要在两者之间做均衡。C方式另起进程,使得业务不聚合,消费组不能自己
完成任务,需要依赖另一个进程。而且,单进程存在单点问题。不过C方式相对来讲,更可控些,因为单进程,所以不存在过多的数据库开销,以及网络开销。
最后决定,弃用缓存存人气总值的方式,通过增量更新人气总值。
设计图如下:
![](https://img.haomeiwen.com/i2145947/ff5ecd26a8f7b15c.png)
代码如下:
![](https://img.haomeiwen.com/i2145947/23a87d103c2b012a.png)
![](https://img.haomeiwen.com/i2145947/66a4de50be7ea5ff.png)
这种方式的思路是,将一分钟时间分为30个时间片,线程将根据当前系统时间,将人气值累加存储到当前时间片内,
落地时则是存储上一个时间片的累加值,并且为增量更新。这样可控制一个房间内的落地以及推送IM频率,最多为30次。
通过redis的原子操作,选取leader,来进行落地,推送,失效操作,防止并发问题。同时,当房间内互动频率不高时,
则实时落地推送,增强用户体验。
当然,我们现在的业务量还不大~服务上线后监控图如下:
由图可见,互动产生累加人气操作tps在晚上比较高,可以达到90左右,而那时,实时落地的tps为2.5。也就是说房间内就算互动再频繁,
右上角的人气变化大约是2秒一变。
![](https://img.haomeiwen.com/i2145947/fd937076a2d2f429.png)
![](https://img.haomeiwen.com/i2145947/be24f774efccc6d1.png)
网友评论