MQTT Source Code Reading and Problem Analysis

Author: 某非著名程序员 | Published: 2021-05-21 09:30

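This article focuses on the CFSocket-based transport; the WebSocket transport is left for readers to explore on their own.
Note: the code in our project was renamed with a DOLP prefix, but it is essentially the same as the MQTT source.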

MQTT features covered here: socket connection, SSL certificate configuration, sending data, receiving data, and the reconnection mechanism.

[Figure: class structure diagram]

SSL certificate configuration

//BLDolpServer
- (void)createClient:(NSString *)host port:(int32_t)port callback:(void (^)(BOOL, NSError *))callback {
    
    BLLogInfo(@"[socket] connecting to host:%@ | port:%@",host,@(port));
    
    // Load the certificate first; DOLPCertificate.bundle must match the name defined in the pod spec
    NSString *bundlePath = [[NSBundle bundleForClass:[self class]].resourcePath stringByAppendingPathComponent:@"/DOLPCertificate.bundle"];
    NSBundle *bundle = [NSBundle bundleWithPath:bundlePath];
    NSString *caPath = [bundle pathForResource:@"ca_chain" ofType:@"der"];

    // Configure TLS authentication
    DOLPSSLSecurityPolicy *securityPolicy = [DOLPSSLSecurityPolicy policyWithPinningMode:SSLPinningModeCertificate];
    securityPolicy.allowInvalidCertificates = YES;
    securityPolicy.validatesDomainName = NO;
    securityPolicy.validatesCertificateChain = NO;
    securityPolicy.pinnedCertificates = @[[NSData dataWithContentsOfFile:caPath]];

    DolpBaseData *connData = [DolpProtoDataUtil createWithConnection:self.token deviceId:self.deviceId];
    NSString *clientId = [DolpDeviceUtil deviceId];
    self.sessionManager = [[DOLPSessionManager alloc] initWithQueue:self.dolpCodecQueue];
    
    [self.sessionManager connectTo:host port:port tls:YES keepalive:DOLP_KEEP_ALIVE clean:YES payload:[connData data] withClientId:clientId securityPolicy:securityPolicy certificates:nil protocolLevel:DOLPProtocolVersion10 connectHandler:^(NSError *error) {
        if (error) {
            callback(NO, error);
            BLLogError(@"[socket] server connection failed %@ | host:%@ | port:%@", error,host,@(port));
        } else {
            callback(YES, nil);
            BLLogInfo(@"[socket] server connection succeeded host:%@ | port:%@",host,@(port));
        }
    }];
    self.sessionManager.delegate = self;
}

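Read the path of the DER-encoded CA chain (ca_chain.der) from the bundle and configure the DOLPSSLSecurityPolicy properties. Because allowInvalidCertificates is YES and both domain-name and chain validation are turned off, trust effectively rests on the pinned certificate.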

//DOLPSessionManager
- (void)connectToInternal:(DolpConnectHandler)connectHandler {
    if (self.session && self.state == SessionManagerStateStarting) {
        [self updateState:SessionManagerStateConnecting];
        DOLPCFSocketTransport *transport;
        if (self.securityPolicy) {
            transport = [[DOLPSSLSecurityPolicyTransport alloc] init];
            ((DOLPSSLSecurityPolicyTransport *) transport).securityPolicy = self.securityPolicy;
        } else {
            transport = [[DOLPCFSocketTransport alloc] init];
        }
        transport.host = self.host;
        transport.port = self.port;
        transport.tls = self.tls;
        transport.certificates = self.certificates;
        transport.voip = self.session.voip;
        transport.queue = self.queue;
        transport.streamSSLLevel = self.streamSSLLevel;
        self.session.transport = transport;
        [self.session connectWithConnectHandler:connectHandler];
    }
}

The security policy, which carries the pinned certificate, is passed to the DOLPSSLSecurityPolicyTransport class.

Socket connection

//DOLPSession
- (void)connectWithConnectHandler:(DolpConnectHandler)connectHandler {
//    BLLogVerbose(@"[DOLPSession] connectWithConnectHandler:%p", connectHandler);
    self.connectHandler = connectHandler;
    [self connect];
}

- (void)connect {
    ...
    // note.fitz: removed the MQTT5 protocol check

    if (self.cleanSessionFlag) {
        [self.persistence deleteAllFlowsForClientId:self.clientId];
        [self.publishHandlers removeAllObjects];
    }
    [self tell];

    self.status = DOLPSessionStatusConnecting;

    self.decoder = [[DOLPDecoder alloc] init];
    self.decoder.queue = self.queue;
    self.decoder.delegate = self;
    [self.decoder open];

    self.transport.delegate = self;
    [self.transport open];
}

  1. The elided part (...) handles known error conditions.
  2. [self.transport open] establishes the connection.

//DOLPSSLSecurityPolicyTransport
- (void)open {
    BLLogVerbose(@"[DOLPSSLSecurityPolicyTransport] open");
    self.state = DOLPTransportOpening;

    NSError* connectError;

    CFReadStreamRef readStream;
    CFWriteStreamRef writeStream;

    CFStreamCreatePairWithSocketToHost(NULL, (__bridge CFStringRef)self.host, self.port, &readStream, &writeStream);
    
    CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);
    CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanTrue);

    if (self.tls) {
        NSMutableDictionary *sslOptions = [[NSMutableDictionary alloc] init];
        
        // Delegate certificates verify operation to our secure policy.
        // by disabling chain validation, it becomes our responsibility to verify that the host at the other end can be trusted.
        // the server's certificates will be verified during MQTT encoder/decoder processing.
        sslOptions[(NSString *)kCFStreamSSLValidatesCertificateChain] = @NO;
        sslOptions[(NSString *)kCFStreamSSLLevel] = self.streamSSLLevel;

        if (self.certificates) {
            sslOptions[(NSString *)kCFStreamSSLCertificates] = self.certificates;
        }
        
        if (!CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
            connectError = [NSError errorWithDomain:@"DOLP"
                                               code:errSSLInternal
                                           userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl input stream!"}];
        }
        if (!CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef)(sslOptions))){
            connectError = [NSError errorWithDomain:@"DOLP"
                                               code:errSSLInternal
                                           userInfo:@{NSLocalizedDescriptionKey : @"Fail to init ssl output stream!"}];
        }
    }
    
    if (!connectError) {
        self.encoder = [[DOLPSSLSecurityPolicyEncoder alloc] init];
        CFWriteStreamSetDispatchQueue(writeStream, self.queue);
        self.encoder.stream = CFBridgingRelease(writeStream);
        self.encoder.securityPolicy = self.tls ? self.securityPolicy : nil;
        self.encoder.securityDomain = self.tls ? self.host : nil;
        self.encoder.delegate = self;
        if (self.voip) {
            [self.encoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
        }
        [self.encoder open];
        
        self.decoder = [[DOLPSSLSecurityPolicyDecoder alloc] init];
        CFReadStreamSetDispatchQueue(readStream, self.queue);
        self.decoder.stream =  CFBridgingRelease(readStream);
        self.decoder.securityPolicy = self.tls ? self.securityPolicy : nil;
        self.decoder.securityDomain = self.tls ? self.host : nil;
        self.decoder.delegate = self;
        if (self.voip) {
            [self.decoder.stream setProperty:NSStreamNetworkServiceTypeVoIP forKey:NSStreamNetworkServiceType];
        }
        [self.decoder open];
        
    } else {
        [self close];
    }
}
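
  1. CFStreamCreatePairWithSocketToHost: "Creates readable and writable streams connected to a TCP/IP port of a particular host." This is how the official documentation describes it. MQTT runs over a TCP/IP connection.
  2. CFReadStreamRef readStream: used to parse and decode incoming data. CFBridgingRelease(readStream) converts it into an NSInputStream, and the corresponding delegate method is called while data is being parsed.
  3. CFWriteStreamRef writeStream: used to encode outgoing data; it is bridged to an NSOutputStream in the same way as readStream.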

Summary: underneath, MQTT connects through CFStream socket streams (the CFSocket transport).

Problem: the connection timeout is controlled by the system and is roughly 60 s or more.

If the connection is established, the stream:handleEvent: method of DOLPCFSocketEncoder is called.

- (void)open {
    (self.stream).delegate = self;
    [self.stream open];
    
    dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(10 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^{
        [self connectTimedOut];
    });
}

// On connect: if the IP address is unreachable, waiting for NSOutputStream to report the error takes too long, so if no stream event has arrived by the deadline, treat the socket as not connected
- (void)connectTimedOut {
    if (!self.connectResponse){
        self.state = DOLPCFSocketEncoderStateError;
        [self.delegate encoder:self didFailWithError:[NSError errorWithDomain:@"Socket not connected" code:400 userInfo:nil]];
        NSLog(@"socket TCP not connected");
    }else{
        NSLog(@"socket TCP connected");
    }
}

- (void)stream:(NSStream *)sender handleEvent:(NSStreamEvent)eventCode {
    _connectResponse = YES;
}

open schedules a timeout check: if stream:handleEvent: has not been called by then, the failure is reported to the upper layer, which reconnects.
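
The watchdog idea above can be sketched in plain C (which is also valid Objective-C). This is a minimal sketch rather than the project's code: connect_watchdog and its functions are hypothetical names, and the current time is passed in as seconds so the logic stays independent of any timer API.

```c
#include <assert.h>
#include <stdbool.h>

typedef enum { WATCH_PENDING, WATCH_CONNECTED, WATCH_TIMED_OUT } watch_result;

/* Hypothetical helper: tracks whether any stream event arrived in time. */
typedef struct {
    double deadline;   /* absolute time (seconds) at which we give up */
    bool   got_event;  /* set once stream:handleEvent: fires */
} connect_watchdog;

connect_watchdog watchdog_start(double now, double timeout_s) {
    assert(timeout_s > 0);
    connect_watchdog w = { now + timeout_s, false };
    return w;
}

/* Call from the stream event callback. */
void watchdog_on_stream_event(connect_watchdog *w) {
    w->got_event = true;
}

/* Call from the timer; TIMED_OUT means "report failure and reconnect". */
watch_result watchdog_check(const connect_watchdog *w, double now) {
    if (w->got_event) return WATCH_CONNECTED;
    return now >= w->deadline ? WATCH_TIMED_OUT : WATCH_PENDING;
}
```

With a 10-second timeout, a check at 5 s is still pending, a check at 10 s without any event reports a timeout, and any event recorded beforehand turns the result into connected.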

Sending data

//BLDolpServer
- (void)send:(id)message callback:(BLIdResultBlock)callbackBlock {
    DOLPSessionManager *manager = self.sessionManager;
    ...

    dispatch_async(self.sendMessageQueue, ^{
        // Encode the outgoing data in DOLP's Protocol Buffers format
        GPBMessage *payload = [DolpActionRouteDispatcher encodeWithSend:actionType data:message];
        DolpBaseData *baseData = [DolpProtoDataUtil initWithAction:actionType payload:payload];
        NSData *sendData = [baseData data];
        [manager sendData:sendData topic:DOLP_DEFAULT_TOPIC qos:QosLevelAtMostOnce retain:false];
    
        DolpRequestTransmit *request = [DolpRequestTransmit requestWithCallback:callbackBlock message:baseData];
        [self.requestCache setObject:request forKey:baseData.id_p];
    });
}

The project uses PB (Protocol Buffers) + MQTT: the object is first converted to a GPBMessage and then sent through the DOLPSessionManager object.

//DOLPSessionManager
- (UInt16)sendData:(NSData *)data topic:(NSString *)topic qos:(QosLevel)qos retain:(BOOL)retainFlag{
    if (self.state != SessionManagerStateConnected) {
        [self connectToLast:nil];
    }
    UInt16 msgId = [self.session publishData:data onTopic:topic retain:retainFlag qos:qos];
    return msgId;
}

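Before sending, the state is checked; if the session is not connected, a reconnect is triggered first and then the data is published.
Besides data, sending takes two other important parameters: topic and qos.
qos is MQTT's delivery guarantee level for a message; QosLevelAtMostOnce is used here.
topic is the topic the message is published to, which subscribers subscribe to.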

//DOLPSession
- (UInt16)publishData:(NSData *)data onTopic:(NSString *)topic retain:(BOOL)retainFlag qos:(QosLevel)qos publishHandler:(DolpPublishHandler)publishHandler {

    ... // error handling

    UInt16 msgId = 0;
    if (!qos) {
        DOLPMessage *msg = [DOLPMessage publishMessageWithData:data onTopic:topic qos:qos msgId:msgId dupFlag:FALSE];
        NSError *error = nil;
        if (![self encode:msg]) {
            error = [NSError errorWithDomain:DOLPSessionErrorDomain code:DOLPSessionErrorEncoderNotReady userInfo:@{NSLocalizedDescriptionKey: @"Encoder not ready"}];
        }
        if (publishHandler) {
            [self onPublish:publishHandler error:error];
        }
    } else {
        ...
    }
    [self tell];
    return msgId;
}

publishMessageWithData builds the DOLPMessage object.
[self encode:msg] then sends it.

//DOLPMessage
+ (DOLPMessage *)publishMessageWithData:(NSData *)payload onTopic:(NSString *)topic qos:(QosLevel)qosLevel msgId:(UInt16)msgId dupFlag:(BOOL)dup {
    NSMutableData *data = [[NSMutableData alloc] init];
    [data appendMQTTString:topic];
    if (msgId) [data appendUInt16BigEndian:msgId];
    // note.fitz: removed the MQTT5 code
    [data appendData:payload];
    DOLPMessage *msg = [[DOLPMessage alloc] initWithType:Publish qos:qosLevel dupFlag:dup data:data];
    msg.mid = msgId;
    return msg;
}

The topic and then the payload are appended to data (with the msgId in between when it is non-zero), producing the msg object.
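
The byte layout produced by publishMessageWithData can be illustrated with a small C sketch. It assumes, as in the MQTT specification, that an MQTT string is a 2-byte big-endian length followed by the UTF-8 bytes; build_publish_body is a hypothetical name and not part of the library.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Writes topic, optional message id, and payload into out.
 * Returns the number of bytes written, or 0 if out is too small. */
size_t build_publish_body(uint8_t *out, size_t out_cap,
                          const char *topic, uint16_t msg_id,
                          const uint8_t *payload, size_t payload_len) {
    size_t topic_len = strlen(topic);
    assert(topic_len <= UINT16_MAX);  /* MQTT strings carry a 16-bit length */

    size_t need = 2 + topic_len + (msg_id ? 2 : 0) + payload_len;
    if (need > out_cap) return 0;

    size_t pos = 0;
    out[pos++] = (uint8_t)(topic_len >> 8);      /* length, high byte */
    out[pos++] = (uint8_t)(topic_len & 0xff);    /* length, low byte */
    memcpy(out + pos, topic, topic_len);
    pos += topic_len;
    if (msg_id) {                                /* only present for QoS > 0 */
        out[pos++] = (uint8_t)(msg_id >> 8);
        out[pos++] = (uint8_t)(msg_id & 0xff);
    }
    if (payload_len) memcpy(out + pos, payload, payload_len);
    pos += payload_len;
    return pos;
}
```

For the topic "a/b" with a two-byte payload and no message id, the result is 7 bytes: 00 03 'a' '/' 'b' followed by the payload.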

[self encode:msg] is implemented as follows:

//DOLPSession
- (BOOL)encode:(DOLPMessage *)message {
    if (message) {
        NSData *wireFormat = message.wireFormat;
        if (wireFormat) {
            if (self.delegate) {
                if ([self.delegate respondsToSelector:@selector(sending:type:qos:retained:duped:mid:data:)]) {
                    [self.delegate sending:self type:message.type qos:message.qos retained:message.retainFlag duped:message.dupFlag mid:message.mid data:message.data];
                }
            }
//            BLLogVerbose(@"[DOLPSession] mqttTransport send");
            return [self.transport send:wireFormat];
        } else {
            BLLogError(@"[DOLPSession] trying to send message without wire format");
            return false;
        }
    } else {
        BLLogError(@"[DOLPSession] trying to send nil message");
        return false;
    }
}
//DOLPMessage
- (NSData *)wireFormat {
    NSMutableData *buffer = [[NSMutableData alloc] init];

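    // Fixed header: control packet type + flags
    UInt8 header;
    // The left-shift operator (<<) moves the bits of a value to the left; bits shifted out of the high end are lost and zeros are shifted in at the low end.
    header = (self.type & 0x0f) << 4;
    if (self.dupFlag) {
        // equivalent to header = header | 0x08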
        header |= 0x08;
    }
    header |= (self.qos & 0x03) << 1;
    [buffer appendBytes:&header length:1];
    [buffer appendVariableLength:self.data.length];

    // encode message data
    if (self.data != nil) {
        [buffer appendData:self.data];
    }

//    BLLogVerbose(@"[DOLPMessage] wireFormat(%lu)=%@", (unsigned long) buffer.length, buffer);

    return buffer;
}
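
The header byte and length field built by wireFormat can be reproduced in a few lines of C. This is a sketch under the MQTT 3.1.1 layout: the type sits in the high nibble, then DUP (0x08), QoS (bits 2-1) and RETAIN (0x01). The RETAIN bit is not set in the listing above and is included here as an assumption taken from the specification. The function names are hypothetical; appendVariableLength is assumed to implement the standard 7-bits-per-byte "remaining length" encoding.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* First byte of the fixed header: type | DUP | QoS | RETAIN. */
uint8_t mqtt_fixed_header(uint8_t type, int dup, uint8_t qos, int retain) {
    uint8_t header = (uint8_t)((type & 0x0f) << 4);
    if (dup) header |= 0x08;
    header |= (uint8_t)((qos & 0x03) << 1);
    if (retain) header |= 0x01;
    return header;
}

/* Encodes the remaining length, 7 bits per byte, low bits first.
 * Bit 7 of each byte says "another length byte follows".
 * Returns the number of bytes written (1 to 4). */
size_t mqtt_encode_remaining_length(uint32_t length, uint8_t out[4]) {
    assert(length <= 268435455u);  /* largest value that fits in 4 bytes */
    size_t n = 0;
    do {
        uint8_t byte = (uint8_t)(length % 128);
        length /= 128;
        if (length > 0) byte |= 0x80;
        out[n++] = byte;
    } while (length > 0);
    return n;
}
```

As a worked example, a PUBLISH packet (type 3) with QoS 1 and no DUP gives the header byte 0x32, and a remaining length of 321 is encoded as the two bytes 0xC1 0x02 (321 = 65 + 2 × 128, and 65 with the continuation bit set is 0xC1).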

Build the fixed header from type, dupFlag, qos and so on, then append data.
The result of wireFormat is passed to [self.transport send:wireFormat], which finally calls the send: method of DOLPCFSocketEncoder.

- (BOOL)send:(NSData *)data {
    @synchronized(self) {
        if (self.state != DOLPCFSocketEncoderStateReady) {
            return NO;
        }
        
        if (data) {
            [self.buffer appendData:data];
        }
        
        if (self.buffer.length) {
            NSInteger n = [self.stream write:self.buffer.bytes maxLength:self.buffer.length];
            if (n == -1) {
                self.state = DOLPCFSocketEncoderStateError;
                self.error = self.stream.streamError;
                return NO;
            } else {
                [self.buffer replaceBytesInRange:NSMakeRange(0, n) withBytes:NULL length:0];
            }
        }
        return YES;
    }
}

The NSOutputStream *stream is bound to the CFWriteStreamRef writeStream when the socket connects, and the data is sent through the system's [stream write:maxLength:].
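
The buffering in send: matters because a stream write may accept only part of the data. The following C sketch shows the same "append, write, drop what was written" pattern with a fixed-size buffer. The names send_buffer, write_fn and demo_sink are hypothetical, and the callback stands in for [stream write:maxLength:].

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Stand-in for the stream write: returns bytes accepted, or -1 on error. */
typedef long (*write_fn)(const unsigned char *bytes, size_t len, void *ctx);

typedef struct {
    unsigned char data[4096];
    size_t len;               /* bytes waiting to be written */
} send_buffer;

/* Queues len bytes, then tries to flush. Returns 1 on success, 0 on
 * overflow or write error. Unsent bytes stay queued for the next call. */
int send_buffer_push(send_buffer *b, const unsigned char *bytes, size_t len,
                     write_fn write_cb, void *ctx) {
    assert(b->len <= sizeof b->data);
    if (len > sizeof b->data - b->len) return 0;
    if (len > 0) {
        memcpy(b->data + b->len, bytes, len);
        b->len += len;
    }
    if (b->len == 0) return 1;

    long n = write_cb(b->data, b->len, ctx);
    if (n < 0) return 0;
    /* Drop the n bytes that were written; keep the rest at the front. */
    memmove(b->data, b->data + n, b->len - (size_t)n);
    b->len -= (size_t)n;
    return 1;
}

/* Demo sink that accepts at most `limit` bytes per call. */
typedef struct { size_t limit; size_t total; } demo_sink;

long demo_sink_write(const unsigned char *bytes, size_t len, void *ctx) {
    demo_sink *s = ctx;
    (void)bytes;
    size_t n = len < s->limit ? len : s->limit;
    s->total += n;
    return (long)n;
}
```

Pushing "hello" into a sink that takes 3 bytes per call leaves "lo" queued; a later call with no new data flushes the remainder, which mirrors calling send: again when the stream reports space available.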

Receiving data

Sending goes from the upper layers down to the system methods, whereas receiving goes from the system methods up to the upper layers.

//DOLPCFSocketDecoder
- (void)stream:(NSStream *)sender handleEvent:(NSStreamEvent)eventCode {
    if (eventCode & NSStreamEventOpenCompleted) {
        BLLogVerbose(@"[DOLPCFSocketDecoder] NSStreamEventOpenCompleted");
        self.state = DOLPCFSocketDecoderStateReady;
        [self.delegate decoderDidOpen:self];
    }
    
    if (eventCode & NSStreamEventHasBytesAvailable) {
        if (self.state == DOLPCFSocketDecoderStateInitializing) {
            self.state = DOLPCFSocketDecoderStateReady;
        }

        if (self.state == DOLPCFSocketDecoderStateReady) {
            UInt8 buffer[768];
            self.startTimestamp = [[NSDate date] timeIntervalSince1970];
            
            NSInteger n = [self.stream read:buffer maxLength:sizeof(buffer)];
            self.endTimestamp = [[NSDate date] timeIntervalSince1970];

            if (n == -1) {
                self.state = DOLPCFSocketDecoderStateError;
                [self.delegate decoder:self didFailWithError:nil];
            } else {
                NSData *data = [NSData dataWithBytes:buffer length:n];
                [self.delegate decoder:self didReceiveMessage:data];
            }
        }
    }
    if (eventCode & NSStreamEventHasSpaceAvailable) {
        BLLogVerbose(@"[DOLPCFSocketDecoder] NSStreamEventHasSpaceAvailable");
    }

    if (eventCode & NSStreamEventEndEncountered) {
        self.state = DOLPCFSocketDecoderStateInitializing;
        self.error = nil;
        [self.delegate decoderdidClose:self];
    }

    if (eventCode & NSStreamEventErrorOccurred) {
        self.state = DOLPCFSocketDecoderStateError;
        self.error = self.stream.streamError;
        [self.delegate decoder:self didFailWithError:self.error];
    }
}
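
  1. When the server sends data, the [stream:handleEvent:] method of DOLPCFSocketDecoder is called first.
  2. The NSInputStream *stream in DOLPCFSocketDecoder is bound to CFBridgingRelease(readStream) when the socket connects.
  3. [stream:handleEvent:] receives eventCode updates as they happen and passes them up to the upper layer.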

Problem: sending socket requests in rapid succession can cause [self.stream read:buffer maxLength:sizeof(buffer)] to block.

- (void)readBufferTimeOut{
    NSTimeInterval timeDifference = self.endTimestamp-self.startTimestamp;
    if (timeDifference<0 && self.delegate) {
        [[BLDolpServer sharedServer].sessionManager.session keepAlive];
    }
}

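The blocking is caused by the system; sending a heartbeat breaks the stall, which is why the read-timeout heartbeat above was added. This check shares one timer with the upper layer's socket timeout check.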

[self.delegate decoder:self didReceiveMessage:data]; goes through the delegate to [decoder:didReceiveMessage:] in DOLPCFSocketTransport, which in turn calls back [dolpTransport:didReceiveMessage:] in DOLPSession.

//DOLPSession
- (void)dolpTransport:(nonnull id <DOLPTransport>)mqttTransport didReceiveMessage:(nonnull NSData *)message {
    [self.decoder decodeMessage:message];
}

[decodeMessage:] calls into [decodeMessage:] of DOLPDecoder,
which opens a stream over the data and in turn triggers [stream:handleEvent:].

//DOLPDecoder
- (void)decodeMessage:(NSData *)data {
    NSInputStream *stream = [NSInputStream inputStreamWithData:data];
    CFReadStreamRef readStream = (__bridge CFReadStreamRef)stream;
    CFReadStreamSetDispatchQueue(readStream, self.queue);
    [self openStream:stream];
}

- (void)stream:(NSStream *)sender handleEvent:(NSStreamEvent)eventCode {
    ...
    if (self.dataBuffer.length == self.length + self.offset) {
        [self.delegate decoder:self didReceiveMessage:self.dataBuffer];
        self.dataBuffer = nil;
        self.state = DecoderStateDecodingHeader;
    } 
    ...
}

self.dataBuffer.length == self.length + self.offset: checks that the message has been received in full, then calls the [decoder:didReceiveMessage:] method of the upper-layer DOLPSession.
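
The completeness check can be illustrated with a byte-at-a-time framer in C. This sketch assumes that offset in the listing counts the fixed-header bytes (type byte plus length bytes) and that length is the decoded remaining length; the elided parts of the decoder are not reproduced, and frame_decoder is a hypothetical name.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct {
    int      header_done;   /* remaining-length field fully read? */
    size_t   header_bytes;  /* fixed-header bytes seen (the "offset") */
    uint32_t length;        /* decoded remaining length */
    uint32_t multiplier;    /* weight of the next length byte */
    size_t   received;      /* bytes of the current packet seen so far */
} frame_decoder;

void frame_decoder_reset(frame_decoder *d) {
    d->header_done = 0;
    d->header_bytes = 0;
    d->length = 0;
    d->multiplier = 1;
    d->received = 0;
}

/* Feeds one byte. Returns 1 when the byte completes a packet,
 * 0 when more bytes are needed, -1 on a malformed length field. */
int frame_decoder_feed(frame_decoder *d, uint8_t byte) {
    assert(d != NULL);
    d->received++;
    if (!d->header_done) {
        d->header_bytes++;
        if (d->header_bytes == 1) return 0;       /* type and flags byte */
        d->length += (uint32_t)(byte & 0x7f) * d->multiplier;
        d->multiplier *= 128;
        if (byte & 0x80) {                        /* more length bytes follow */
            if (d->header_bytes == 5) {           /* 4 length bytes at most */
                frame_decoder_reset(d);
                return -1;
            }
            return 0;
        }
        d->header_done = 1;
    }
    /* Same test as dataBuffer.length == length + offset. */
    if (d->received == d->header_bytes + d->length) {
        frame_decoder_reset(d);
        return 1;
    }
    return 0;
}
```

Because the state survives between calls, a packet split across several 768-byte reads is still reported exactly once, on its final byte.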

//DOLPSession
- (void)decoder:(DOLPDecoder *)sender didReceiveMessage:(NSData *)data {
    DOLPMessage *message = [DOLPMessage messageFromData:data];
    ...
    @synchronized (sender) {
        ...
        switch (self.status) {
            ...
            case DOLPSessionStatusConnected:
                switch (message.type) {
                    case Publish:
                        [self handlePublish:message];
                        break;
                    case Puback:
                        [self handlePuback:message];
                        break;
                    case Pubrec:
                        [self handlePubrec:message];
                        break;
                    case Pubrel:
                        [self handlePubrel:message];
                        break;
                    case Pubcomp:
                        [self handlePubcomp:message];
                        break;
                    case Suback:
                        [self handleSuback:message];
                        break;
                    case Unsuback:
                        [self handleUnsuback:message];
                        break;
                    case Disconnect: {
                        NSError *error = [NSError errorWithDomain:DOLPSessionErrorDomain
                                                             code:(message.returnCode).intValue
                                                         userInfo:@{NSLocalizedDescriptionKey: @"MQTT protocol DISCONNECT received"}];

                        [self protocolError:error];
                        break;
                    }

                    default:
                        break;
                }
                break;
            default:
                BLLogError(@"[DOLPSession] other state");
                break;
        }
    }
}

- (void)handlePublish:(DOLPMessage *)msg {
    NSData *data = msg.data;
    if (data.length < 2) {
        return;
    }
    UInt8 const *bytes = data.bytes;
    UInt16 topicLength = 256 * bytes[0] + bytes[1];
    if (data.length < 2 + topicLength) {
        return;
    }
    NSData *topicData = [data subdataWithRange:NSMakeRange(2, topicLength)];
    NSString *topic = [[NSString alloc] initWithData:topicData
                                            encoding:NSUTF8StringEncoding];
    if (!topic) {
        topic = [[NSString alloc] initWithData:topicData
                                      encoding:NSISOLatin1StringEncoding];
        BLLogError(@"non UTF8 topic %@", topic);
    }
    NSRange range = NSMakeRange(2 + topicLength, data.length - topicLength - 2);
    data = [data subdataWithRange:range];

    if (msg.qos == 0) {
        ...
    } else {
        if (data.length >= 2) {
            bytes = data.bytes;
            UInt16 msgId = 256 * bytes[0] + bytes[1];
            msg.mid = msgId;
            data = [data subdataWithRange:NSMakeRange(2, data.length - 2)];
            if (msg.qos == 1) {
                // note.fitz: removed the MQTT5 code

                BOOL processed = true;
                if ([self.delegate respondsToSelector:@selector(newMessage:data:onTopic:qos:retained:mid:)]) {
                    [self.delegate newMessage:self data:data onTopic:topic qos:msg.qos retained:msg.retainFlag mid:msgId];
                }
                if ([self.delegate respondsToSelector:@selector(newMessageWithFeedback:data:onTopic:qos:retained:mid:)]) {
                    processed = [self.delegate newMessageWithFeedback:self data:data onTopic:topic qos:msg.qos retained:msg.retainFlag mid:msgId];
                }
                if (self.messageHandler) {
                    self.messageHandler(data, topic);
                }
                if (processed) {
                    (void) [self encode:[DOLPMessage pubackMessageWithMessageId:msgId]];
                }
                return;
            } else {
                ...
            }
        }
    }
}
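
The parsing steps in handlePublish (topic length, topic, then the message id when QoS is above 0) can be condensed into a C sketch. The names publish_view and parse_publish are hypothetical, and the sketch only splits the bytes; it does not decode the topic as UTF-8 or send a PUBACK.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

typedef struct {
    const uint8_t *topic;       /* points into the input, not NUL-terminated */
    size_t         topic_len;
    uint16_t       msg_id;      /* 0 when qos == 0 */
    const uint8_t *payload;
    size_t         payload_len;
} publish_view;

/* Splits a PUBLISH body into topic, message id and payload.
 * Returns 1 on success, 0 if the data is too short. */
int parse_publish(const uint8_t *data, size_t len, uint8_t qos,
                  publish_view *out) {
    assert(data != NULL && out != NULL);
    if (len < 2) return 0;
    size_t topic_len = ((size_t)data[0] << 8) | data[1];  /* big-endian */
    if (len < 2 + topic_len) return 0;

    out->topic = data + 2;
    out->topic_len = topic_len;
    size_t pos = 2 + topic_len;

    out->msg_id = 0;
    if (qos > 0) {                       /* message id only for QoS 1 and 2 */
        if (len < pos + 2) return 0;
        out->msg_id = (uint16_t)((data[pos] << 8) | data[pos + 1]);
        pos += 2;
    }
    out->payload = data + pos;
    out->payload_len = len - pos;
    return 1;
}
```

The same nine bytes parse differently depending on QoS: at QoS 1 the two bytes after the topic are the message id, while at QoS 0 they are already part of the payload.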

A Publish message received on a connected socket enters handlePublish, which then calls the following method:

//DOLPSessionManager
- (void)newMessage:(DOLPSession *)session data:(NSData *)data onTopic:(NSString *)topic qos:(QosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid {
    if (self.delegate) {
        if ([self.delegate respondsToSelector:@selector(sessionManager:didReceiveMessage:onTopic:retained:)]) {
            [self.delegate sessionManager:self didReceiveMessage:data onTopic:topic retained:retained];
        }
        if ([self.delegate respondsToSelector:@selector(handleMessage:onTopic:retained:)]) {
            [self.delegate handleMessage:data onTopic:topic retained:retained];
        }
    }
}

Below is the business-logic code:

//BLDolpServer
// Receive data
- (void)handleMessage:(NSData *)data onTopic:(NSString *)topic retained:(BOOL)retained {
    NSError *error = nil;
    DolpBaseData *baseData = [DolpBaseData parseFromData:data error:&error];
    if (error) {
        BLLogError(@"Failed to parse data: %@", error.description);
        return;
    }
    /// Decode the returned proto data into the corresponding DTO using the matching decoder
    enum DolpActionType actionType = baseData.action;

    dispatch_async(self.messageHandlerQueue, ^{
        
        DolpDelegateResponse *serverData = [DolpActionRouteDispatcher decodeForServerData:actionType data:baseData];
        
        if (self.logRecord) {
            BLLogInfo(@"DOLP Start-------received server message %i \n%@\nDOLP END ------", actionType, [serverData.data modelToJSONString]);
        }
        
        DolpRequestTransmit *request = self.requestCache[baseData.id_p];
        
        if (request) {
            request.isCallback = YES;
            BLIdResultBlock rCallback = request.callback;
            if (serverData.code != 0) {
                NSError *respError = [BLErrorHelper errorWithCode:serverData.code reason:@""];
                // For now, invoke the callback on the main thread
                dispatch_main_async_safe(^{
                    BLBlockExec(rCallback, serverData, respError);
                })
                // After the callback, remove the cached request
                [self removeRequestCache:baseData.id_p];
                return;
            }
            // For now, invoke the callback on the main thread
            dispatch_async(dispatch_get_main_queue(), ^{
                BLBlockExec(rCallback, serverData, nil);
            });
            // After the callback, remove the cached request
            [self removeRequestCache:baseData.id_p];
            return;
        }
        
        // Handle business logic
    });
}

Summary

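That is the overall flow: socket certificate configuration, socket connection, encoding outgoing data, and decoding incoming data before handing it to the upper layer.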

Heartbeat

//DOLPSession
- (void)decoder:(DOLPDecoder *)sender didReceiveMessage:(NSData *)data {
    ...
    @synchronized (sender) {
        ...
        switch (self.status) {
            case DOLPSessionStatusConnecting:
                switch (message.type) {
                    case Connack:
                        // note.fitz: removed the MQTT5 code
                        if (message.returnCode && (message.returnCode).intValue == DOLPSuccess) {
                            ...

                            if (message.properties) {
                                self.serverKeepAlive = message.properties.serverKeepAlive;
                            }
                            if (self.serverKeepAlive) {
                                self.effectiveKeepAlive = (self.serverKeepAlive).unsignedShortValue;
                            } else {
                                self.effectiveKeepAlive = self.keepAliveInterval;
                            }                            
                            if (self.effectiveKeepAlive > 0) {
                                self.keepAliveTimer = [GCDTimer scheduledTimerWithTimeInterval:self.effectiveKeepAlive
                                                                                       repeats:YES
                                                                                         queue:self.queue
                                                                                         block:^() {
                                                                                             [weakSelf keepAlive];
                                                                                         }];
                            }
    ...
                            
    }
}
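
  1. The heartbeat starts once the socket connection is acknowledged, that is, on Connack.
  2. The interval returned by the server takes priority; otherwise the interval passed in by the client is used.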

NSTimer optimization

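The heartbeat starts a timer with a custom interval; the timeout check starts a timer that fires once per second to detect timed-out requests; and the fix for the stall on rapid sends described above starts yet another timer.
Instead, start a single timer and handle all of these in it. The timeout timer has the shortest interval, so when precision is not critical, one counter variable is enough to share the timer.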
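
A shared timer of this kind might look like the following C sketch. It assumes the timer ticks once per second (the shortest of the three intervals) and that the heartbeat interval is a whole number of ticks; shared_timer and tick_actions are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>

typedef struct {
    unsigned keep_alive_ticks;  /* heartbeat period, in ticks (> 0) */
    unsigned ticks;             /* the one shared counter */
} shared_timer;

typedef struct {
    bool check_request_timeouts;  /* due on every tick */
    bool check_read_stall;        /* due on every tick */
    bool send_heartbeat;          /* due every keep_alive_ticks ticks */
} tick_actions;

/* Called once per tick; reports which jobs are due now. */
tick_actions shared_timer_tick(shared_timer *t) {
    assert(t->keep_alive_ticks > 0);
    t->ticks++;
    tick_actions a;
    a.check_request_timeouts = true;
    a.check_read_stall = true;
    a.send_heartbeat = (t->ticks % t->keep_alive_ticks) == 0;
    return a;
}
```

The trade-off is precision: the heartbeat can only fire on a tick boundary, which is acceptable here because the keep-alive interval is much longer than one second.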

Timeout and reconnection

//MQTTSessionManager
- (MQTTSessionManager *)initWithPersistence:(BOOL)persistent
                              maxWindowSize:(NSUInteger)maxWindowSize
                                maxMessages:(NSUInteger)maxMessages
                                    maxSize:(NSUInteger)maxSize
                 maxConnectionRetryInterval:(NSTimeInterval)maxRetryInterval
                        connectInForeground:(BOOL)connectInForeground
                             streamSSLLevel:(NSString *)streamSSLLevel
                                      queue:(dispatch_queue_t)queue {
    ...
    __weak MQTTSessionManager *weakSelf = self;
    self.reconnectTimer = [[ReconnectTimer alloc] initWithRetryInterval:RECONNECT_TIMER
                                                       maxRetryInterval:maxRetryInterval
                                                                  queue:self.queue
                                                         reconnectBlock:^{
                                                             [weakSelf reconnect:nil];
                                                         }];
#if TARGET_OS_IPHONE == 1
    if (connectInForeground) {
        self.foregroundReconnection = [[ForegroundReconnection alloc] initWithMQTTSessionManager:self];
    }
#endif
    ...
}

Initialization creates the reconnect timer, which drives the reconnection attempts.
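
The body of ReconnectTimer is not shown in this post. A common policy for the two parameters it takes (an initial retry interval and a maximum) is to double the delay after each attempt until the maximum is reached; the C sketch below assumes exactly that, and reconnect_backoff is a hypothetical name.

```c
#include <assert.h>

typedef struct {
    unsigned initial_s;  /* first retry delay, seconds */
    unsigned max_s;      /* upper bound for the delay */
    unsigned current_s;  /* delay to use for the next attempt */
} reconnect_backoff;

void backoff_init(reconnect_backoff *b, unsigned initial_s, unsigned max_s) {
    assert(initial_s > 0 && initial_s <= max_s);
    b->initial_s = initial_s;
    b->max_s = max_s;
    b->current_s = initial_s;
}

/* Returns the delay before the next attempt, then doubles it (capped). */
unsigned backoff_next_delay(reconnect_backoff *b) {
    unsigned delay = b->current_s;
    b->current_s = (delay > b->max_s / 2) ? b->max_s : delay * 2;
    return delay;
}

/* Call after a successful connection so the next outage starts small. */
void backoff_reset(reconnect_backoff *b) {
    b->current_s = b->initial_s;
}
```

With an initial delay of 1 s and a maximum of 8 s the delays are 1, 2, 4, 8, 8, and so on until backoff_reset is called.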

Foreground and background handling

//ForegroundReconnection
- (void)appWillResignActive {
    [self.sessionManager disconnectWithDisconnectHandler:nil];
}

- (void)appDidBecomeActive {
    [self.sessionManager connectToLast:nil];
    [self endBackgroundTask];
}

Entering the background disconnects the socket; returning to the foreground reconnects to the last host.

Business iteration

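The server returns two IPs through the lbs interface; if the first is unusable, the second is used to connect. This broke MQTT's original reconnection mechanism. On top of MQTT we also solved the problems mentioned in this article, such as customizing the socket timeout, and added handling for the two IPs.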
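
One way to model the two-IP fallback is a small host list that advances on each connection failure. The post does not say what happens after the second address also fails, so the wrap-around back to the first address is an assumption, and host_list is a hypothetical name.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
    const char *const *hosts;  /* addresses returned by the lbs interface */
    size_t count;
    size_t index;              /* address currently in use */
} host_list;

const char *host_list_current(const host_list *l) {
    assert(l->count > 0);
    return l->hosts[l->index];
}

/* Call when connecting to the current host fails or times out.
 * Moves to the next address, wrapping around (assumed policy). */
const char *host_list_on_failure(host_list *l) {
    assert(l->count > 0);
    l->index = (l->index + 1) % l->count;
    return l->hosts[l->index];
}
```

The connect timeout callback would call host_list_on_failure and reconnect to the returned address instead of retrying the same host.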

MQTT summary

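What does MQTT use for communication? Why does the main thread stall on rapid taps? How are the two server-returned IPs and the changed timeout handled?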

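  1. The MQTT transport process
    1. Establishing the connection: SocketTransport uses the CFStream framework. CFReadStreamRef and CFWriteStreamRef are bridged to NSInputStream and NSOutputStream respectively. CFStreamCreatePairWithSocketToHost (TCP/IP based) establishes the socket connection.
    2. Encryption: SSL certificate
      • Read the CA certificate, authenticate with DOLPSSLSecurityPolicy, and set the corresponding properties with CFReadStreamSetProperty
      • CFReadStreamSetProperty(readStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef) (sslOptions)) configures encryption for the input stream
      • CFWriteStreamSetProperty(writeStream, kCFStreamPropertySSLSettings, (__bridge CFDictionaryRef) (sslOptions)) configures encryption for the output stream
    3. Encoding outgoing messages: SocketEncoder uses NSOutputStream.
    4. Decoding incoming messages: SocketDecoder uses NSInputStream.
  2. How is message quality guaranteed?

MQTT's higher QoS levels are not used; messages are sent at most once (QoS 0). Lost messages are covered by the message-merging algorithm and socket push.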

  3. How is the timeout set?

The lbs interface returns multiple addresses; if the first is unavailable, the second is used.

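  4. How is the heartbeat done, and how can it be set efficiently?

The usual approach is to start a timer and send a heartbeat at a fixed interval.
The alternative is a smart heartbeat.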

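  5. How are duplicate messages handled?

An acknowledgment mechanism, on top of TCP/IP.
For a fully self-developed IM, how should the "retry on failure" mechanism be designed?

  • Resend at the upper layer: for example, resend 10 seconds after the first failure, 30 seconds after the next, 60 seconds after the next, and notify the user after one more failure.
  • Notify the user immediately on failure and let the user resend manually.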
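
The first option, with the 10 s, 30 s and 60 s delays from the example, can be written as a lookup in C. This is only a sketch of the schedule; resend_delay_s is a hypothetical name, and the actual resending and user notification are left to the caller.

```c
#include <assert.h>
#include <stddef.h>

/* failures: how many times sending this message has failed so far (>= 1).
 * Returns the delay in seconds before the next resend, or -1 when the
 * retries are used up and the user should be notified instead. */
int resend_delay_s(unsigned failures) {
    static const int delays[] = { 10, 30, 60 };
    const size_t count = sizeof delays / sizeof delays[0];
    assert(failures >= 1);
    return failures <= count ? delays[failures - 1] : -1;
}
```

After the first, second and third failure the message is resent after 10, 30 and 60 seconds; the fourth failure returns -1, at which point the UI shows the failure to the user.
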
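  6. Brief introduction
    • Uses the publish/subscribe messaging pattern, providing one-to-many message distribution and decoupling applications
    • Uses TCP/IP for network connectivity
    • Offers three quality-of-service levels for message delivery
    • For clients to communicate with each other, they must share a topic, that is, both subscribe to the same topic.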

Article link: https://www.haomeiwen.com/subject/xtaijltx.html