本文主要讲CFSocket框架,WebSocket读者自行阅读。
另外:项目中的代码被改成了DOLP前缀,与MQTT源码基本一致。
MQTT功能概括:socket链接,SSL证书配置,发送数据,接收数据,重连机制等。
![](https://img.haomeiwen.com/i1387554/cc3f27557df12503.png)
SSL证书配置
//BLDolpServer
- (void)createClient:(NSString *)host port:(int32_t)port callback:(void (^)(BOOL, NSError *))callback {
BLLogInfo(@"【socket】连接host:%@ | port:%@",host,@(port));
// 先获取证书, 此处 DOLPCertificate.bundle 与 pod spec中定义一致
NSString *bundlePath = [[NSBundle bundleForClass:[self class]].resourcePath stringByAppendingPathComponent:@"/DOLPCertificate.bundle"];
NSBundle *bundle = [NSBundle bundleWithPath:bundlePath];
NSString *caPath = [bundle pathForResource:@"ca_chain" ofType:@"der"];
// 设置tls认证
DOLPSSLSecurityPolicy *securityPolicy = [DOLPSSLSecurityPolicy policyWithPinningMode:SSLPinningModeCertificate];
securityPolicy.allowInvalidCertificates = YES;
securityPolicy.validatesDomainName = NO;
securityPolicy.validatesCertificateChain = NO;
securityPolicy.pinnedCertificates = @[[NSData dataWithContentsOfFile:caPath]];
DolpBaseData *connData = [DolpProtoDataUtil createWithConnection:self.token deviceId:self.deviceId];
NSString *clientId = [DolpDeviceUtil deviceId];
self.sessionManager = [[DOLPSessionManager alloc] initWithQueue:self.dolpCodecQueue];
[self.sessionManager connectTo:host port:port tls:YES keepalive:DOLP_KEEP_ALIVE clean:YES payload:[connData data] withClientId:clientId securityPolicy:securityPolicy certificates:nil protocolLevel:DOLPProtocolVersion10 connectHandler:^(NSError *error) {
if (error) {
callback(false, error);
BLLogError(@"【socket】服务器连接失败 %@ | host:%@ | port:%@", error,host,@(port));
} else {
callback(true, nil);
BLLogInfo(@"【socket】服务器连接成功host:%@ | port:%@",host,@(port));
}
}];
self.sessionManager.delegate = self;
}
读取Bundle中的cer路径,配置DOLPSSLSecurityPolicy属性。
//DOLPSessionManager
- (void)connectToInternal:(DolpConnectHandler)connectHandler {
if (self.session && self.state == SessionManagerStateStarting) {
[self updateState:SessionManagerStateConnecting];
DOLPCFSocketTransport *transport;
if (self.securityPolicy) {
transport = [[DOLPSSLSecurityPolicyTransport alloc] init];
((DOLPSSLSecurityPolicyTransport *) transport).securityPolicy = self.securityPolicy;
} else {
transport = [[DOLPCFSocketTransport alloc] init];
}
transport.host = self.host;
transport.port = self.port;
transport.tls = self.tls;
transport.certificates = self.certificates;
transport.voip = self.session.voip;
transport.queue = self.queue;
transport.streamSSLLevel = self.streamSSLLevel;
self.session.transport = transport;
[self.session connectWithConnectHandler:connectHandler];
}
}
证书传递给DOLPSSLSecurityPolicyTransport类。
socket链接
//DOLPSession
- (void)connectWithConnectHandler:(DolpConnectHandler)connectHandler {
// BLLogVerbose(@"[DOLPSession] connectWithConnectHandler:%p", connectHandler);
self.connectHandler = connectHandler;
[self connect];
}
- (void)connect {
...
// note.fitz 移除MQTT5 协议的检测
if (self.cleanSessionFlag) {
[self.persistence deleteAllFlowsForClientId:self.clientId];
[self.publishHandlers removeAllObjects];
}
[self tell];
self.status = DOLPSessionStatusConnecting;
self.decoder = [[DOLPDecoder alloc] init];
self.decoder.queue = self.queue;
self.decoder.delegate = self;
[self.decoder open];
self.transport.delegate = self;
[self.transport open];
}
- 省略号部分处理已知的错误
- [self.transport open]建立链接
//DOLPSSLSecurityPolicyTransport
- (void)open {
BLLogVerbose(@"[DOLPSSLSecurityPolicyTransport] open");
self.state = DOLPTransportOpening;
NSError* connectError;
CFReadStreamRef readStream;
CFWriteStreamRef writeStream;
CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)self.host, self.port, &readStream, &writeStream);
CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
if (self.tls) {
NSMutableDictionary *sslOptions = [[NSMutableDictionary alloc] init];
// Delegate certificates verify operation to our secure policy.
// by disabling chain validation, it becomes our responsibility to verify that the host at the other end can be trusted.
// the server's certificates will be verified during MQTT encoder/decoder processing.
sslOptions[(NSString *)kCFStreamSSLValidatesCertificateChain] = @NO;
sslOptions[(NSString *)kCFStreamSSLLevel] = self.streamSSLLevel;
if (self.certificates) {
sslOptions[(NSString *)kCFStreamSSLCertificates] = self.certificates;
}
if (!CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
connectError = [NSError errorWithDomain:@"DOLP"
code:errSSLInternal
userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl input stream!"}];
}
if (!CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
connectError = [NSError errorWithDomain:@"DOLP"
code:errSSLInternal
userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl output stream!"}];
}
}
if (!connectError) {
self.encoder = [[DOLPSSLSecurityPolicyEncoder alloc] init];
CFWriteStreamSetDispatchQueue(writeStream, self.queue);
self.encoder.stream = CFBridgingRelease(writeStream);
self.encoder.securityPolicy = self.tls ? self.securityPolicy : nil;
self.encoder.securityDomain = self.tls ? self.host : nil;
self.encoder.delegate = self;
if (self.voip) {
[self.encoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
}
[self.encoder open];
self.decoder = [[DOLPSSLSecurityPolicyDecoder alloc] init];
CFReadStreamSetDispatchQueue(readStream, self.queue);
self.decoder.stream = CFBridgingRelease(readStream);
self.decoder.securityPolicy = self.tls ? self.securityPolicy : nil;
self.decoder.securityDomain = self.tls ? self.host : nil;
self.decoder.delegate = self;
if (self.voip) {
[self.decoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
}
[self.decoder open];
} else {
[self close];
}
}
- CFStreamCreatePairWithSocketToHost:Creates readable and writable streams connected to a TCP/IP port of a particular host.这是官方文档的一段描述。MQTT是基于TCP/IP的链接。
- CFReadStreamRef readStream;接受数据时解析解码,通过CFBridgingRelease(readStream)转换成NSInputStream,数据解析式响应对应的代理方法。
- CFWriteStreamRef writeStream;发送数据时进行编码,与readStream同理。
小结:mqtt底层使用的是CFSocket进行链接。
问题:链接超时由系统控制的,大约在60s以上。
建立链接如果成功,会响应DOLPCFSocketEncoder,stream:handleEvent:方法
- (void)open {
(self.stream).delegate = self;
[self.stream open];
dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(10 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
[self connectTimedOut];
});
}
//连接时:如果是ip地址不可用,依赖NSOutStream回调错误时间太长,检查如果state状态一直是初始状态,认为socket没有连接成功
- (void)connectTimedOut {
if (!self.connectResponse){
self.state = DOLPCFSocketEncoderStateError;
[self.delegate encoder:self didFailWithError:[NSError errorWithDomain:@"Socket没连接上" code:400 userInfo:nil]];
NSLog(@"socket TCP没连上");
}else{
NSLog(@"socket TCP连上了");
}
}
- (void)stream:(NSStream *)sender handleEvent:(NSStreamEvent)eventCode {
_connectResponse = YES;
}
在open时,设置超时操作,如果stream:handleEvent:还没有响应可抛给上层,进行重连操作。
发送数据
//BLDolpServer
- (void)send:(id)message callback:(BLIdResultBlock)callbackBlock {
DOLPSessionManager *manager = self.sessionManager;
...
dispatch_async(self.sendMessageQueue, ^{
// 将要发送的数据编码为DOLP的 protocolBuffer 格式
GPBMessage *payload = [DolpActionRouteDispatcher encodeWithSend:actionType data:message];
DolpBaseData *baseData = [DolpProtoDataUtil initWithAction:actionType payload:payload];
NSData *sendData = [baseData data];
[manager sendData:sendData topic:DOLP_DEFAULT_TOPIC qos:QosLevelAtMostOnce retain:false];
DolpRequestTransmit *request = [DolpRequestTransmit requestWithCallback:callbackBlock message:baseData];
[self.requestCache setObject:request forKey:baseData.id_p];
});
}
项目中是PB+MQTT,对象会先转成CPBMessage再经由DOLPSessionManager对象进行发送。
//DOLPSessionManager
- (UInt16)sendData:(NSData *)data topic:(NSString *)topic qos:(QosLevel)qos retain:(BOOL)retainFlag{
if (self.state != SessionManagerStateConnected) {
[self connectToLast:nil];
}
UInt16 msgId = [self.session publishData:data onTopic:topic retain:retainFlag qos:qos];
return msgId;
}
发送时会判断state,如果没有连接,先进行重连,再发送数据。
发送时除了data,还有topic和qos两个重要属性。
qos是mqtt对消息机制的处理。这里使用的QosLevelAtMostOnce
topic是订阅的主题。
//DOLPSession
- (UInt16)publishData:(NSData *)data onTopic:(NSString *)topic retain:(BOOL)retainFlag qos:(QosLevel)qos publishHandler:(DolpPublishHandler)publishHandler {
...异常处理
UInt16 msgId = 0;
if (!qos) {
DOLPMessage *msg = [DOLPMessage publishMessageWithData:data onTopic:topic qos:qos msgId:msgId dupFlag:FALSE];
NSError *error = nil;
if (![self encode:msg]) {
error = [NSError errorWithDomain:DOLPSessionErrorDomain code:DOLPSessionErrorEncoderNotReady userInfo:@{NSLocalizedDescriptionKey: @"Encoder not ready"}];
}
if (publishHandler) {
[self onPublish:publishHandler error:error];
}
} else {
...
}
[self tell];
return msgId;
}
publishMessageWithData构建DOLPMessage对象。
[self encode:msg]
//DOLPMessage
+ (DOLPMessage *)publishMessageWithData:(NSData *)payload onTopic:(NSString *)topic qos:(QosLevel)qosLevel msgId:(UInt16)msgId dupFlag:(BOOL)dup {
NSMutableData *data = [[NSMutableData alloc] init];
[data appendMQTTString:topic];
if (msgId) [data appendUInt16BigEndian:msgId];
// note.fitz 移除MQTT5的代码
[data appendData:payload];
DOLPMessage *msg = [[DOLPMessage alloc] initWithType:Publish qos:qosLevel dupFlag:dup data:data];
msg.mid = msgId;
return msg;
}
data追加topic,追加payload,生成msg对象。
[self encode:msg]
//DOLPSession
- (BOOL)encode:(DOLPMessage *)message {
if (message) {
NSData *wireFormat = message.wireFormat;
if (wireFormat) {
if (self.delegate) {
if ([self.delegate respondsToSelector:@selector(sending:type:qos:retained:duped:mid:data:)]) {
[self.delegate sending:self type:message.type qos:message.qos retained:message.retainFlag duped:message.dupFlag mid:message.mid data:message.data];
}
}
// BLLogVerbose(@"[DOLPSession] mqttTransport send");
return [self.transport send:wireFormat];
} else {
BLLogError(@"[DOLPSession] trying to send message without wire format");
return false;
}
} else {
BLLogError(@"[DOLPSession] trying to send nil message");
return false;
}
}
//DOLPMessage
- (NSData *)wireFormat {
NSMutableData *buffer = [[NSMutableData alloc] init];
// 固定报头 控制报文的类型+标志位
UInt8 header;
// 向右移位运算符(>>)把值的位向右移动。从值的低位移出的位将丢失。把无符号的值向右移位总是左侧(就是高位)移入0。
header = (self.type & 0x0f) << 4;
if (self.dupFlag) {
// 等同 header = header | 0x08
header |= 0x08;
}
header |= (self.qos & 0x03) << 1;
[buffer appendBytes:&header length:1];
[buffer appendVariableLength:self.data.length];
// encode message data
if (self.data != nil) {
[buffer appendData:self.data];
}
// BLLogVerbose(@"[DOLPMessage] wireFormat(%lu)=%@", (unsigned long) buffer.length, buffer);
return buffer;
}
拼接报文头:type,dupFlag,qos等再追加data。
得到调用wireFormat通过[self.transport send:wireFormat],最后调用到DOLPCFSocketEncoder send:方法
- (BOOL)send:(NSData *)data {
@synchronized(self) {
if (self.state != DOLPCFSocketEncoderStateReady) {
return NO;
}
if (data) {
[self.buffer appendData:data];
}
if (self.buffer.length) {
NSInteger n = [self.stream write:self.buffer.bytes maxLength:self.buffer.length];
if (n == -1) {
self.state = DOLPCFSocketEncoderStateError;
self.error = self.stream.streamError;
return NO;
} else {
[self.buffer replaceBytesInRange:NSMakeRange(0, n) withBytes:NULL length:0];
}
}
return YES;
}
}
NSOutputStream *stream在socket中连接时与CFWriteStreamRef writeStream;绑定,经由系统的[stream write:maxLength]发送数据。
接收数据
发送数据是由上层逐渐调用系统方法,而接收数据则是由系统方法逐渐调到上层。
//DOLPCFSocketDecoder
- (void)stream:(NSStream *)sender handleEvent:(NSStreamEvent)eventCode {
if (eventCode & NSStreamEventOpenCompleted) {
BLLogVerbose(@"[DOLPCFSocketDecoder] NSStreamEventOpenCompleted");
self.state = DOLPCFSocketDecoderStateReady;
[self.delegate decoderDidOpen:self];
}
if (eventCode & NSStreamEventHasBytesAvailable) {
if (self.state == DOLPCFSocketDecoderStateInitializing) {
self.state = DOLPCFSocketDecoderStateReady;
}
if (self.state == DOLPCFSocketDecoderStateReady) {
UInt8 buffer[768];
self.startTimestamp = [[NSDate date] timeIntervalSince1970];
NSInteger n = [self.stream read:buffer maxLength:sizeof(buffer)];
self.endTimestamp = [[NSDate date] timeIntervalSince1970];
if (n == -1) {
self.state = DOLPCFSocketDecoderStateError;
[self.delegate decoder:self didFailWithError:nil];
} else {
NSData *data = [NSData dataWithBytes:buffer length:n];
[self.delegate decoder:self didReceiveMessage:data];
}
}
}
if (eventCode & NSStreamEventHasSpaceAvailable) {
BLLogVerbose(@"[DOLPCFSocketDecoder] NSStreamEventHasSpaceAvailable");
}
if (eventCode & NSStreamEventEndEncountered) {
self.state = DOLPCFSocketDecoderStateInitializing;
self.error = nil;
[self.delegate decoderdidClose:self];
}
if (eventCode & NSStreamEventErrorOccurred) {
self.state = DOLPCFSocketDecoderStateError;
self.error = self.stream.streamError;
[self.delegate decoder:self didFailWithError:self.error];
}
}
- 服务端发送数据首先会回调DOLPCFSocketDecoder 中的[stream:handleEvent:]方法,
- DOLPCFSocketDecoder中的NSInputStream *stream在socket链接时与CFBridgingRelease(readStream)绑定。
- [stream:handleEvent:]会实时接收eventCode状态,回调给上层。
问题:快速发送socket请求会造成[self.stream read:buffer maxLength:sizeof(buffer)]阻塞。
- (void)readBufferTimeOut{
NSTimeInterval timeDifference = self.endTimestamp-self.startTimestamp;
if (timeDifference<0 && self.delegate) {
[[BLDolpServer sharedServer].sessionManager.session keepAlive];
}
}
此方法阻塞是由于系统原因导致的,可通过发送心跳打破这种平衡。才有了上述的读取超时发送心跳机制。此处timer与上层检验socket超时timer共用一个。
[self.delegate decoder:self didReceiveMessage:data];通过delege调用到DOLPCFSocketTransport中的[decoder: didReceiveMessage:],再回调到DOLPSession中的[dolpTransport:didReceiveMessage:]
//DOLPSession
- (void)dolpTransport:(nonnull id <DOLPTransport>)mqttTransport didReceiveMessage:(nonnull NSData *)message {
[self.decoder decodeMessage:message];
}
[decodeMessage:]调用到DOLPDecoder中的[decodeMessage:]
继而响应了[stream:handleEvent:]
//DOLPDecoder
- (void)decodeMessage:(NSData *)data {
NSInputStream *stream = [NSInputStream inputStreamWithData:data];
CFReadStreamRef readStream = (__bridge CFReadStreamRef)stream;
CFReadStreamSetDispatchQueue(readStream, self.queue);
[self openStream:stream];
}
- (void)stream:(NSStream *)sender handleEvent:(NSStreamEvent)eventCode {
...
if (self.dataBuffer.length == self.length + self.offset) {
[self.delegate decoder:self didReceiveMessage:self.dataBuffer];
self.dataBuffer = nil;
self.state = DecoderStateDecodingHeader;
}
...
}
self.dataBuffer.length == self.length + self.offset:判断数据完整接收,调用上层DOLPSession的[decoder:didReceiveMessage:]方法
//DOLPSession
- (void)decoder:(DOLPDecoder *)sender didReceiveMessage:(NSData *)data {
DOLPMessage *message = [DOLPMessage messageFromData:data];
...
@synchronized (sender) {
...
switch (self.status) {
...
case DOLPSessionStatusConnected:
switch (message.type) {
case Publish:
[self handlePublish:message];
break;
case Puback:
[self handlePuback:message];
break;
case Pubrec:
[self handlePubrec:message];
break;
case Pubrel:
[self handlePubrel:message];
break;
case Pubcomp:
[self handlePubcomp:message];
break;
case Suback:
[self handleSuback:message];
break;
case Unsuback:
[self handleUnsuback:message];
break;
case Disconnect: {
NSError *error = [NSError errorWithDomain:DOLPSessionErrorDomain
code:(message.returnCode).intValue
userInfo:@{NSLocalizedDescriptionKey: @"MQTT protocol DISCONNECT received"}];
[self protocolError:error];
}
default:
break;
}
break;
default:
BLLogError(@"[DOLPSession] other state");
break;
}
}
}
- (void)handlePublish:(DOLPMessage *)msg {
NSData *data = msg.data;
if (data.length < 2) {
return;
}
UInt8 const *bytes = data.bytes;
UInt16 topicLength = 256 * bytes[0] + bytes[1];
if (data.length < 2 + topicLength) {
return;
}
NSData *topicData = [data subdataWithRange:NSMakeRange(2, topicLength)];
NSString *topic = [[NSString alloc] initWithData:topicData
encoding:NSUTF8StringEncoding];
if (!topic) {
topic = [[NSString alloc] initWithData:topicData
encoding:NSISOLatin1StringEncoding];
BLLogError(@"non UTF8 topic %@", topic);
}
NSRange range = NSMakeRange(2 + topicLength, data.length - topicLength - 2);
data = [data subdataWithRange:range];
if (msg.qos == 0) {
...
} else {
if (data.length >= 2) {
bytes = data.bytes;
UInt16 msgId = 256 * bytes[0] + bytes[1];
msg.mid = msgId;
data = [data subdataWithRange:NSMakeRange(2, data.length - 2)];
if (msg.qos == 1) {
// note.fitz 移除MQTT5的代码
BOOL processed = true;
if ([self.delegate respondsToSelector:@selector(newMessage:data:onTopic:qos:retained:mid:)]) {
[self.delegate newMessage:self data:data onTopic:topic qos:msg.qos retained:msg.retainFlag mid:msgId];
}
if ([self.delegate respondsToSelector:@selector(newMessageWithFeedback:data:onTopic:qos:retained:mid:)]) {
processed = [self.delegate newMessageWithFeedback:self data:data onTopic:topic qos:msg.qos retained:msg.retainFlag mid:msgId];
}
if (self.messageHandler) {
self.messageHandler(data, topic);
}
if (processed) {
(void) [self encode:[DOLPMessage pubackMessageWithMessageId:msgId]];
}
return;
} else {
...
}
}
}
}
处理socket已连接的Publish消息进入handlePublish,继而调用下面的方法:
//DOLPSessionManager
- (void)newMessage:(DOLPSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(QosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid {
if (self.delegate) {
if ([self.delegate respondsToSelector:@selector(sessionManager:didReceiveMessage:onTopic:retained:)]) {
[self.delegate sessionManager:self didReceiveMessage:data onTopic:topic retained:retained];
}
if ([self.delegate respondsToSelector:@selector(handleMessage:onTopic:retained:)]) {
[self.delegate handleMessage:data onTopic:topic retained:retained];
}
}
}
下面是业务处理代码:
//BLDolpServer
// 接收数据
- (void)handleMessage:(NSData *)data onTopic:(NSString *)topic retained:(BOOL)retained {
NSError *error = nil;
DolpBaseData *baseData = [DolpBaseData parseFromData:data error:&error];
if (error) {
BLLogError(@"转换数据发生错误%@", error.description);
return;
}
///通过相应的数据解码器,将返回的proto数据解码为对应的DTO
enum DolpActionType actionType = baseData.action;
dispatch_async(self.messageHandlerQueue, ^{
DolpDelegateResponse *serverData = [DolpActionRouteDispatcher decodeForServerData:actionType data:baseData];
if (self.logRecord) {
BLLogInfo(@"DOLP Start-------收到服务端消息 %i \n%@\nDOLP END ------", actionType, [serverData.data modelToJSONString]);
}
DolpRequestTransmit *request = self.requestCache[baseData.id_p];
if (request) {
request.isCallback = YES;
BLIdResultBlock rCallback = request.callback;
if (serverData.code != 0) {
NSError *respError = [BLErrorHelper errorWithCode:serverData.code reason:@""];
// 暂时回到主线程调用回调处理
dispatch_main_async_safe(^{
BLBlockExec(rCallback, serverData, respError);
})
// 回调完成后,清除信息
[self removeRequestCache:baseData.id_p];
return;
}
// 暂时回到主线程调用回调处理
dispatch_async(dispatch_get_main_queue(), ^{
BLBlockExec(rCallback, serverData, nil);
});
// 回调完成后,清除信息
[self removeRequestCache:baseData.id_p];
return;
}
//处理业务代码
});
}
小结
socket证书配置,socket连接,发送数据编码,接收数据解码返回上层处理的大致流程。
心跳
//DOLPSession
- (void)decoder:(DOLPDecoder *)sender didReceiveMessage:(NSData *)data {
...
@synchronized (sender) {
...
switch (self.status) {
case DOLPSessionStatusConnecting:
switch (message.type) {
case Connack:
// note.fitz 移除MQTT5的代码
if (message.returnCode && (message.returnCode).intValue == DOLPSuccess) {
...
if (message.properties) {
self.serverKeepAlive = message.properties.serverKeepAlive;
}
if (self.serverKeepAlive) {
self.effectiveKeepAlive = (self.serverKeepAlive).unsignedShortValue;
} else {
self.effectiveKeepAlive = self.keepAliveInterval;
}
if (self.effectiveKeepAlive > 0) {
self.keepAliveTimer = [GCDTimer scheduledTimerWithTimeInterval:self.effectiveKeepAlive
repeats:YES
queue:self.queue
block:^() {
[weakSelf keepAlive];
}];
}
...
}
}
- 心跳在socket连接确认后即Connack,开启心跳。
- 心跳间隔以服务端返回优先级最高,否则以传入参数为准。
NSTimer优化
心跳起个timer,间隔自定义;超时起个timer,每秒一次,检测发送请求是否超时;解决上述快速发送消息卡顿起个timer。
起一个timer,统一处理上述业务。超时timer间隔最小,在要求不是很精确的情况下,带一个变量就能解决timer共用问题。
超时重连
//MQTTSessionManager
- (MQTTSessionManager *)initWithPersistence:(BOOL)persistent
maxWindowSize:(NSUInteger)maxWindowSize
maxMessages:(NSUInteger)maxMessages
maxSize:(NSUInteger)maxSize
maxConnectionRetryInterval:(NSTimeInterval)maxRetryInterval
connectInForeground:(BOOL)connectInForeground
streamSSLLevel:(NSString *)streamSSLLevel
queue:(dispatch_queue_t)queue {
...
__weak MQTTSessionManager *weakSelf = self;
self.reconnectTimer = [[ReconnectTimer alloc] initWithRetryInterval:RECONNECT_TIMER
maxRetryInterval:maxRetryInterval
queue:self.queue
reconnectBlock:^{
[weakSelf reconnect:nil];
}];
#if TARGET_OS_IPHONE == 1
if (connectInForeground) {
self.foregroundReconnection = [[ForegroundReconnection alloc] initWithMQTTSessionManager:self];
}
#endif
...
}
初始化时会检测开启重连timer,进行重连。
前后台处理
//ForegroundReconnection
- (void)appWillResignActive {
[self.sessionManager disconnectWithDisconnectHandler:nil];
}
- (void)appDidBecomeActive {
[self.sessionManager connectToLast:nil];
[self endBackgroundTask];
}
进入后台会断开socket,在进入前台会重新检测链接。
业务迭代
服务端通过lbs接口返回了两个ip,如果第一个无效则使用第二个进行链接。打破了mqtt原来的重连机制。我们在mqtt上也解决了文中提到的问题,socket超时如何自定义事件等。也添加了两个ip问题。
MQTT小结
MQTT使用什么通讯?快速点击时,主线程卡顿?服务端返回两个ip,超时时间修改?
- MQTT传输过程
- 建立链接SocketTransport使用的是CFStream框架。CFReadStreamRef、CFWriteStreamRef代理给NSOutputStream、NSInputStream。CFStreamCreatePairWithSocketToHost(基于TCP/IP建立链接)建立socket链接
- 加密 SSL证书
- 读取CA证书,DOLPSSLSecurityPolicy认证,CFReadStreamSetProperty设置对应的属性
- CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef) (sslOptions))配置输出流加密
- CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef) (sslOptions))配置输入流加密
- 发送消息编码SocketEncoder使用的NSOutputStream.
- 接收消息解码SocketDecoder使用的NSInputStream。
- 消息质量怎么保证?
没有使用MQTT的QOS,使用了最多一次,消息丢失靠消息融合算法与socket推送来保证。
- 超时怎么设置的
lbs接口返回了多个地址,第一个不可用,使用第二个
- 心跳是怎么做的,如何高效的设置心跳?
正常的做法起timer,固定时间发心跳。
智能心跳
- 如何解决消息重复
应答机制,基于TCP/IP。
完全自已开发的IM该如何设计“失败重试”机制?
- 上层做重发,例如可以失败后第一次10秒重发,再失败30秒,再失败60秒,再失败提示用户。
- 失败直接提示用户,让用户主动去重发。
- 简单介绍
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合
- 使用 TCP/IP 提供网络连接
- 有三种消息发布服务质量
- 客户端如果互相通信,必须在同一订阅主题下,即都订阅了同一个topic。
网友评论