美文网首页
MixPanel代码阅读笔记-websocket

MixPanel代码阅读笔记-websocket

作者: 云舒卷_js | 来源:发表于2018-03-21 18:44 被阅读0次

WebSocket介绍

WebSocket protocol 是HTML5一种新的协议。它实现了浏览器与服务器全双工通信(full-duplex)。一开始的握手需要借助HTTP请求完成。 

WebSocket和HTTP、FTP一样也是应用层的协议,但是它是一种双向通信协议,是建立在TCP/IP之上的。 

连接的过程是: 

首先,客户端发起http请求,http请求里存放WebSocket支持的版本号等信息,如:Upgrade、Connection、WebSocket-Version等; 

然后,服务器收到客户端的握手请求后,同样采用HTTP协议回馈数据; 

最后,客户端收到连接成功的消息后,开始借助于TCP传输信道进行全双工通信。

MPWebsocket

mixpanel的websocket代码和facebook的socketRocket类似,而SRWebsocket代码也是开源的。。

它主要的功能api有:

- (void)webSocket:(MPWebSocket *)webSocket didReceiveMessage:(id)message;

- (void)webSocketDidOpen:(MPWebSocket *)webSocket;

- (void)webSocket:(MPWebSocket *)webSocket didFailWithError:(NSError *)error;

- (void)webSocket:(MPWebSocket *)webSocket didCloseWithCode:(NSInteger)code reason:(NSString *)reason wasClean:(BOOL)wasClean;

使用起来比较简单,我们用这几个代理方法就能实现我们大部分的功能了。

初始化流程

包括对schem进行断言,只支持ws/wss/http/https四种。

当前socket状态,是正在连接,还是已连接、断开等等。

初始化工作队列,以及流回调线程等等。

初始化读写缓冲区:_readBuffer、_outputBuffer

- (void)_MP_commonInit;

{

    NSString *scheme = _url.scheme.lowercaseString;

assert([scheme isEqualToString:@"ws"] || [scheme isEqualToString:@"http"] || [scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]); 

    if ([scheme isEqualToString:@"wss"] || [scheme isEqualToString:@"https"]) {

        _secure = YES;

    }

    _readyState = MPWebSocketStateConnecting;

    _consumerStopped = YES;

    _webSocketVersion = 13;

    _workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);

    // Going to set a specific on the queue so we can validate we're on the work queue

    dispatch_queue_set_specific(_workQueue, (__bridge void *)self, maybe_bridge(_workQueue), NULL);

    _delegateDispatchQueue = dispatch_get_main_queue();

    mp_dispatch_retain(_delegateDispatchQueue);

    _readBuffer = [[NSMutableData alloc] init];

    _outputBuffer = [[NSMutableData alloc] init];

    _currentFrameData = [[NSMutableData alloc] init];

    _consumers = [NSMutableArray array];

    _consumerPool = [[MPIOConsumerPool alloc] init];

    _scheduledRunloops = [NSMutableSet set];

    [self _initializeStreams];

    // default handlers

}

 开启连接 (给输入输出流绑定一个runloop)

- (void)_connect

{

    if (!_scheduledRunloops.count) { // 判断有没有runloop,

        [self scheduleInRunLoop:[NSRunLoop mp_networkRunLoop] forMode:NSDefaultRunLoopMode];

        // 创建一个带有runloop的常驻线程,模式为 NSDefaultRunLoopMode

    }

//    开启输入输出流

    [_outputStream open];

    [_inputStream open];

}

 把socket放入run loop中 并打开流

- (void)scheduleInRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;

{

    [_outputStream scheduleInRunLoop:aRunLoop forMode:mode];

    [_inputStream scheduleInRunLoop:aRunLoop forMode:mode];

    [_scheduledRunloops addObject:@[aRunLoop, mode]];

}

 打开成功后会发送一个http请求 GET,作为第一次握手,WebSocket建立连接前,都会以http请求作为握手的方式,这个方法就是在构造http的请求头

- (void)didConnect

