美文网首页即时通讯
工欲利其事必先利其器-SocketRocket (二)

工欲利其事必先利其器-SocketRocket (二)

作者: Dombo_Y | 来源:发表于2022-12-17 18:06 被阅读0次

    SocketRocket 重头戏~~~~~~~~~

    SRWebSocket


    SRWebSocket.h

    WebSocket 的四种状态

    typedef NS_ENUM(NSInteger, SRReadyState) {
        SR_CONNECTING   = 0, 
        SR_OPEN         = 1,
        SR_CLOSING      = 2,
        SR_CLOSED       = 3,
    };
    
    typedef NS_ENUM(NSInteger, SRStatusCode) {
        // 0-999: Reserved and not used.
        SRStatusCodeNormal = 1000,
        SRStatusCodeGoingAway = 1001,
        SRStatusCodeProtocolError = 1002,
        SRStatusCodeUnhandledType = 1003,
        // 1004 reserved.
        SRStatusNoStatusReceived = 1005,
        SRStatusCodeAbnormal = 1006,
        SRStatusCodeInvalidUTF8 = 1007,
        SRStatusCodePolicyViolated = 1008,
        SRStatusCodeMessageTooBig = 1009,
        SRStatusCodeMissingExtension = 1010,
        SRStatusCodeInternalError = 1011,
        SRStatusCodeServiceRestart = 1012,
        SRStatusCodeTryAgainLater = 1013,
        // 1014: Reserved for future use by the WebSocket standard.
        SRStatusCodeTLSHandshake = 1015,
        // 1016-1999: Reserved for future use by the WebSocket standard.
        // 2000-2999: Reserved for use by WebSocket extensions.
        // 3000-3999: Available for use by libraries and frameworks. May not be used by applications. Available for registration at the IANA via first-come, first-serve.
        // 4000-4999: Available for use by applications.
    };
    

    SRWebSocket.m
    算了.h 文件先不看了,主要看 .m文件

    __attribute__((used)) static void importCategories()
    {
        import_NSURLRequest_SRWebSocket();
        import_NSRunLoop_SRWebSocket();
    }
    

    关于这个 __attribute__ 是编译器指令,告诉编译器 声明的特性,或者让编译器进行更多的错误检查和 高级优化。
    具体嘛,参考这个老哥的文章就行 attribute 机制使用
    后面有时间也会写一篇详细的使用案例。(挖个坑)
    https://gcc.gnu.org/onlinedocs/gcc-4.7.1/gcc/Function-Attributes.html#Function-Attributes

    typedef struct {
        BOOL fin;
        //  BOOL rsv1;
        //  BOOL rsv2;
        //  BOOL rsv3;
        uint8_t opcode;
        BOOL masked;
        uint64_t payload_length;
    } frame_header;
    

    数据头参数 opcode 就是 【SROpCode 枚举对应的值】

    static NSString *const SRWebSocketAppendToSecKeyString = @"258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
    

    将 SecKeyString 用static 延长变量生命周期,直到程序结束才销毁,用const变为 只读

    static inline int32_t validate_dispatch_data_partial_string(NSData *data);
    

    声明一个 静态内联函数,关于什么是内联函数小白可以看下这个
    OC 内联函数 inline

    static uint8_t const SRWebSocketProtocolVersion = 13;
    NSString *const SRWebSocketErrorDomain = @"SRWebSocketErrorDomain"; 
    NSString *const SRHTTPResponseErrorKey = @"HTTPResponseStatusCode";
    
    @interface SRWebSocket ()  <NSStreamDelegate>
    
    @property (atomic, assign, readwrite) SRReadyState readyState; 
    
    // Specifies whether SSL trust chain should NOT be evaluated.
    // By default this flag is set to NO, meaning only secure SSL connections are allowed.
    // For DEBUG builds this flag is ignored, and SSL connections are allowed regardless
    // of the certificate trust configuration
    @property (nonatomic, assign, readwrite) BOOL allowsUntrustedSSLCertificates;
    
    @property (nonatomic, strong, readonly) SRDelegateController *delegateController;
    
    @end
    
    // SRWebSocket 初始化的方法
    //  request : NSURLRequest  // 获取URL
    //  protocols  : NSArray<NSString *>  // 获取 协议
    //  securityPolicy :SRSecurityPolicy   // 获取 策略
    - (instancetype)initWithURLRequest:(NSURLRequest *)request protocols:(NSArray<NSString *> *)protocols securityPolicy:(SRSecurityPolicy *)securityPolicy
    {
        self = [super init];
        if (!self) return self;
    
        assert(request.URL);
        _url = request.URL;
        _urlRequest = request;
        _requestedProtocols = [protocols copy];
        _securityPolicy = securityPolicy;
        _requestRequiresSSL = SRURLRequiresSSL(_url);
    
        _readyState = SR_CONNECTING;
    
        _propertyLock = OS_SPINLOCK_INIT;
        _kvoLock = SRMutexInitRecursive();
        _workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
    
        // Going to set a specific on the queue so we can validate we're on the work queue
        dispatch_queue_set_specific(_workQueue, (__bridge void *)self, (__bridge void *)(_workQueue), NULL);
    
        _delegateController = [[SRDelegateController alloc] init];
    
    // dispatch_data_empty : A dispatch data object representing a zero-length memory region.
        _readBuffer = dispatch_data_empty;
        _outputBuffer = dispatch_data_empty;
    
        _currentFrameData = [[NSMutableData alloc] init];
    
        _consumers = [[NSMutableArray alloc] init];
    
        _consumerPool = [[SRIOConsumerPool alloc] init];
    
        _scheduledRunloops = [[NSMutableSet alloc] init];
    
        return self;
    }
    
    _workQueue = dispatch_queue_create(NULL, DISPATCH_QUEUE_SERIAL);
    dispatch_queue_set_specific(_workQueue, (__bridge void *)self, (__bridge void *)(_workQueue), NULL);
    

    create 一个串型队列,然后加载入当前的线程中,并用 (__bridge void *)self 作为 标记键

    注:关于 dispatch_queue_set_specific 的一些说明:

    通过键值对,就可以判断当前执行的任务是否包含在某个队列中,因为系统会根据给定的键,沿着队列的层级体系(即父队列)进行查找键所对应的值,如果到根队列还没找到,就说明当前任务不包含在你要判断的队列中,进而可以避免 因使用 dispatch_get_current_queue 出现的 死锁问题
    简单理解就是:给某个队列加个标记,找到这个标记就说明包含在这个队列中

     dispatch_queue_set_specific(<#dispatch_queue_t  _Nonnull queue#>, <#const void * _Nonnull key#>, <#void * _Nullable context#>, <#dispatch_function_t  _Nullable destructor#>) 
    
    void dispatch_queue_set_specific(
        dispatch_queue_t queue, //待设置标记的队列
        const void *key, //标记的键
        void *context, //标记的值。注意,这里键和值是指针,即地址,故context中可以放任何数据,但必须手动管理context的内存
        dispatch_function_t destructor //析构函数,但所在队列内存被回收,或者context值改变时,会被调用
    ); 
    
    几个初始化的方法
    - (instancetype)initWithURLRequest:(NSURLRequest *)request securityPolicy:(SRSecurityPolicy *)securityPolicy
    {
        return [self initWithURLRequest:request protocols:nil securityPolicy:securityPolicy];
    }
    
    - (instancetype)initWithURLRequest:(NSURLRequest *)request
    { // 常用
        return [self initWithURLRequest:request protocols:nil];
    }
    
    - (instancetype)initWithURL:(NSURL *)url;
    { // 常用
        return [self initWithURL:url protocols:nil];
    }
    
    - (instancetype)initWithURL:(NSURL *)url securityPolicy:(SRSecurityPolicy *)securityPolicy
    {
        NSURLRequest *request = [NSURLRequest requestWithURL:url];
        return [self initWithURLRequest:request protocols:nil securityPolicy:securityPolicy];
    }
    
    - (instancetype)initWithURL:(NSURL *)url protocols:(NSArray<NSString *> *)protocols allowsUntrustedSSLCertificates:(BOOL)allowsUntrustedSSLCertificates
    {
        NSURLRequest *request = [NSURLRequest requestWithURL:url];
        return [self initWithURLRequest:request protocols:protocols allowsUntrustedSSLCertificates:allowsUntrustedSSLCertificates];
    }
    
    - (void)assertOnWorkQueue;
    {  // 根据标记键获取 线程队列的线程,如果不等于该线程,则 终止程序
        assert(dispatch_get_specific((__bridge void *)self) == (__bridge void *)_workQueue);
    }
    
    - (void)dealloc
    {
        _inputStream.delegate = nil;
        _outputStream.delegate = nil;
    
        [_inputStream close];
        [_outputStream close];
    
        if (_receivedHTTPHeaders) {
            CFRelease(_receivedHTTPHeaders);
            _receivedHTTPHeaders = NULL;
        }
    
        SRMutexDestroy(_kvoLock);
    }
    

    pragma mark - Accessors

    #pragma mark readyState
    - (void)setReadyState:(SRReadyState)readyState
    {
        @try {
            SRMutexLock(_kvoLock);   // 开启互斥锁
            if (_readyState != readyState) {  // 当前readyState的值和更新进来 readyState的值不一样时 走下面
                [self willChangeValueForKey:@"readyState"];
                OSSpinLockLock(&_propertyLock);  //开启 自旋锁
                _readyState = readyState;
                OSSpinLockUnlock(&_propertyLock);  // 解除自旋锁
                [self didChangeValueForKey:@"readyState"]; 
            }
        }
        @finally {
            SRMutexUnlock(_kvoLock); // 解除互斥锁
        }
    }
    
    - (SRReadyState)readyState
    {
        SRReadyState state = 0;
        OSSpinLockLock(&_propertyLock); // 开启自旋锁
        state = _readyState;
        OSSpinLockUnlock(&_propertyLock); // 解除自旋锁
        return state;
    }
    
    + (BOOL)automaticallyNotifiesObserversOfReadyState {
        return NO;
    }
    

    这里用到了几个关键字 @try、@finally

    @try{是处理可能会出现问题的代码 },
    @{catch里面提供了解决问题的代码},
    @finally{里面是一定会执行的代码,通常用来关闭资源}

    这里用到了互斥锁(递归)、和自旋锁,详细的看下面资料

    谈iOS的锁
    不再安全的 OSSpinLock
    Autosar Os:Spinlock基础

    ///--------------------------------------
    #pragma mark - Open / Close
    ///--------------------------------------
    
    - (void)open
    {
        assert(_url);
        NSAssert(self.readyState == SR_CONNECTING, @"Cannot call -(void)open on SRWebSocket more than once.");
    
        _selfRetain = self;
    
        if (_urlRequest.timeoutInterval > 0) {
            dispatch_time_t popTime = dispatch_time(DISPATCH_TIME_NOW, (int64_t)(_urlRequest.timeoutInterval * NSEC_PER_SEC));
            dispatch_after(popTime, dispatch_get_main_queue(), ^{
                if (self.readyState == SR_CONNECTING) {
                    NSError *error = SRErrorWithDomainCodeDescription(NSURLErrorDomain, NSURLErrorTimedOut, @"Timed out connecting to server.");
                    [self _failWithError:error];
                }
            });
        }
    
        _proxyConnect = [[SRProxyConnect alloc] initWithURL:_url]; //设置proxy Connect
    
        __weak typeof(self) wself = self;
        [_proxyConnect openNetworkStreamWithCompletion:^(NSError *error, NSInputStream *readStream, NSOutputStream *writeStream) {
            [wself _connectionDoneWithError:error readStream:readStream writeStream:writeStream];
        }];
    }
    
    - (void)_connectionDoneWithError:(NSError *)error readStream:(NSInputStream *)readStream writeStream:(NSOutputStream *)writeStream
    {
        if (error != nil) {
            [self _failWithError:error];
        } else {
            _outputStream = writeStream;
            _inputStream = readStream;
    
            _inputStream.delegate = self;
            _outputStream.delegate = self;
            [self _updateSecureStreamOptions];
    
            if (!_scheduledRunloops.count) {
                [self scheduleInRunLoop:[NSRunLoop SR_networkRunLoop] forMode:NSDefaultRunLoopMode];
            }
    
            // If we don't require SSL validation - consider that we connected.
            // Otherwise `didConnect` is called when SSL validation finishes.
            if (!_requestRequiresSSL) {
                dispatch_async(_workQueue, ^{
                    [self didConnect]; // 开启链接~~~~
                });
            }
        }
        // Schedule to run on a work queue, to make sure we don't run this inline and deallocate `self` inside `SRProxyConnect`.
        // TODO: (nlutsenko) Find a better structure for this, maybe Bolts Tasks?
        dispatch_async(_workQueue, ^{
            self->_proxyConnect = nil;
        });
    }
    

    这里的SRProxyConnect 、SR_networkRunloop 看之后的详解【挖坑~】

    - (void)_writeData:(NSData *)data;
    {
        [self assertOnWorkQueue];
    
        if (_closeWhenFinishedWriting) {
            return;
        }
    
        __block NSData *strongData = data;
        dispatch_data_t newData = dispatch_data_create(data.bytes, data.length, nil, ^{ // 使用指定的内存缓冲区创建新的代码块数据对象。
            strongData = nil;
        });
        _outputBuffer = dispatch_data_create_concat(_outputBuffer, newData);
        [self _pumpWriting];
    }
    
    - (void)send:(nullable id)message
    {
        if (!message) {
            [self sendData:nil error:nil]; // Send Data, but it doesn't matter since we are going to send the same text frame with 0 length.
        } else if ([message isKindOfClass:[NSString class]]) {
            [self sendString:message error:nil];
        } else if ([message isKindOfClass:[NSData class]]) {
            [self sendData:message error:nil];
        } else {
            NSAssert(NO, @"Unrecognized message. Not able to send anything other than a String or NSData.");
        }
    }
    

    didConnect

    - (void)didConnect;
    {
        SRDebugLog(@"Connected");
    
        _secKey = SRBase64EncodedStringFromData(SRRandomData(16));
        assert([_secKey length] == 24);
    
        CFHTTPMessageRef message = SRHTTPConnectMessageCreate(_urlRequest,
                                                              _secKey,
                                                              SRWebSocketProtocolVersion,
                                                              self.requestCookies,
                                                              _requestedProtocols);
    
        NSData *messageData = CFBridgingRelease(CFHTTPMessageCopySerializedMessage(message));
    
        CFRelease(message);
    
        [self _writeData:messageData];
        [self _readHTTPHeader];
    }
    

    updateSecureStreamOptions

    - (void)_updateSecureStreamOptions
    {
        if (_requestRequiresSSL) {
            // 设置 TLS 1.2
            SRDebugLog(@"Setting up security for streams.");
            [_securityPolicy updateSecurityOptionsInStream:_inputStream]; 
            [_securityPolicy updateSecurityOptionsInStream:_outputStream];
        }
    
        NSString *networkServiceType = SRStreamNetworkServiceTypeFromURLRequest(_urlRequest);
        if (networkServiceType != nil) {
            [_inputStream setProperty:networkServiceType forKey:NSStreamNetworkServiceType];
            [_outputStream setProperty:networkServiceType forKey:NSStreamNetworkServiceType];
        }
    }
    

    scheduleInRunLoop

    - (void)scheduleInRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
    { //  将 stream  添加到 SR_networkRunLoop 中
        [_outputStream scheduleInRunLoop:aRunLoop forMode:mode];
        [_inputStream scheduleInRunLoop:aRunLoop forMode:mode];
    
        [_scheduledRunloops addObject:@[aRunLoop, mode]];
    }
    
    - (void)unscheduleFromRunLoop:(NSRunLoop *)aRunLoop forMode:(NSString *)mode;
    {
        [_outputStream removeFromRunLoop:aRunLoop forMode:mode];
        [_inputStream removeFromRunLoop:aRunLoop forMode:mode];
    
        [_scheduledRunloops removeObject:@[aRunLoop, mode]];
    }
    

    这里需要补充 【帧协议】这个概念,如下图


    WeChat30d91e4724a5fd1891de94c201cdb242.png

    fin(1 bit): 指该帧是否构成消息的最终帧。大多数情况下,消息适合一个单一的帧,这一点总是默认设置的。实验表明,Firefox在 32k之后创建了第二个帧。

    rsv1rsv2rsv3(1 bit each):必须为0,除非扩展里协商定义了非零值的含义。如果收到一个非零值,并且协商的扩展中没有一个定义这个非零值的含义,那么接受端必须抛出失败连接。

    opcode(4bits): 展示了帧表示什么。以下值目前正在使用:
    0x00: 这个帧继续前面的有效载荷
    0x01:这个帧包含文本数据
    0x02:这个帧包含二进制数据
    0x08:这个帧终止连接
    0x09: 这个帧时一个ping
    0x0a:这个帧是一个pong
    (如果有足够的值为被使用,他们将被保留将来使用)

    mask(1bit): 指示连接是否掩盖。就目前而言,从客户端到服务端的每条消息都必须掩盖,规范就会终止连接

    payload_len(7 bits): 有效载荷的长度。WebSocket的帧有以下长度:
    0-125表示有效载荷的长度。
    126表示以下两个字节表示长度
    127表示接下来的8个字节表示长度。
    所以有效负载的长度在 【7bit,16bit,64bit括号内】

    masking-key(32bits): 从客户端发送到服务端的所有帧都被帧中包含的32位值掩盖。
    payload:最可能被掩盖的实际数值。他的长度是payload_len的长度。


    • (void)closeWithCode:(NSInteger)code reason:(NSString *)reason
    - (void)closeWithCode:(NSInteger)code reason:(NSString *)reason;
    {
        assert(code);
        dispatch_async(_workQueue, ^{
            if (self.readyState == SR_CLOSING || self.readyState == SR_CLOSED) {
                return;
            }
    
            BOOL wasConnecting = self.readyState == SR_CONNECTING;
    
            self.readyState = SR_CLOSING;
    
            SRDebugLog(@"Closing with code %d reason %@", code, reason);
    
            if (wasConnecting) {
                [self closeConnection];
                return;
            }
    
            size_t maxMsgSize = [reason maximumLengthOfBytesUsingEncoding:NSUTF8StringEncoding];
            NSMutableData *mutablePayload = [[NSMutableData alloc] initWithLength:sizeof(uint16_t) + maxMsgSize];
            NSData *payload = mutablePayload;
    
            ((uint16_t *)mutablePayload.mutableBytes)[0] = CFSwapInt16BigToHost((uint16_t)code);
    
            if (reason) {
                NSRange remainingRange = {0};
    
                NSUInteger usedLength = 0;
    
                BOOL success = [reason getBytes:(char *)mutablePayload.mutableBytes + sizeof(uint16_t) maxLength:payload.length - sizeof(uint16_t) usedLength:&usedLength encoding:NSUTF8StringEncoding options:NSStringEncodingConversionExternalRepresentation range:NSMakeRange(0, reason.length) remainingRange:&remainingRange];
    #pragma unused (success)
    
                assert(success);
                assert(remainingRange.length == 0);
    
                if (usedLength != maxMsgSize) {
                    payload = [payload subdataWithRange:NSMakeRange(0, usedLength + sizeof(uint16_t))];
                }
            }
    
    
            [self _sendFrameWithOpcode:SROpCodeConnectionClose data:payload];
        });
    }
    
    - (void)_sendFrameWithOpcode:(SROpCode)opCode data:(NSData *)data
    {
        [self assertOnWorkQueue];
    
        if (!data) {
            return;
        }
    
        size_t payloadLength = data.length;
    
        NSMutableData *frameData = [[NSMutableData alloc] initWithLength:payloadLength + SRFrameHeaderOverhead];
        if (!frameData) {
            [self closeWithCode:SRStatusCodeMessageTooBig reason:@"Message too big"];
            return;
        }
        uint8_t *frameBuffer = (uint8_t *)frameData.mutableBytes;
    
        // set fin
        frameBuffer[0] = SRFinMask | opCode;
    
        // set the mask and header
        frameBuffer[1] |= SRMaskMask;
    
        size_t frameBufferSize = 2;
    
        if (payloadLength < 126) {
            frameBuffer[1] |= payloadLength;
        } else {
            uint64_t declaredPayloadLength = 0;
            size_t declaredPayloadLengthSize = 0;
    
            if (payloadLength <= UINT16_MAX) {
                frameBuffer[1] |= 126;
    
                declaredPayloadLength = CFSwapInt16BigToHost((uint16_t)payloadLength);
                declaredPayloadLengthSize = sizeof(uint16_t);
            } else {
                frameBuffer[1] |= 127;
    
                declaredPayloadLength = CFSwapInt64BigToHost((uint64_t)payloadLength);
                declaredPayloadLengthSize = sizeof(uint64_t);
            }
    
            memcpy((frameBuffer + frameBufferSize), &declaredPayloadLength, declaredPayloadLengthSize);
            frameBufferSize += declaredPayloadLengthSize;
        }
    
        const uint8_t *unmaskedPayloadBuffer = (uint8_t *)data.bytes;
        uint8_t *maskKey = frameBuffer + frameBufferSize;
    
        size_t randomBytesSize = sizeof(uint32_t);
        int result = SecRandomCopyBytes(kSecRandomDefault, randomBytesSize, maskKey);
        if (result != 0) {
            //TODO: (nlutsenko) Check if there was an error.
        }
        frameBufferSize += randomBytesSize;
    
        // Copy and unmask the buffer
        uint8_t *frameBufferPayloadPointer = frameBuffer + frameBufferSize;
    
        memcpy(frameBufferPayloadPointer, unmaskedPayloadBuffer, payloadLength);
        SRMaskBytesSIMD(frameBufferPayloadPointer, payloadLength, maskKey);
        frameBufferSize += payloadLength;
    
        assert(frameBufferSize <= frameData.length);
        frameData.length = frameBufferSize;
    
        [self _writeData:frameData];
    }
    

    相关文章

      网友评论

        本文标题:工欲利其事必先利其器-SocketRocket (二)

        本文链接:https://www.haomeiwen.com/subject/kuldqdtx.html