1. 私信整体架构
在线消息下发流程如下:
attach_166cb75a42945105 (1).png
-
当业务系统需要向客户端推送数据时,通过网关提供的 HTTP 接口将数据发向java server。
-
java server在接收到私信请求后,风控后,将数据保存至数据库,判断是否在线,在线将消息写入 redis。
-
消息拉取分发的流程 ,上图其实省略了。
-
go im server 作为消费者 ,节点接收到消息后判断推送的消息目标是否在自己内存中维护的长连接队列里,如果存在则通过长连接推送数据.
-
消息确认回执发给im,im收到了就不会再进行重试了 ,不然三次重试失败,im会推到未读队列消费。
-
客户端与go im server 任一节点握手建立起长连接,节点将其加入到内存维护的长连接队列。客户端定时向服务端发送心跳消息,如果超过设定的时间仍没有收到心跳,则认为客户端与服务端的长连接已断开,服务端会关闭连接,清理内存中的会话。
2. 消息回执
-
确认消息回执(无需处理)
消息到客户端后 -> 通过websocket 返回act消息 -> 回执发给im,im收到了就不会再进行重试了
-
已读消息回执:
打开app -> 拉取会话列表 以及 拉取未读数量 ->点击会话 -> 拉取消息列表 并调已读消息回执(标记消息已读)未读消息数其实和会话绑定在一起的 。议数据库中,更新会话最新记录时,做自加操作 。而在已读回执中,更新index至最新的msqid。 同时修改未读消息数量至(msgid - index )
总未读消息= select sum(unreadnum) from conversation_msg where msg_to =“接收方id”
3. 离线消息
- 整个流程并没有像别的im强调离线消息 。因为我们的消息以已读和未读为划分,即便是在线消息未读也是通过拉取未读消息列表获取。那么是否强调离线消息也不是那么有必要。
- 那么数据库设计这块也没有必要强调离线消息这件事情了 。 已读成为设计的重点。我不希望在变更已读或者未读状态,大量的修改消息状态。 因此做了会话索引表的设计.
- im三次重试失败,im会推到未读队列消费,未读队列消费入库流程,如果im不做线 (app在后台,被杀掉,以及网络问题),收不到在线消息时候,可以通过推送系统推送消息激活app 。
4. 消息类型
- 私信(私信,匿名私信)
- 系统通知(通知,关注红点,被关注,评论,回复通知,点赞通知,新动态)
- 系统通知 通知 比如您获取了一个红包 (无消息回执的过程)
- 被关注红点 (有红点,弹窗,有消息回执过程)
- 关注直播关注红点 (同上 ,点后消失)
- 发布帖子,帖子通过审核后,刷新 ,会有新动态 (无红点)
- 点赞,评论,回复评论都会以消息形式下发在消息tab页 显示(有回执)
- 群消息(暂时无)
直播(和群聊的还是差别很大的)
- 直播 目前获取直播间当前 参与的人群 ,在走直接推送的流程 ,但直播间用户量 超大 直接推 因为1:N,会产生巨量的消息 。只怕是有可能会影响整个推送系统。个人对于在线人数超过 1000的 还是走tqmq 或者 消息队列 ,批量压缩消息下推 。也许会比较好。在线人数少的,还是走直推就可以了。
5. 内容类型
- 普通文字消息
- 图片消息
- 语音消息
- 视频消息
***上面4种私信类型需要调用第三方风控接口鉴定
图片,语音一般先推送文件服务器 ,完毕,推送消息 。文字应该是风控(脚本注入,sql注入,违禁词之类的) , 图片 , 语音 ,视频 内容这块应该会有单独的审核。 *** - 直播礼物 (无需审核)
6. 消息全局唯一id
-
全局唯一id = 会话id(全局唯一id) + 客户端序列号id(客户端生成时间戳) + 服务端序列号id(redis 自增)
- 如果客户端可以给我序列号,我其实是可以在返回拉取消息列表时候 ,再排序,给他严格顺序的把控的。
- 全局唯一id 有利于监控消息的生命周期。调用链监控等等。
- 会话id ,服务端序列号id 可以是会话维度的消息序列号。用于优化查询未读列表,以及未读数量。
7. 会话
- 由于我们系统只需要做到消息的已读回执。
user A -> user B , 对于user B ,会话中存储的是user A发送的所有user B未读的消息。
user B -> user A , 对于user A ,会话中存储的是user B发送的所有user A未读的消息。
会话可以通过全局唯一id ,和发送方id ,接受方id 的映射关系来表示。
未读消息数其实和会话绑定在一起的 。议数据库中,更新会话最新记录时,做自加操作 。而在已读回执中,更新index至最新的msqid。 同时修改未读消息数量至(msgid - index )(消息如果丢失 ,客户端拿到消息数有可能会小于msgid - index)
总未读消息= select sum(unreadnum) from conversation_msg where msg_to =“接收方id”
8. 私聊流程
attach_166b77e42236dd9e.png- 消息通过http到达java server
- 主线程创建3个异步任务 ,3方风控task, 我方风控 和在线,msq 入库和创建会话,记录任务完成时间。
- all task is ok , 进行redis 单推 。 如果不在线,或者验证不通过 返回响应的返回。
9. 直播流程(目前不用考虑)
attach_166b917e663b9f7e.png- 用户消息通过http到达java server,礼物无需风控 ,主播消息无需风控。
- 判断直播间在线人数如果超过500 或者1000人 , 或者开启服务降级 ,走tqmq ,压缩消息批量下发
- 少于1000人的直播间 ,msq批量入库,批量推送redis
10. 消息存储选型
-
redis : 读写性能很高 ,单点压测读写都能到10w左右 ,集群压测能到很夸张的一个等级。缺点:阿里云价格贵。
-
polardb : 并发读的能力高,写能力略弱 但是 阿里云的顶配写tps 尽然高达5w tps 。也真是非常夸张了,当然价格7w 每月也是超高。
-
hbase:并发读写能力很高,缺点:不是关系型数据库,必须依赖于主键key的前缀匹配查询才可以做到高效的读。公司研发同学对hbase熟悉程度也不是很高(这个其实不算是hbase的缺点)。
新的app ,一开始应该是流量b不大 ,polardb 作为离线消息存储,是一个合适的选择 。如果说,写的tps已经接近4w-5w上限了 ,可以考虑hbase去替换,如果到达hbase上限了,则建议redis,做热缓存。一开始建议polarDB。如果某些大主播入驻,有可能快速打满polardb的情况建议tqmq 削峰平谷。
11. 数据库设计
数据存储方案,以用户维度作为拆分
conversation_msg 会话消息表 (32个表 ,以接收方id取模)
字段名 | 描述 | 类型 | 约束 |
---|---|---|---|
conversation_id | 会话id | bigint | 唯一约束 |
msg_to | 接收方id | bigint | |
msg_from | 发送方id | bigint | |
sender_nickname | 发送方昵称 | varchar | |
msg_id | 会话的最新的msgid | int | |
msg_img | 会话展示图片 红点地址 | varchar | |
msg_content | msg content | varchar | |
client_seq | 客户端 时间戳 | int | |
unread_num | 未读消息数 | int | |
ext_field1 | 扩展字段1 会话 更多展示的内容 | varchar | |
ext_field2 | 扩展字段2 | varchar | |
is_del | 是否删除 | boolean | |
create_time | 创建时间 | datetime |
conversation_index 会话已读, (32个表 ,以接收方id取模)
字段名 | 描述 | 类型 | 约束 |
---|---|---|---|
conversation_id | 会话id | bigint | 主键 |
read_msg_id | 已读的消息id | int | |
create_time | 创建时间 | datetime | |
ext_index1 | 扩展index1 | bigint | |
ext_index2 | 扩展index2 | bigint | |
ext_field1 | 扩展字段1 | varchar |
msg_record 消息记录表 (32个表 ,以接收方id取模)
字段名 | 描述 | 类型 | 约束 |
---|---|---|---|
conversation_id | 会话id | bigint | 联合主键 |
msg_id | 消息id | int | 联合主键 |
msg_content | 消息内容 | varchar | |
msg_from | 发送方id | bigint | |
msg_to | 接收方id | bigint | |
msg_seq | 客户端发送消息时带上的序列号 | int | |
content_type | content 类型 | int | |
push_time | msg push time | datetime | |
read_time | 回执 read time | datetime | |
ext_field1 | 扩展字段1 | varchar | |
ext_field2 | 扩展字段2 | varchar | |
ext_field3 | 扩展字段3 | varchar | |
ext_field4 | 扩展字段4 | varchar | |
create_time | 创建时间 | datetime |
system_msg 系统消息表作为消息记录表的扩展表(32个表 ,以接收方id取模)
字段名 | 描述 | 类型 | 约束 |
---|---|---|---|
conversation_id | 会话id | bigint | 联合主键 |
msg_id | 消息id | int | 联合主键 |
action | 红包点击跳转 | varchar | |
file | 红包图片地址 or 文件地址 or 语音地址 or 视频 | varchar | |
content | 保存title 或者富文本消息 | varchar | |
ext_content1 | 扩展文字1 | varchar | |
ext_content2 | 扩展文字2 | varchar | |
ext_file1 | 扩展文件字段1 | varchar | |
ext_file2 | 扩展文件字段2 | varchar | |
ext_field1 | 扩展字段1 | varchar | |
ext_field2 | 扩展字段2 | varchar | |
create_time | 创建时间 | datetime |
12. 推入redis
- redis pubsub,没有通过tqmq,使用的pubsub 模型。
- redis pubsub优点很明显,快 ,但不能保证消息至少一次消费。同样不会被持久化。
- Redis Streams ,kafka ,tqmq 其实是一类。 都是消息队列。目前Redis Streams ,kafka 支持多消费组消费。tqmq是在redis 上,封装了一层。而Redis Streams 也同样实现了与tqmq相同的功能。kafka 相对redis stream而言,便宜吧,且可以和大数据团队快速对接。也是一个优点。
以简单为前提,还是以redis pubsub 为基础吧。 后面如果有变化 ,其实可以迁移到 Redis Streams ,tqmq,或者kafka
13. 监控
监控指标 通过埋点统计收集im指标 。如果要了解消息的整个生命周期,必须要有一全局唯一id 。而我们java server 只是私信系统的一部分。只有部分指标是可以监控的
- 业务指标
- 消息已读率
- 离线消息推送 (根据离线消息队列埋点)
- 在线消息推送 (redis 埋点 - 离线消息推送)
- 系统性能指标
- 在线消息平均推送时间,最大推送时间 (每分钟维度)
- 离线消息平均推送时间,最大推送时间 (每分钟维度)
- 任务队列囤积请求数 (当前tomcat任务队列囤积请求数)
直播消息平均推送时间,最大推送时间- 消息入库平均时间,已读消息回执平均时间。(每分钟维度)
- 监控各个接口实际调用量(每分钟维度)
14. 告警
- 任务队列囤积请求数上升到某一指标时,告警,需要快速扩容。
- 数据库因为某些原因造成ops 大幅下降,消息入库平均时间,已读消息回执平均时间直线上升。需要及时告警.
- redis 由于hotkey 问题,导致ops 大幅下降,这个时候 ,在线消息平均推送时间,最大推送时间都会上升,需要告警。
- 防止恶意接口调用。
15. 限流 ,熔断,降级
~~1. 特殊场景,入住了某个热门主播,导致流量放大到我们平台数据库极限的时候。这个时候,私信功能会受到其影响的时候 。必要情况,可以在tqmq 做一些特殊降级处理
~~
- 如果redis 出现了hotkey ,cpu 高达100%或者消费端出现了问题,导致内存已达上限。 那么这时候必要情况也要做一些削峰平谷,缓存消息。通过开关控制。
- 如果瞬间 tomcat任务队列囤积了大量的请求。有可能要内纯溢出了。这时候也可以做一些处理。发送私信,可以在线变未读。直接客户端拉取。已读回执请求直接走tqmq,获取未读消息数,可以暂时用缓存兜底数据等。(告警,自动切换开关。囤积大量消息下降到警戒线下后)
16. 一个无状态的系统才是真正的好系统
- 无状态的系统,可以快速的横向扩展。
- java server 本身除了tomcat 请求队列外 ,并没有缓存任何请求信息 。
- 消息如果先入 tqmq , 再被各个pod 消费 。即便某pod 内存溢出后,也可以不丢失消息。但是这样子会损失一些实时性。 这点还需讨论哈
- 数据库这块 java 有shardingjdbc ,客户端路由请求指定数据分片。 ks 好像也行。
17. 网络模型
【spring-webmvc + Servlet + Tomcat】命令式的、同步阻塞的。
【spring-webflux + Reactor + Netty】响应式的、异步非阻塞的。
盗一下spring的图 , 从技术选型上来来说响应式编程+R2DBC 更适合当前使用场景,但,soa不能提供响应式支持,且团队中没人用过,上手成本比较大。但未来肯定是趋势。所以还是选择了spring-webflux 。
java 线程模型最大的问题 ,就是线程池被慢sql或者下游服务耗尽了怎么办。其他业务场景也只能添加到任务队列中等待执行。 而java响应式编程就不存在这种问题,即便下游服务超时,也不会阻塞上游服务。也不会影响其他服务。
[图片上传失败...(image-1cfc18-1627628486913)]
18. 对于所有业务场景做好压测
- 我希望看到对于所有业务场景的压测数据。这些数据可以告知单个节点,或者整个集群,他能够承载一个怎样的负载。我可以依据现实情况,伸缩扩展节点
网友评论