{

    MPLogInfo(@"Connected");

    CFHTTPMessageRef request = CFHTTPMessageCreateRequest(NULL, CFSTR("GET"), (__bridge CFURLRef)_url, kCFHTTPVersion1_1);

    // Set host first so it defaults

    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Host"), (__bridge CFStringRef)(_url.port ? [NSString stringWithFormat:@"%@:%@", _url.host, _url.port] : _url.host));

    // 生成密钥,

    NSMutableData *keyBytes = [[NSMutableData alloc] initWithLength:16];

    int result = SecRandomCopyBytes(kSecRandomDefault, keyBytes.length, keyBytes.mutableBytes);

    if (result != 0) {

        MPLogError(@"Failed to generate random bytes with status: %d", result);

    }

    // 用base64转码

    _secKey = [keyBytes base64EncodedStringWithOptions:NSDataBase64Encoding64CharacterLineLength];

    // 断言编码后长度24

    assert(_secKey.length == 24);

    //web socket规范head

    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Upgrade"), CFSTR("websocket"));

    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Connection"), CFSTR("Upgrade"));

    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Key"), (__bridge CFStringRef)_secKey);

    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Version"), (__bridge CFStringRef)[NSString stringWithFormat:@"%ld", (long)_webSocketVersion]);

    CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Origin"), (__bridge CFStringRef)_url.mp_origin);

    if (_requestedProtocols) {//用户初始化的协议数组,可以约束websocket的一些行为

        CFHTTPMessageSetHeaderFieldValue(request, CFSTR("Sec-WebSocket-Protocol"), (__bridge CFStringRef)[_requestedProtocols componentsJoinedByString:@", "]);

    }

//    吧 _urlRequest中原有的head 设置到request中去

    [_urlRequest.allHTTPHeaderFields enumerateKeysAndObjectsUsingBlock:^(id key, id obj, BOOL *stop) {

        CFHTTPMessageSetHeaderFieldValue(request, (__bridge CFStringRef)key, (__bridge CFStringRef)obj);

    }];

//返回一个序列化 , CFBridgingRelease和 __bridge transfer一个意思, CFHTTPMessageCopySerializedMessage copy一份新的并且序列化,返回CFDataRef

    NSData *message = CFBridgingRelease(CFHTTPMessageCopySerializedMessage(request));

//释放request

    CFRelease(request);

//把这个request当成data去写

    [self _writeData:message];

    //读取http的头部

    [self _readHTTPHeader];

}

处理消息帧方法

数据是通过CFStream流的方式回调回来的,每次拿到流数据,都是先放在数据缓冲区中,然后去读当前消息帧的头部,得到当前数据包的大小,然后再去创建消费者对象consumer,去读取缓冲区指定数据包大小的内容,读完才会回调给我们上层用户,所以,我们如果用SRWebSocket完全不需要考虑数据断包、粘包的问题,每次到达的数据,都是一条完整的数据

- (void)_readHTTPHeader;

{

    if (_receivedHTTPHeaders == NULL) {

        //序列化的http消息

        _receivedHTTPHeaders = CFHTTPMessageCreateEmpty(NULL, NO);

    }

    //不停的add consumer去读数据

    [self _readUntilHeaderCompleteWithCallback:^(MPWebSocket *websocket,  NSData *data) {

        //拼接数据,拼到头部

        CFHTTPMessageAppendBytes(websocket->_receivedHTTPHeaders, (const UInt8 *)data.bytes, (CFIndex)data.length);

//        持续进行反序列化直到CFHTTPMessageIsHeaderComplete 返回TRUE。如果不去检测CFHTTPMessageIsHeaderComplete返回TRUE,这个消息可能不完整和不可靠。

        //判断是否接受完

        if (CFHTTPMessageIsHeaderComplete(websocket->_receivedHTTPHeaders)) {

            MPLogDebug(@"Finished reading headers %@", CFBridgingRelease(CFHTTPMessageCopyAllHeaderFields(websocket->_receivedHTTPHeaders)));

            [websocket _HTTPHeadersDidFinish]; //

        } else {

            //没读完递归调

            [websocket _readHTTPHeader];

        }

    }];

}

 检查握手信息

//我们发出这个http请求后,得到服务端的响应头,去按照服务端的方式加密Sec-WebSocket-Key,判断与Sec-WebSocket-Accept是否相同,相同则表明握手成功,否则失败处理。

- (BOOL)_checkHandshake:(CFHTTPMessageRef)httpMessage;

{

    // 是不是允许的header

    NSString *acceptHeader = CFBridgingRelease(CFHTTPMessageCopyHeaderFieldValue(httpMessage, CFSTR("Sec-WebSocket-Accept")));

    // 为nil被服务器拒绝

    if (acceptHeader == nil) {

        return NO;

    }

    //得到

    NSString *concatenatedString = [_secKey stringByAppendingString:MPWebSocketAppendToSecKeyString];

      //期待accept的字符串

    NSString *expectedAccept = [concatenatedString stringBySHA1ThenBase64Encoding];

//判断是否相同,相同就握手信息对了

    return [acceptHeader isEqualToString:expectedAccept];

}

至此都成功的话,一个WebSocket连接建立完毕。


MPWebsocket基于NSStreamDelegate来完成数据流的读写

