美文网首页
MQTT 链接

MQTT 链接

作者: 请叫我啊亮 | 来源:发表于2019-08-27 09:25 被阅读0次

    TCP属于长连接,会一直占用服务器资源。http即用即断,占用资源短
    TCP“三次握手”:
    第一次握手:客户端发送syn包(syn=j)到服务器,并进入SYN_SEND状态,等待服务器确认;
    第二次握手:服务器收到syn包,必须确认客户的SYN(ack=j+1),同时自己也发送一个SYN包(syn=k),即SYN+ACK包,此时服务器进入SYN_RECV状态;
    第三次握手:客户端收到服务器的SYN+ACK包,向服务器发送确认包ACK(ack=k+1),此包发送完毕,客户端和服务器进入ESTABLISHED状态,完成三次握手。
    我的理解大概是:
    1: C端发送一个包给S端,告诉S端我要连接你
    2: S端收到C端的连接请求,判断下自身资源等情况,然后返回一个包个C端,允许你连接
    3: C端收到包,知道S端已就绪,返回一个确认包给S,双方准备就绪

    四次挥手理解:
    1: C端发送一个包给S端,告诉S端我要断开连接了
    2:S端收到包,返回一个确认包给C端,表示我知道了
    3:S端准备就绪后,发送一个包给C端,告诉C我准备断开连接了
    4 C发送一个确认包给S,表示我知道你断开了。
    为什么有4次挥手?2、3步不可以合并的原因是,2步发生时S端可能正在往C端发送数据,所以只能等数据发送完毕再执行3.

    TCP的底层机制能保证发送方和接收方的数据一致。
    发送数据时,即使只发送一个字节的数据,也会包装上ip,端口等信息而导致数据变大,为了效率,数据很可能会先存储在一个缓存池中,等待一小会一起发送。数据量大时因为网络问题,如网络拥堵等,包的大小也有各种限制,会导致一段数据拆分成若干小包发送,数据粘包现象由此产生。 接收数据时需要按照一定的格式进行拆包处理,还原成发送时的样子。现在普遍是采用header标示,按照约定好的格式统一发送和接受处理。

    以MQTT框架为例。
    发送数据组装如下

    - (NSData *)wireFormat {
        NSMutableData *buffer = [[NSMutableData alloc] init];
        UInt8 header; // header共一个字节,即8位
        header = (self.type & 0x0f) << 4; //  左边4位存储type,
        if (self.dupFlag) { // 第5位存储dupFlag
            header |= 0x08;
        }
        header |= (self.qos & 0x03) << 1; // 第6、7位存储qos,因为qos有三种状态,需要2位才能存储的下
        if (self.retainFlag) {// 第8位存储retainFlag
            header |= 0x01;
        }
        [buffer appendBytes:&header length:1];
        [buffer appendVariableLength:self.data.length];
        if (self.data != nil) {
            [buffer appendData:self.data];
        }
        return buffer;
    }
    

    一个完整的包由header(1个字节),包长(一个或多个字节),包内容组成, 例:心跳包为\xc0,占据两个字节,二进制为1100 0000 0000 0000,前面8位一个字节为header,后面八位为包长. 其中header左边4位为c,对应为心跳包MQTTPingreq类型。 包长一个字节,内容为0,表示这个包没有内容

    数据解析如下

    - (void)decodeMessage:(NSData *)data {
       NSInputStream *stream = [NSInputStream inputStreamWithData:data];
       CFReadStreamRef readStream = (__bridge CFReadStreamRef)stream;
       CFReadStreamSetDispatchQueue(readStream, self.queue);
       [self openStream:stream];
    }
    
    - (void)openStream:(NSInputStream *)stream {
       [self.streams addObject:stream];
       stream.delegate = self;
       // 解析流的时候先用数组将流装起来,防止上一个流还没解析完成。只有一个流时才开始解析
       // 若数组中有多个流,则当第一个流解析完成,再解析第二个,类似串形队列
       if (self.streams.count == 1) {
           [stream open];
       }
    }
    
    - (void)stream:(NSStream *)sender handleEvent:(NSStreamEvent)eventCode {
       MQTTDecoder *strongDecoder = self;
       (void)strongDecoder;
       NSInputStream *stream = (NSInputStream *)sender;
       if (eventCode & NSStreamEventHasBytesAvailable) {// 流中还有字节时会来到这里
           // self.state 默认值是MQTTDecoderStateDecodingHeader,先解析header
           if (self.state == MQTTDecoderStateDecodingHeader) {
               UInt8 buffer;
               NSInteger n = [stream read:&buffer maxLength:1];
               if (n == -1) {
                   self.state = MQTTDecoderStateConnectionError;
                   [self.delegate decoder:self handleEvent:MQTTDecoderEventConnectionError error:stream.streamError];
               } else if (n == 1) {
                   // header是一个字节,解析完会进入这里,设置self.state为MQTTDecoderStateDecodingLength,解析包长
                   self.length = 0;
                   self.lengthMultiplier = 1;
                   self.state = MQTTDecoderStateDecodingLength;
                   self.dataBuffer = [[NSMutableData alloc] init];
                   [self.dataBuffer appendBytes:&buffer length:1];
                   self.offset = 1;
               }
           }
           while (self.state == MQTTDecoderStateDecodingLength) {// 解析包长
               UInt8 digit;
               NSInteger n = [stream read:&digit maxLength:1];
               if (n == -1) {
                   self.state = MQTTDecoderStateConnectionError;
                   [self.delegate decoder:self handleEvent:MQTTDecoderEventConnectionError error:stream.streamError];
                   break;
               } else if (n == 0) {
                   break;
               }
               [self.dataBuffer appendBytes:&digit length:1];
               self.offset++;
               self.length += ((digit & 0x7f) * self.lengthMultiplier);
               if ((digit & 0x80) == 0x00) {
                   // 0x80为1000 0000,代码来到这里表示digit的第一位为0,包长解析完成,设置self.state为MQTTDecoderStateDecodingData,退出循环
                   self.state = MQTTDecoderStateDecodingData;
               } else {
                   self.lengthMultiplier *= 128;
               }
           }
           if (self.state == MQTTDecoderStateDecodingData) {// 解析包实际数据
               if (self.length > 0) {
                   NSInteger n, toRead;
                   UInt8 buffer[768];
                   toRead = self.length + self.offset - self.dataBuffer.length;
                   if (toRead > sizeof buffer) {
                       toRead = sizeof buffer;
                   }
                   n = [stream read:buffer maxLength:toRead];
                   if (n == -1) {
                       self.state = MQTTDecoderStateConnectionError;
                       [self.delegate decoder:self handleEvent:MQTTDecoderEventConnectionError error:stream.streamError];
                   } else {
                       [self.dataBuffer appendBytes:buffer length:n];
                   }
               }
               if (self.dataBuffer.length == self.length + self.offset) {
                   // 因为tcp保证发送方和接收方会完全一致,所以肯定会来到这里,代表一条发送方的完整数据解析完成
                   // 重新设置self.state为MQTTDecoderStateDecodingHeader,则会重新解析header
                   // 因为粘包问题,这里有可能这个流还没有解析完成,因为流中还有数据,这个代理方法还会再次被调用,重走流程
                   [self.delegate decoder:self didReceiveMessage:self.dataBuffer];
                   self.dataBuffer = nil;
                   self.state = MQTTDecoderStateDecodingHeader;
               }
           }
       }
       if (eventCode & NSStreamEventEndEncountered) {// 一个流解析完成,关闭流。若数组中还有流则取出第一个流开始解析
           if (self.streams) {
               [stream setDelegate:nil];
               [stream close];
               [self.streams removeObject:stream];
               if (self.streams.count) {
                   NSInputStream *stream = (self.streams)[0];
                   [stream open];
               }
           }
       }
    }
    

    相关文章

      网友评论

          本文标题:MQTT 链接

          本文链接:https://www.haomeiwen.com/subject/nadsectx.html