该文基于开源项目分析,总结了IM相关的一些知识点,如何实现,以及针对客服业务需要补充的几个点。
开源系统使用netty+websocket/socket搭建IM系统,前端实现了jsp和layui,服务端内容较完整,前端可根据自己实际情况搭建。
感谢开源项目的贡献。地址:
https://gitee.com/qiqiim/qiqiim-server
IM服务
1.网络协议
传输层
tcp
面向连接的、可靠的、基于字节流的传输层通信协议,keepalive 机制、ack机制保障连接和消息的可靠性。
应用层
websocket
在TCP连接上进行全双工通信的协议,允许服务端主动向客户端推送数据。适用于IM即时通讯。
http
短连接,业务操作类接口。
2.数据传输格式
protobuf,适用于高并发场景下的消息传输
使用场景:
用户A发送消息时,前端通过protobuf序列化消息,将其send到服务端,服务端接收到后反序列化消息,处理完成后再次序列化消息,发送给客服B,接收到消息时同样也要反序列化消息展示。
ps:客服聊天系统中用到的消息类型为绑定、心跳、普通消息,不同场景下发送的消息类型不同。
项目中用到的消息格式:
消息包 Message.proto,其中content为下面的消息内容:
syntax = "proto3";
package com.enniu.cloud.services.customerIm.common.model.proto;
option java_outer_classname="MessageProto";
message Model {
string version = 1;//接口版本号
string deviceId = 2;//设备uuid
uint32 cmd = 3;//请求接口命令字 1绑定 2心跳 3上线 4下线 5消息
string sender = 4;//发送人
string receiver = 5;//接收人
string groupId =6;//用户组编号(暂时可忽略)
uint32 msgtype = 7;//请求1,应答2,通知3,响应4 format
uint32 flag = 8;//1 rsa加密 2aes加密
string platform = 9;//mobile-ios mobile-android pc-windows pc-mac
string platformVersion = 10;//客户端版本号
string token = 11;//客户端凭证
string appKey = 12;//客户端key
string timeStamp = 13;//时间戳
string sign = 14;//签名
bytes content = 15;//请求数据
}
消息内容 MessageBody.proto:
syntax = "proto3";
package com.enniu.cloud.services.customerIm.common.model.proto;
option java_outer_classname="MessageBodyProto";
message MessageBody {
string title = 1; //标题
string content = 2;//内容
string time = 3;//发送时间
uint32 type = 4;//0 文字 1 文件
string extend = 5;//扩展字段
}
开发可以自定义proto格式(上面.proto那种格式),然后通过protoc命令生成对应的java文件,protoc安装方式:http://google.github.io/proto-lens/installing-protoc.html
前端js库:https://github.com/protocolbuffers/protobuf/tree/master/js
3.连接可靠性
实现心跳保活
websocket受到nginx缺省为60秒的proxy_read_timeout的影响,超过时间没有发送任何消息,连接会自动断开。
解决办法:服务端在连接没有消息传输后,到达一定时间后发送心跳包,客户端收到心跳包回一个响应包,如果心跳发送一定时间后还未收到响应,则关闭连接。
netty使用IdleStateHandler处理,设置readIdleTime(读超时时间)和writeIdleTime(写超时时间),当读超时触发后发送心跳包到客户端(浏览器),客户端(浏览器)收到心跳包后回复一个心跳回应包(需要前端监听类型为心跳包的消息,收到后发送心跳回应);如果服务端心跳请求发出后一定时间内未收到回复,可断开连接。
服务端超时触发的代码:
/**
* 超时触发此方法
*
* @param ctx
* @param o
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object o) throws Exception {
// 服务端发个心跳包,客户端要回一个才行(需要前端实现)
if (o instanceof IdleStateEvent && ((IdleStateEvent) o).state().equals(IdleState.WRITER_IDLE)) {
if (StringUtils.isNotEmpty(sessionId)) {
MessageProto.Model.Builder builder = MessageProto.Model.newBuilder();
builder.setCmd(NettyConstants.CmdType.HEARTBEAT);//心跳包
builder.setMsgtype(NettyConstants.ProtobufType.SEND);
ctx.channel().writeAndFlush(builder);
}
}
//如果心跳请求发出70秒内没收到响应,则关闭连接
if (o instanceof IdleStateEvent && ((IdleStateEvent) o).state().equals(IdleState.READER_IDLE)) {
//服务端收到上一次的心跳响应后会设置这个响应时间
Long lastTime = (Long) ctx.channel().attr(NettyConstants.SessionConfig.SERVER_SESSION_HEARBEAT).get();
if (lastTime == null || ((System.currentTimeMillis() - lastTime) / 1000 >= 70)) {
connertor.close(ctx);
}
}
}
前端心跳响应:
socket.onmessage = function(event) {
//后端发送的是二进制帧,protobuf反序列化
var msg = proto.Model.deserializeBinary(event.data);
//心跳消息
if(msg.getCmd()==2){//对应服务端的NettyConstants.CmdType.HEARTBEAT
//发送心跳回应
var message1 = new proto.Model();
message1.setCmd(2);
message1.setMsgtype(4);
socket.send(message1.serializeBinary());
}else {
//...
}
};
断线重连
当网络情况不稳定,或者用户从移动网切换到无线网等场景下,长连接会断开。需要前端监听websocket的onclose事件,当连接断开后重新创建连接。
socket.onclose = function(event) {
//重新创建websocket连接
};
4.消息可靠性
1.基于TCP,传输层已经保证了消息可靠性
2.应用层消息可靠性,实现Ack消息机制(待补充)
5.安全
ssl,http协议升级为https,对应的ws协议升级为wss
注意:当升级ssl后,前端通过"ws://"开头的url创建不了连接,需要修改成wss;并且nginx增加对应配置。
6.负载均衡
nginx
nginx应用层负载,支持websocket,原因是websocket在创建长连接之前,会通过一次http握手升级。
lvs(待研究)
抗负载能力强,工作在网络四层,仅作流量分发,几乎可以对所有应用做负载均衡。
7.netty服务
服务端由netty搭建:
1.基于nio,支持高并发,可维持大数量的长连接
2.本身支持websocket协议,自带websocket的处理器,方便开发
im聊天实现方式:
用户/客服和服务端之间的连接是netty中的channel,所有聊天的消息写入到channel中,当A给B发送消息后,ChannelInboundHandler从A和netty服务端连接的channel中读取到数据,然后解析消息获取消息的接收者B,再将消息写入B和服务端连接的channel。反之亦然。
8.数据库
1.mysql消息持久化
2.消息较多,需要考虑分表分库
9.缓存
用户进入聊天页面时,是可以和机器人或者人工客服聊天的;默认是机器人聊天,当用户输入“人工”时,切换成人工客服聊天,此时需要在缓存中保存用户的会话状态,来区别用户发送的消息是触达机器人还是客服。
考虑到集群部署,可选择在redis中维护会话状态以及客服的在线状态。
|
业务实现
1.app端开始聊天
1.用户进入聊天页面,new WebSocket,创建和服务端端的长连接。
2.前端监听连接到成功事件,并给连接的服务端发送一条消息,该消息包括用户的信息,消息类型是“绑定”
3.服务端判断此用户的会话状态,如果处于人工客服中,将缓存中的会话id取出,通过会话id查询出消息历史,发送给前端;如果不处于人工客服,给用户发送欢迎语。
4.用户发送消息,消息类型是“普通”,服务端接收到消息,判断用户会话状态。
image.pnga.用户不处于人工会话状态,且用户没有发送“人工”二字,此时调用机器人服务,将消息发送给机器人,得到的结果写到用户的channel中,结果发送给前端。
b.用户不处于人工会话状态,但用户发送“人工”二字,通过redis中客服在线状态,获取空闲客服,和用户之间创建绑定关系,关系存入redis中。(需要考虑队列排队,等待空闲客服的场景)
c.用户处于人工会话状态,直接将消息发送给绑定的客服。
关闭会话状态
分为两种,客服主动关闭,会话超时关闭。
客服主动关闭
1.客服主动发起关闭会话请求,该请求协议为http/https。前端设置聊天框无法输入,消息无法点击发送。
2.在redis中删除用户和客服绑定关系,删除当前客服的聊天用户列表中的对应用户。
3.给用户推送一条满意度调查消息。(如果不希望重复发送满意度调查,可以维护满意度调查的发送次数)
4.用户可选择填写满意度,满意度调查为http/https接口,后续场景有如下情况:
a.离开聊天页面,会断开连接,心跳响应超时服务端会将用户从redis的登录人员集中删除。
b.留在当前页面,继续聊天则会路由到机器人回复。
image.png
超时关闭
设置超时关闭时长,系统自动删除用户会话状态
定时遍历客服聊天用户集合,获取到用户最近一次聊天的时间戳(可以将用户最近一次聊天的时间戳放入redis的有序集合中,设置member为userId,时间戳为score值),当超过超时时间后,关闭会话状态,流程和客服主动关闭相同。
消息推送
push客服发送的消息给用户,类似淘宝,在别的app中可以收到店小二推送的消息。
总结
针对于项目实际业务场景,还是有很多地方需要完善。比如所有客服繁忙时,用户需要在队列中排队等待;比如如何实现应用层的ack机制保证消息不丢失。
该文需要补充的地方还有很多,才能完成真实的业务场景。欢迎大家查缺补漏。
网友评论