- (void)stream:(NSStream *)aStream handleEvent:(NSStreamEvent)eventCode;

{

    if (_secure && !_pinnedCertFound && (eventCode == NSStreamEventHasBytesAvailable || eventCode == NSStreamEventHasSpaceAvailable)) {

        NSArray *sslCerts = [_urlRequest mp_SSLPinnedCertificates];

        if (sslCerts) {

            SecTrustRef secTrust = (__bridge SecTrustRef)[aStream propertyForKey:(__bridge id)kCFStreamPropertySSLPeerTrust];

            if (secTrust) {

                NSInteger numCerts = SecTrustGetCertificateCount(secTrust);

                for (NSInteger i = 0; i < numCerts && !_pinnedCertFound; i++) {

                    SecCertificateRef cert = SecTrustGetCertificateAtIndex(secTrust, i);

                    NSData *certData = CFBridgingRelease(SecCertificateCopyData(cert));

                    for (id ref in sslCerts) {

                        SecCertificateRef trustedCert = (__bridge SecCertificateRef)ref;

                        NSData *trustedCertData = CFBridgingRelease(SecCertificateCopyData(trustedCert));

                        if ([trustedCertData isEqualToData:certData]) {

                            _pinnedCertFound = YES;

                            break;

                        }

                    }

                }

            }

            if (!_pinnedCertFound) {

                dispatch_async(_workQueue, ^{

                    [self _failWithError:[NSError errorWithDomain:@"org.lolrus.SocketRocket" code:23556 userInfo:@{NSLocalizedDescriptionKey: [NSString stringWithFormat:@"Invalid server cert"]}]];

                });

                return;

            }

        }

    }

    dispatch_async(_workQueue, ^{

        switch (eventCode) {

            case NSStreamEventOpenCompleted: {

                MPLogDebug(@"NSStreamEventOpenCompleted %@", aStream);

                if (self.readyState >= MPWebSocketStateClosing) {

                    return;

                }

                assert(self->_readBuffer);

                if (self.readyState == MPWebSocketStateConnecting && aStream == self->_inputStream) {

                    [self didConnect];

                }

                [self _pumpWriting];

                [self _pumpScanner];

                break;

            }

            case NSStreamEventErrorOccurred: {

                MPLogError(@"NSStreamEventErrorOccurred %@ %@", aStream, [[aStream streamError] copy]);

                /// TODO specify error better!

                [self _failWithError:aStream.streamError];

                self->_readBufferOffset = 0;

                [self->_readBuffer setLength:0];

                break;

            }

            case NSStreamEventEndEncountered: {

                [self _pumpScanner];

                MPLogDebug(@"NSStreamEventEndEncountered %@", aStream);

                if (aStream.streamError) {

                    [self _failWithError:aStream.streamError];

                } else {

                    if (self.readyState != MPWebSocketStateClosed) {

                        self.readyState = MPWebSocketStateClosed;

                        [self _scheduleCleanup];

                    }

                    if (!self->_sentClose && !self->_failed) {

                        self->_sentClose = YES;

                        // If we get closed in this state it's probably not clean because we should be sending this when we send messages

                        [self _performDelegateBlock:^{

                            if ([self.delegate respondsToSelector:@selector(webSocket:didCloseWithCode:reason:wasClean:)]) {

                                [self.delegate webSocket:self didCloseWithCode:MPStatusCodeGoingAway reason:@"Stream end encountered" wasClean:NO];

                            }

                        }];

                    }

                }

                break;

            }

            case NSStreamEventHasBytesAvailable: {

                MPLogDebug(@"NSStreamEventHasBytesAvailable %@", aStream);

                const int bufferSize = 2048;

                uint8_t buffer[bufferSize];

                while (self->_inputStream.hasBytesAvailable) {

                    NSInteger bytes_read = [self->_inputStream read:buffer maxLength:bufferSize];

                    if (bytes_read > 0) {

                        [self->_readBuffer appendBytes:buffer length:(NSUInteger)bytes_read];

                    } else if (bytes_read < 0) {

                        [self _failWithError:self->_inputStream.streamError];

                    }

                    if (bytes_read != bufferSize) {

                        break;

                    }

                }

                [self _pumpScanner];

                break;

            }

            case NSStreamEventHasSpaceAvailable: {

                MPLogDebug(@"NSStreamEventHasSpaceAvailable %@", aStream);

                [self _pumpWriting];

                break;

            }

            default:

                MPLogDebug(@"(default) %@", aStream);

                break;

        }

    });

}

相关文章

网友评论

      本文标题:MixPanel代码阅读笔记-websocket

      本文链接:https://www.haomeiwen.com/subject/qibbqftx.html