一、前言
相信现在很多App都会被要求有IM功能,社交成为了必不可少的一项功能,IM的价值和重要性也就不言自明,但从技术上,IM对没有经验的开发者来说还是存在很多坑点和难点的,而接入第三方又存在成本、受限于他人等问题,所以本文旨意在打造一个通用的可配置化的IM SDK,文笔有限,如有不妥之处还请批评指正,希望对你有用。转载请注明出处https://www.jianshu.com/p/5b01f4d6e4f4
二、本文主要介绍
-
Netty
-
Protobuf
-
框架设计
-
重试机制
-
消息可靠性和消息回执
-
握手连接认证
-
心跳机制
-
离线消息
这里直接模拟两个用户聊天,还可以模拟上线、下线状态,开发者可以直接移步github, fork源码查看Github地址
Netty
什么是Netty?
Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。
Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice Award,见https://www.java.net/dukeschoice/2011)。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。
以上是摘自《Essential Netty In Action》这本书
为什么选择Netty?
Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的,它已经得到成百上千的商用项目验证,例如Hadoop的RPC框架avro使用Netty作为底层通信框架。很多其它业界主流的RPC框架,也使用Netty来构建高性能的异步通信能力。
通过对Netty的分析,我们将它的优点总结如下:
- API使用简单,开发门槛低;
- 功能强大,预置了多种编解码功能,支持多种主流协议;
- 定制能力强,可以通过ChannelHandler对通信框架进行灵活的扩展;
- 性能高,通过与其它业界主流的NIO框架对比,Netty的综合性能最优;
- 成熟、稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼;
- 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会被加入;
- 经历了大规模的商业应用考验,质量已经得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它可以完全满足不同行业的商业应用。
正是因为这些优点,Netty逐渐成为Java NIO编程的首选框架。
以上是摘自《Netty 权威指南》—— 选择Netty的理由
Protobuf
我们先来看看官方文档给出的定义和描述:
protocol buffers 是一种语言无关、平台无关、可扩展的序列化结构数据的方法,它可用于(数据)通信协议、数据存储等。
Protocol Buffers 是一种灵活,高效,自动化机制的结构数据序列化方法-可类比 XML,但是比 XML 更小(3 ~ 10倍)、更快(20 ~ 100倍)、更为简单。
你可以定义数据的结构,然后使用特殊生成的源代码轻松的在各种数据流中使用各种语言进行编写和读取结构数据。你甚至可以更新数据结构,而不破坏由旧数据结构编译的已部署程序。
简单来讲, ProtoBuf 是结构数据序列化[1] 方法,可简单类比于 XML[2],其具有以下特点:
- 语言无关、平台无关。即 ProtoBuf 支持 Java、C++、Python 等多种语言,支持多个平台
- 高效。即比 XML 更小(3 ~ 10倍)、更快(20 ~ 100倍)、更为简单
-
扩展性、兼容性好。你可以更新数据结构,而不影响和破坏原有的旧程序
所以采用protobuf传输是不错的选择。本文的protobuf实体设计:
message Pack{ //传输的包
enum PackType{
MSG=0;
REPLY=1;
HEART=2;
SHAKEHANDS=3;
}
PackType packType=1;//包类型
oneof body{
Msg msg=2;//聊天消息
Reply reply=3;//通用消息回执
Heart heart=4;//心跳包
ShakeHands shakeHands=5;//握手认证
}
}
关于prototbuf里 oneof 说明
如果你的消息中有很多可选字段, 并且同时至多一个字段会被设置, 你可以加强这个行为,使用oneof特性节省内存.Oneof字段就像可选字段, 除了它们会共享内存, 至多一个字段会被设置。 设置其中一个字段会清除其它字段。 你可以使用case()或者WhichOneof() 方法检查哪个oneof字段被设置, 看你使用什么语言了。本库的protobuf的版本为3.11.4,是在windows用protobuf工具生成然后再导入项目的。
框架设计
本库使用OKhttp的设计模式,是因为IM特性和OKhttp具有一定的共性, 所以本库借鉴OKhttp设计思想,来让我们看一下构造一个IMClient可以有多精简。
private IMClientDemo(DefaultMessageReceiveHandler.onMessageArriveListener onMessageArriveListener){
imClient=new IMClient.Builder()
.setCodec(new DefaultCodec()) //默认的编解码,开发者可以使用自己的protobuf编解码
.setShakeHands(getDefaultHands(),new DefaultShakeHandsHandler()) //设置握手认证,可选
.setHeartBeatMsg(getDefaultHeart()) //设置心跳,可选
.setMessageRespHandler(new DefaultMessageRespHandler()) //消息响应,开发者可自行定制实现MessageRespHandler接口即可
.setMessageReceiveHandler(new DefaultMessageReceiveHandler(onMessageArriveListener)) //客户端消息接收器
.setEventListener(new DefaultEventListener("user id1")) //事件监听,可选
.setAddress(new Address("192.168.69.32",8765,Address.Type.SOCKS))
.setAddress(new Address("www.baidu.com",8765,Address.Type.HTTP))
.build();
}
这里使用了构建者模式,可以自由装配,你可以添加自定义拦截器、自定义channelHandler,设置连接超时、发送超时、重发次数等...
Dispatcher dispatcher;
final List<Interceptor> interceptors = new ArrayList<>();
int connectTimeout;//连接超时
int sendTimeout;//发送超时,规定时间内需服务端响应
int resendCount;//消息发送失败,重发次数
boolean connectionRetryEnabled;//是否连接失败、连接重试
int heartIntervalForeground;//前台心跳间隔
int heartIntervalBackground;
boolean isBackground;
EventListener.Factory eventListenerFactory;
ConnectionPool connectionPool;
Cache cache;
Authenticator authenticator;
List<Address> addressList;
@Nullable Codec codec;
LinkedHashMap<String , ChannelHandler> customChannelHandlerLinkedHashMap;
com.google.protobuf.GeneratedMessageV3 loginAuthMsg;
com.google.protobuf.GeneratedMessageV3 heartBeatMsg;
ShakeHandsHandler shakeHandsHandler;
HeartbeatRespHandler heartbeatRespHandler;
MessageRespHandler messageRespHandler;
MessageReceiveHandler messageReceiveHandler;
核心实现在几个内置拦截器中
Response getResponseWithInterceptorChain(SubsequentCallback callback) throws IOException, InterruptedException, AuthException, SendTimeoutException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
if (client.interceptors()!=null&&client.interceptors().size()>0){
interceptors.addAll(client.interceptors());
}
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client));
// interceptors.add(new CacheInterceptor());
interceptors.add(new ConnectInterceptor(client));
interceptors.add(new CallServerInterceptor(callback));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest,this, eventListener,client.connectTimeout(),
client.sendTimeout());
return chain.proceed(originalRequest);
}
是不是似曾相识的感觉,这里拦截器功能和okhttp雷同,retryAndFollowUpInterceptor进行连接重试、发送重试、地址切换,BridgeInterceptor主要进行数据的装配,ConnectInterceptor是真正的连接的地方,CallServerInterceptor进行数据的写读,这里的读是服务端的消息回执,完成这一套拦截器那么我们整体流程就有了,那我们怎么进行一个消息的发送呢?
IMClientDemo.getInstance().sendMsg(createRequest(appMessage), new UICallback.OnResultListener<PackProtobuf.Pack>() {
@Override
public void onFailure(IOException e) {
appMessage.msgStatus = MSG_STATUS_FAILED;
messageAdapter1.onItemChange(appMessage); //更新一条消息状态
}
@Override
public void onResponse(PackProtobuf.Pack pack) {
appMessage.msgStatus=pack.getReply().getStatusReport(); //收到服务端响应,即代表消息发送成功,更新UI
messageAdapter1.onItemChange(appMessage);
}
});
该onResponse会收到服务端的该条消息的回执,注意这里不是这条消息的回执将不会回调,那么我们是如何做到这一点呢,这里就要说到request,每一个消息发送我们都把它当做一个request来处理,以下是一个request创建的样例:
private Request createRequest(AppMessage appMessage){
Request request=new Request.Builder().
setRequestTag(appMessage.getHead().getMsgId()).
setBody(getMsgPack(appMessage.buildProto())).
build();
return request;
}
这里创建一个requestTag,如果每一个你发送的消息希望等到消息回执,那么你只要带上requestTag并且在构造IMCient时实现MessageRespHandler接口即可
image.png
request还可以做一些别的操作,例如设置request发送失败不重试(默认失败重试),设置request无需服务端响应(默认需要响应),这种设置就很符合这种场景:发送回执消息给服务端,因为回执消息是无需服务端来响应的
重试机制
为了保证IM高可用,一定要有重试机制,重试机制包括连接失败重试、切换地址、发送重试、重试的次数、超时时间等。
在该库中连接重试主要实现在retryAndFollowUpInterceptor类中,我这里截取了部分
try {
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (IOException e) {
if (request instanceof ConnectRequest){ //连接请求重试
if (!connectRecover(e,request, ++connect_retry)) {
realChain.eventListener().connectFailed(streamAllocation.currentInetSocketAddress(),e);
throw e;
}
System.out.println("连接重试 " + streamAllocation.currentInetSocketAddress().toString());
releaseConnection=false;
continue;
}
if (!sendRecover(e, request,++resendCount)){
realChain.eventListener().sendMsgFailed(realChain.call());
throw e;
}
//发送请求重试
System.out.println("发送重试");
releaseConnection=false;
continue;
} catch (InterruptedException e) {
如果连接抛了异常,先判断能不能恢复,如果能恢复且连接次数没有超出限制,即继续重试,如果连接次数到一定数量我们可以切换下一个可用地址重试,发送失败也是这样处理的,如果满足恢复条件,即发送重试,连接超时则是在netty上配置,发送超时定义为用户发送消息且没在规定时间内服务端没有给出回执,我们则认定此条消息发送失败的也要进行重试。
消息可靠性和消息回执
如果要保证消息的可靠完整性,那么一定就要有确认机制,有人可能会有疑问,TCP/IP不就是可靠性协议嘛,有消息重传,且保证包有序。但是协议的可靠不一定代表应用层的可靠,例如如下场景:A客户端给B客户端发送消息,消息先到达服务端,而此时服务端出现异常,此时TCP/IP协议肯定是认为此条消息已经送达到服务端了,只是服务端处理出现了异常,客户端A就以为发送成功了,但是A客户端此条消息真实情况应该是发送失败的,那么这样就会造成消息丢失。如果我们使用了回执机制,还是同样的场景,当A客户端发送消息给服务端,此时服务端出现异常,当A客户端一段时间内没收到服务端的消息回执即认定为发送失败,A客户端会重发此条消息,同样的道理当服务端发给客户端的消息没有得到响应时,也会重发消息,这样一套确定机制即可保证消息的完整,举一反三,既然我们可以发送确认消息回执,那么我们也可以做一些别的类型的消息回执,例如:消息已读、消息撤回、消息已送达等(这里已送达的概念和已发送是不一样的,已送达代表消息已经到了接收方那,而已发送是代表消息成功发送到服务端)。本文在消息回执设计在protobuf里的是这样的
message Reply{ //消息回执
int32 replyType=1;//回复类型
string msgId=2;//对应的消息ID
string userId=3;//用户ID
int32 statusReport=4;//状态
}
握手连接认证
为了防止一些不明的连接,服务端应该对客户端发起的连接进行身份认证,当客户端和服务端建立连接后,应该立刻发送一个握手连接认证,服务端对握手连接进行认证,如果身份认证成功则将该连接放入内存中,并且发送回执给客户端,否立发送失败回执给客户端并且关闭连接。以下是本库的默认握手实现,开发者可自行替换自己的实现,实现接口即可。
/**
* 默认握手实现
*/
public class DefaultShakeHandsHandler implements ShakeHandsHandler<PackProtobuf.Pack> {
public static final int SHAKE_HANDS_REPLY_TYPE=0x12;
public static final int SHAKE_HANDS_STATUS_SUCCESS=1;
public static final int SHAKE_HANDS_STATUS_FAILED=0;
@Override
public boolean isShakeHands(Object msg) {
PackProtobuf.Pack pack= (PackProtobuf.Pack) msg;
return pack.getPackType()==PackProtobuf.Pack.PackType.REPLY//包类型是回执包且回执类型是握手回执
&&pack.getReply().getReplyType()==SHAKE_HANDS_REPLY_TYPE;
}
@Override
public boolean isShakeHandsOk(PackProtobuf.Pack pack) {
if (pack.getReply().getStatusReport()== SHAKE_HANDS_STATUS_SUCCESS ){
return true;
}else {
return false;
}
}
}
心跳机制
心跳包
作用:其实主要是为了防止NAT超时。其次是探测连接是否断开。
心跳包和轮询的区别
心跳包和轮询看起来类似, 都是客户端主动联系服务器, 但是区别很大:
(1)轮询是为了获取数据, 而心跳是为了保活TCP连接。
(2)轮询得越频繁, 获取数据就越及时, 心跳的频繁与否和数据是否及时没有直接关系
(3)轮询比心跳能耗更高, 因为一次轮询需要经过TCP三次握手, 四次挥手, 单次心跳不需要建立和拆除TCP连接。
NAT耗时
国内移动无线网络运营商在链路上一段时间内没有数据通讯后, 会淘汰NAT表中的对应项, 造成链路中断。
image心跳保活
心跳一般是指某端(绝大多数情况下是客户端)每隔一定时间向对端发送自定义指令,以判断双方是否存活,因其按照一定间隔发送,类似于心跳,故被称为心跳指令
以上采摘于https://www.jianshu.com/p/3cdd52626a1b
以下是本库的心跳实现:
// 3次心跳时间内没得到服务端响应,即可代表连接已断开
channel.pipeline().addFirst(IdleStateHandler.class.getSimpleName(), new IdleStateHandler(
heartbeatInterval * 3, heartbeatInterval, 0, TimeUnit.MILLISECONDS));
// 重新添加HeartbeatHandler
if (channel.pipeline().get(HeartbeatChannelHandler.class.getSimpleName()) != null) {
channel.pipeline().remove(HeartbeatChannelHandler.class.getSimpleName());
}
if (channel.pipeline().get(IdleStateHandler.class.getSimpleName()) != null) {
channel.pipeline().addLast(HeartbeatChannelHandler.class.getSimpleName(),
new HeartbeatChannelHandler(connectionPool,heartBeatMsg,connectionBrokenListener));
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (heartbeatMsg==null){
return;
}
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
switch (state) {
case READER_IDLE: {
connectionBrokenListener.connectionBroken();
break;
}
case WRITER_IDLE: {
// 规定时间内没向服务端发送心跳包,则马上发送一个心跳包
if (heartbeatTask == null) {
heartbeatTask = new HeartbeatTask(ctx);
}
connectionPool.execWorkTask(heartbeatTask);
break;
}
}
}
}
离线消息
离线消息这个算服务端的内容,服务端根据判断消息接收方是否在线,如果在线则直接发送,如果不在线,缓存起来或者入库都可以,等接受方上线且认证了立即把离线消息发送给客户端。客户端要考虑的是消息缓存的问题,例如在网络不佳或者突然断网的情况下,发送的消息失败是否需要在连接状态变好的时候重新发送,这时就应该考虑消息入库,本SDK没有具体实现这个部分,但是细想该SDK可以动态添加拦截器,是否可以添加一个消息缓存的拦截器,每次发送的消息经过缓存拦截器先进行入库,得到响应后经过拦截器再更新消息状态。服务端demo有简易的离线消息处理,这里也贴下代码吧
PackProtobuf.Msg message=pack.getMsg();
System.out.println("收到发送方客户端发送过来的消息:"+message.toString());
ChannelContainer.getInstance().getChannelByUserId(message.getHead().getFromId())//回给发送端消息回执已经发送
.writeAndFlush(createMsgReply(message.getHead().getFromId(),message.getHead().getMsgId(),MSG_REPLY_TYPE, NettyServerDemo.MSG_STATUS_SEND));
if (ChannelContainer.getInstance().isOnline(message.getHead().getToId())){ //如果接受发在线
ChannelContainer.getInstance().getChannelByUserId(message.getHead().getToId()) //转发给接受端
.writeAndFlush(pack);
}else { //如果对方离线,缓存起来,等用户上线立马发送
putOffLienMessage(message.getHead().getToId(),pack);
}
三、写在最后
本人文笔着实不好,写文章真的很让人头疼,hhhh!还不如写代码舒心,在此还要感谢FreddyChen提供的netty精简库与部分思路。
但本着开源精神,竟然库写了就应该有一个介绍文章吧,所以就有了这篇文章。欢迎大家指出问题、提交issue或者有bug私聊我都可以哈,如果您觉得此篇文章对你有帮助,欢迎给个star哈!Github地址
网友评论