美文网首页
大规模业务数据延迟落地,以及实时IM消息通知

大规模业务数据延迟落地,以及实时IM消息通知

作者: 一品悟技术_张驰 | 来源:发表于2016-10-08 14:46 被阅读332次

遇到的问题:

目前流行的直播行业中,产品会鼓励用户多与主播互动,而互动火热的体现,则是通过上图最右方的人气值体现。

目前常规的互动为聊天,送礼,点赞,关注,宋星星,分享。其中聊天,送礼,点赞,都是频率较高的互动。

系统要根据这些互动计算图右侧的人气值,并返回给用户,而且人气值需要持久处理。

笔者所遇到的问题是,实时计算互动,并存储时,能否降低写库的次数,以及推送IM的次数。

如果当前房间十分火热,

聊天tps500  送礼tps300 点赞tps500 我不希望我每秒会为了更新人气,更新1000多次,并且IM推送上行1000多次,IM下行是给房间里的

每一个人推送,那么次数更是不可设想。

如何减轻库的写压力,以及IM服务的压力,是这次要解决的问题。

总体图:

上图是该业务的总体结构图,每个业务有自己的业务消息总线,如发言,点赞等,消息中间件采用kafka,比如我现在是需要接受消息更新人气,用户经验等,所以我定义了一个新的消费组community-esb,消费组收到各业务线的消息后,通过调用人气服务,以及经验服务来更新主播的人气值,互动发生源用户的经验值。消息body基本都是这样的,比如聊天:

{userId:<userId>,targetUserId<targetUserId>,roomId<roomId>,message<message>}

消费者会根据消息体内的详细内容进行加人气处理,比如送礼会看礼物个数,点赞会看点赞次数等,这里不再赘述。

上述规划图中,这个部分是要解决的问题

起初想到的三种解决方式:

A,

//增加人气值,用户进入房间获取,或推送都从这里获取

Long total=Increase(AnchorPopularKey,value);

//获取锁

String timeStamp = getAnchorLockKey(anchor)  1

如果锁存在,并且未失效,则直接返回

If(value!=null && System.currentTime-value<15s){Return;}

//这里有并发问题

//锁不存在,则设置锁,并更新db 推送IM

setAnchorLockKey(anchor,random(3,12)s)

Insert DB total

sendIM total

A方式的大致思路为:缓存中存储主播的总人气值,业务需要读取时也从这里读取,人气服务计算好需要增加的人气值时,

直接调用redis increase操作,那么什么时候进行落地呢?设置另外一个ttl key,失效时间为3~10秒,当key失效时,则进行落地操作,

以及推送IM操作,如果希望前端更新快,则时间可设置短一些。

优点:这种方式网络开销相对较少,策略验证是否需要落地只用了一次网络IO

缺点:

        1,因为redis存的是人气总值,而需要保证redis里的总值与数据库总值完全相同较难。而且最后落地时,是将redis的总值

              直接做覆盖操作,很有可能总值之前是100,覆盖后变98

         2,最后的落地操作,以及推送IM操作,存在并发问题,因为为了达到较大吞吐量,增加消费消息效率,

               消费组的进程数与kafka设置的patition数量相同,当多个进程同时消费时,可能会同时进行落地操作,

               以及推送IM操作,比较浪费。

B,

//增加人气值,//增加人气值,用户进入房间获取,或推送都从这里获取

Long total=Increase(AnchorPopularKey,value);

//获取锁

Boolean set = setIfAbsent(anchorLock2(anchor),System.currentTime);  1

If(set){

//获取到了锁,入库,推IM

Insert DB total;

sendIM total;

重新设置锁(3~12秒)失效

setExpire(anchorLock2(anchor),random(3,12)s);

}else{

//获取锁的设置时间

timeStamp = get(anchorLock2(anchor));  2

//如果锁设置时间已过15秒,说明锁已经失效,之前设置失效失败,删除锁

if(System.currentTime-timeStamp>15s){

delete(anchorLock2);

}

}

方式B算是方式A的改进,为了解决并发数据落地的问题,不过这种方式,增加了一个分布式锁,多了一次网络IO消耗。

C,

TASK 60秒一次。。。或10秒一次

for(Room room:ShowingRoomList){

//获取缓存中的人气值

Popular p = popularService.get(room);

//上次推送至今,房间人气值有变化,给予推送更新

if(redis.get(room)!=p.getPopular()){

insertDb;

sendIm

redis.set(room)

}

}

C方式与A B方式完全不同,C是通过另起一个进程跑task,然后异步将人气值落地,以及推送IM,但是

还是需要消费者做redis.increase操作,只是消费者不再负责数据落地,以及推送了。

结论:

1,起初设置一个缓存总人气值是为了顺便抗读压力,不过上述几种方式经过反复推演,首先难以解决的问题就是,如果你在缓存中存储一个总值,那么你用数据库覆盖缓存可以,如果用缓存覆盖数据库,将十分危险,而且难以保证准确性。

2,A,B方式均存在很多不必要的网络开销或数据库开销,需要在两者之间做均衡。C方式另起进程,使得业务不聚合,消费组不能自己

完成任务,需要依赖另一个进程。而且,单进程存在单点问题。不过C方式相对来讲,更可控些,因为单进程,所以不存在过多的数据库开销,以及网络开销。

最后决定,弃用缓存存人气总值的方式,通过增量更新人气总值。

设计图如下:

代码如下:

这种方式的思路是,将一分钟时间分为30个时间片,线程将根据当前系统时间,将人气值累加存储到当前时间片内,

落地时则是存储上一个时间片的累加值,并且为增量更新。这样可控制一个房间内的落地以及推送IM频率,最多为30次。

通过redis的原子操作,选取leader,来进行落地,推送,失效操作,防止并发问题。同时,当房间内互动频率不高时,

则实时落地推送,增强用户体验。

当然,我们现在的业务量还不大~服务上线后监控图如下:

由图可见,互动产生累加人气操作tps在晚上比较高,可以达到90左右,而那时,实时落地的tps为2.5。也就是说房间内就算互动再频繁,

右上角的人气变化大约是2秒一变。

相关文章

网友评论

      本文标题:大规模业务数据延迟落地,以及实时IM消息通知

      本文链接:https://www.haomeiwen.com/subject/jxpuyttx.html