美文网首页iOS 进阶开发
CocoaAsyncSocket源码分析---Write

CocoaAsyncSocket源码分析---Write

作者: Cooci_和谐学习_不急不躁 | 来源:发表于2018-08-11 14:55 被阅读74次

    本文为CocoaAsyncSocket Write,主要介绍GCDAsyncSpecialPacketGCDAsyncWritePacket类型数据的处理,还有核心写入方法doWriteData三种不同方式的写入
    注:由于该框架源码篇幅过大,且有大部分相对抽象的数据操作逻辑,尽管楼主竭力想要简单的去陈述相关内容,但是阅读起来仍会有一定的难度。如果不是诚心想学习IM相关知识,在这里就可以离场了...

    Write核心逻辑

    iOS- CocoaAsyncSocket源码解析(Connect 上)
    iOS- CocoaAsyncSocket源码解析(Connect 下)
    iOS- CocoaAsyncSocket源码解析(Read 上)
    iOS- CocoaAsyncSocket源码解析(Read 下)

    注:文中涉及代码比较多,建议大家结合源码一起阅读比较容易能加深理解。这里有楼主标注好注释的源码,有需要的可以作为参照:CocoaAsyncSocket源码注释

    我们切入口

    //写数据对外方法
    - (void)writeData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)tag
    {
        if ([data length] == 0) return;
        
        //初始化写包
        GCDAsyncWritePacket *packet = [[GCDAsyncWritePacket alloc] initWithData:data timeout:timeout tag:tag];
        
        dispatch_async(socketQueue, ^{ @autoreleasepool {
            
            LogTrace();
            
            if ((flags & kSocketStarted) && !(flags & kForbidReadsWrites))
            {
                [writeQueue addObject:packet];
                //离队执行
                [self maybeDequeueWrite];
            }
        }});
        
        // Do not rely on the block being run in order to release the packet,
        // as the queue might get released without the block completing.
    }
    
    

    写法类似Read

    • 初始化写包 :GCDAsyncWritePacket
    • 写入包放入我们的写入队列(数组)[writeQueue addObject:packet];
    • 离队执行 [self maybeDequeueWrite];

    写入包,添加队列没什么讲的了,不太清楚的小伙伴可以参考
    iOS- CocoaAsyncSocket源码解析(Read 上)
    iOS- CocoaAsyncSocket源码解析(Read 下)

    下面重点解析maybeDequeueWrite

    - (void)maybeDequeueWrite
    {
        LogTrace();
        NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
        
        
        // If we're not currently processing a write AND we have an available write stream
        if ((currentWrite == nil) && (flags & kConnected))
        {
            if ([writeQueue count] > 0)
            {
                // Dequeue the next object in the write queue
                currentWrite = [writeQueue objectAtIndex:0];
                [writeQueue removeObjectAtIndex:0];
                
                //TLS
                if ([currentWrite isKindOfClass:[GCDAsyncSpecialPacket class]])
                {
                    LogVerbose(@"Dequeued GCDAsyncSpecialPacket");
                    
                    // Attempt to start TLS
                    flags |= kStartingWriteTLS;
                    
                    // This method won't do anything unless both kStartingReadTLS and kStartingWriteTLS are set
                    [self maybeStartTLS];
                }
                else
                {
                    LogVerbose(@"Dequeued GCDAsyncWritePacket");
                    
                    // Setup write timer (if needed)
                    [self setupWriteTimerWithTimeout:currentWrite->timeout];
                    
                    // Immediately write, if possible
                    [self doWriteData];
                }
            }
            //写超时导致的错误
            else if (flags & kDisconnectAfterWrites)
            {
                //如果没有可读任务,直接关闭socket
                if (flags & kDisconnectAfterReads)
                {
                    if (([readQueue count] == 0) && (currentRead == nil))
                    {
                        [self closeWithError:nil];
                    }
                }
                else
                {
                    [self closeWithError:nil];
                }
            }
        }
    }
    
    • 我们首先做了一些是否连接,写入队列任务是否大于0等等一些判断
    • 接着我们从全局的writeQueue中,拿到第一条任务,去做读取,我们来判断这个任务的类型,如果是GCDAsyncSpecialPacket类型的,我们将开启TLS认证
    • 如果是是我们之前加入队列中的GCDAsyncWritePacket类型,我们则开始读取操作,调用doWriteData
    • 如果没有可读任务,直接关闭socket

    其中 maybeStartTLS我们解析过了,我们就只要来看看核心写入方法:doWriteData

    - (void)doWriteData
    {
        LogTrace();
        
        // This method is called by the writeSource via the socketQueue
        
        //错误,不写
        if ((currentWrite == nil) || (flags & kWritesPaused))
        {
            LogVerbose(@"No currentWrite or kWritesPaused");
            
            // Unable to write at this time
            
            //
            if ([self usingCFStreamForTLS])
            {
                // CFWriteStream only fires once when there is available data.
                // It won't fire again until we've invoked CFWriteStreamWrite.
            }
            else
            {
                // If the writeSource is firing, we need to pause it
                // or else it will continue to fire over and over again.
                
                //如果socket中可接受写数据,防止反复触发写source,挂起
                if (flags & kSocketCanAcceptBytes)
                {
                    [self suspendWriteSource];
                }
            }
            return;
        }
        
        //如果当前socket无法在写数据了
        if (!(flags & kSocketCanAcceptBytes))
        {
            LogVerbose(@"No space available to write...");
            
            // No space available to write.
            
            //如果不是cfstream
            if (![self usingCFStreamForTLS])
            {
                // Need to wait for writeSource to fire and notify us of
                // available space in the socket's internal write buffer.
                //则恢复写source,当有空间去写的时候,会触发回来
                [self resumeWriteSource];
            }
            return;
        }
        
        //如果正在进行TLS认证
        if (flags & kStartingWriteTLS)
        {
            LogVerbose(@"Waiting for SSL/TLS handshake to complete");
            
            // The writeQueue is waiting for SSL/TLS handshake to complete.
            
            if (flags & kStartingReadTLS)
            {
                //如果是安全通道,并且I/O阻塞,那么重新去握手
                if ([self usingSecureTransportForTLS] && lastSSLHandshakeError == errSSLWouldBlock)
                {
                    // We are in the process of a SSL Handshake.
                    // We were waiting for available space in the socket's internal OS buffer to continue writing.
                
                    [self ssl_continueSSLHandshake];
                }
            }
            //说明不走`TLS`了,因为只支持写的TLS
            else
            {
                // We are still waiting for the readQueue to drain and start the SSL/TLS process.
                // We now know we can write to the socket.
                
                //挂起写source
                if (![self usingCFStreamForTLS])
                {
                    // Suspend the write source or else it will continue to fire nonstop.
                    [self suspendWriteSource];
                }
            }
            
            return;
        }
        
        // Note: This method is not called if currentWrite is a GCDAsyncSpecialPacket (startTLS packet)
        
        //开始写数据
        
        BOOL waiting = NO;
        NSError *error = nil;
        size_t bytesWritten = 0;
        
        //安全连接
        if (flags & kSocketSecure)
        {
            //CFStreamForTLS
            if ([self usingCFStreamForTLS])
            {
                #if TARGET_OS_IPHONE
                
                // 
                // Writing data using CFStream (over internal TLS)
                // 
                
                const uint8_t *buffer = (const uint8_t *)[currentWrite->buffer bytes] + currentWrite->bytesDone;
                
                //写的长度为buffer长度-已写长度
                NSUInteger bytesToWrite = [currentWrite->buffer length] - currentWrite->bytesDone;
                
                if (bytesToWrite > SIZE_MAX) // NSUInteger may be bigger than size_t (write param 3)
                {
                    bytesToWrite = SIZE_MAX;
                }
                //往writeStream中写入数据, bytesToWrite写入的长度
                CFIndex result = CFWriteStreamWrite(writeStream, buffer, (CFIndex)bytesToWrite);
                LogVerbose(@"CFWriteStreamWrite(%lu) = %li", (unsigned long)bytesToWrite, result);
            
                //写错误
                if (result < 0)
                {
                    error = (__bridge_transfer NSError *)CFWriteStreamCopyError(writeStream);
                }
                else
                {
                    //拿到已写字节数
                    bytesWritten = (size_t)result;
                    
                    // We always set waiting to true in this scenario.
                    //我们经常设置等待来信任这个方案
                    // CFStream may have altered our underlying socket to non-blocking.
                    //CFStream很可能修改socket为非阻塞
                    // Thus if we attempt to write without a callback, we may end up blocking our queue.
                    //因此,我们尝试去写,而不用回调。 我们可能终止我们的队列。
                    waiting = YES;
                }
                
                #endif
            }
            //SSL写的方式
            else
            {
                // We're going to use the SSLWrite function.
                // 
                // OSStatus SSLWrite(SSLContextRef context, const void *data, size_t dataLength, size_t *processed)
                // 
                // Parameters:
                // context     - An SSL session context reference.
                // data        - A pointer to the buffer of data to write.
                // dataLength  - The amount, in bytes, of data to write.
                // processed   - On return, the length, in bytes, of the data actually written.
                // 
                // It sounds pretty straight-forward,
                //看起来相当直观,但是这里警告你应注意。
                // but there are a few caveats you should be aware of.
                // 
                // The SSLWrite method operates in a non-obvious (and rather annoying) manner.
                // According to the documentation:
                // 这个SSLWrite方法使用着一个不明显的方法(相当讨厌)导致了下面这些事。
                //   Because you may configure the underlying connection to operate in a non-blocking manner,
                //因为你要辨别出下层连接 操纵 非阻塞的方法,一个写的操作将返回errSSLWouldBlock,表明需要写的数据少了。
                //   a write operation might return errSSLWouldBlock, indicating that less data than requested
                //   was actually transferred. In this case, you should repeat the call to SSLWrite until some
                //在这种情况下你应该重复调用SSLWrite,直到一些其他结果被返回
                //   other result is returned.
                // This sounds perfect, but when our SSLWriteFunction returns errSSLWouldBlock,
                //这样听起来很完美,但是当SSLWriteFunction返回errSSLWouldBlock,SSLWrite返回但是却设置了进度长度?
                // then the SSLWrite method returns (with the proper errSSLWouldBlock return value),
                // but it sets processed to dataLength !!
                // 
                // In other words, if the SSLWrite function doesn't completely write all the data we tell it to,
                //另外,SSLWrite方法没有完整的写完我们给的所有数据,因此它没有告诉我们到底写了多少数据,
                // then it doesn't tell us how many bytes were actually written. So, for example, if we tell it to
                //因此。举个例子,如果我们告诉它去写256个字节,它可能只写了128个字节,但是告诉我们写了0个字节
                // write 256 bytes then it might actually write 128 bytes, but then report 0 bytes written.
                // 
                // You might be wondering:
                //你可能会觉得奇怪,如果这个方法不告诉我们写了多少字节,那么该如何去更新参数来应对下一次的SSLWrite?
                // If the SSLWrite function doesn't tell us how many bytes were written,
                // then how in the world are we supposed to update our parameters (buffer & bytesToWrite)
                // for the next time we invoke SSLWrite?
                // 
                // The answer is that SSLWrite cached all the data we told it to write,
                //答案就是,SSLWrite缓存了所有的数据我们要它写的。并且拉出这些数据,只要我们下次调用SSLWrite。
                // and it will push out that data next time we call SSLWrite.
                
                // If we call SSLWrite with new data, it will push out the cached data first, and then the new data.
                //如果我们用新的data调用SSLWrite,它会拉出这些缓存的数据,然后才轮到新数据
                // If we call SSLWrite with empty data, then it will simply push out the cached data.
                // 如果我们调用SSLWrite用一个空的数据,则它仅仅会拉出缓存数据。
                // For this purpose we're going to break large writes into a series of smaller writes.
                //为了这个目的,我们去分开一个大数据写成一连串的小数据,它允许我们去报告进度给代理。
                // This allows us to report progress back to the delegate.
                
                OSStatus result;
                
                //SSL缓存的写的数据
                BOOL hasCachedDataToWrite = (sslWriteCachedLength > 0);
                //是否有新数据要写
                BOOL hasNewDataToWrite = YES;
                
                if (hasCachedDataToWrite)
                {
                    size_t processed = 0;
                    
                    //去写空指针,就是拉取了所有的缓存SSL数据
                    result = SSLWrite(sslContext, NULL, 0, &processed);
                    
                    //如果写成功
                    if (result == noErr)
                    {
                        //拿到写的缓存长度
                        bytesWritten = sslWriteCachedLength;
                        //置空缓存长度
                        sslWriteCachedLength =  0;
                        //判断当前需要写的buffer长度,是否和已写的大小+缓存 大小相等
                        if ([currentWrite->buffer length] == (currentWrite->bytesDone + bytesWritten))
                        {
                            // We've written all data for the current write.
                            //相同则不需要再写新数据了
                            hasNewDataToWrite = NO;
                        }
                    }
                    //有错
                    else
                    {
                        //IO阻塞,等待
                        if (result == errSSLWouldBlock)
                        {
                            waiting = YES;
                        }
                        //报错
                        else
                        {
                            error = [self sslError:result];
                        }
                        
                        // Can't write any new data since we were unable to write the cached data.
                        //如果读写cache出错,我们暂时不能去读后面的数据
                        hasNewDataToWrite = NO;
                    }
                }
                
                //如果还有数据去读
                if (hasNewDataToWrite)
                {
                    //拿到buffer偏移位置
                    const uint8_t *buffer = (const uint8_t *)[currentWrite->buffer bytes]
                                                            + currentWrite->bytesDone
                                                            + bytesWritten;
                    
                    //得到需要读的长度
                    NSUInteger bytesToWrite = [currentWrite->buffer length] - currentWrite->bytesDone - bytesWritten;
                    //如果大于最大值,就等于最大值
                    if (bytesToWrite > SIZE_MAX) // NSUInteger may be bigger than size_t (write param 3)
                    {
                        bytesToWrite = SIZE_MAX;
                    }
                    
                    size_t bytesRemaining = bytesToWrite;
                    
                    //循环值
                    BOOL keepLooping = YES;
                    while (keepLooping)
                    {
                        //最大写的字节数?
                        const size_t sslMaxBytesToWrite = 32768;
                        //得到二者小的,得到需要写的字节数
                        size_t sslBytesToWrite = MIN(bytesRemaining, sslMaxBytesToWrite);
                        //已写字节数
                        size_t sslBytesWritten = 0;
                        
                        //将结果从buffer中写到socket上(经由了这个函数,数据就加密了)
                        result = SSLWrite(sslContext, buffer, sslBytesToWrite, &sslBytesWritten);
                        
                        //如果写成功
                        if (result == noErr)
                        {
                            //buffer指针偏移
                            buffer += sslBytesWritten;
                            //加上些的数量
                            bytesWritten += sslBytesWritten;
                            //减去仍需写的数量
                            bytesRemaining -= sslBytesWritten;
                            //判断是否需要继续循环
                            keepLooping = (bytesRemaining > 0);
                        }
                        else
                        {
                            //IO阻塞
                            if (result == errSSLWouldBlock)
                            {
                                waiting = YES;
                                //得到缓存的大小(后续长度会被自己写到SSL缓存去)
                                sslWriteCachedLength = sslBytesToWrite;
                            }
                            else
                            {
                                error = [self sslError:result];
                            }
                            
                            //跳出循环
                            keepLooping = NO;
                        }
                        
                    } // while (keepLooping)
                    
                } // if (hasNewDataToWrite)
            }
        }
        
        //普通socket
        else
        {
            // 
            // Writing data directly over raw socket
            // 
            
            //拿到当前socket
            int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
            
            //得到指针偏移
            const uint8_t *buffer = (const uint8_t *)[currentWrite->buffer bytes] + currentWrite->bytesDone;
            
            NSUInteger bytesToWrite = [currentWrite->buffer length] - currentWrite->bytesDone;
            
            if (bytesToWrite > SIZE_MAX) // NSUInteger may be bigger than size_t (write param 3)
            {
                bytesToWrite = SIZE_MAX;
            }
            //直接写
            ssize_t result = write(socketFD, buffer, (size_t)bytesToWrite);
            LogVerbose(@"wrote to socket = %zd", result);
            
            // Check results
            if (result < 0)
            {
                //IO阻塞
                if (errno == EWOULDBLOCK)
                {
                    waiting = YES;
                }
                else
                {
                    error = [self errnoErrorWithReason:@"Error in write() function"];
                }
            }
            else
            {
                //得到写的大小
                bytesWritten = result;
            }
        }
        
        // We're done with our writing.
        // If we explictly ran into a situation where the socket told us there was no room in the buffer,
        // then we immediately resume listening for notifications.
        // 
        // We must do this before we dequeue another write,
        // as that may in turn invoke this method again.
        // 
        // Note that if CFStream is involved, it may have maliciously put our socket in blocking mode.
        //注意,如果用CFStream,很可能会被恶意的放置数据 阻塞socket
        
        //如果等待,则恢复写source
        if (waiting)
        {
            //把socket可接受数据的标记去掉
            flags &= ~kSocketCanAcceptBytes;
            
            if (![self usingCFStreamForTLS])
            {
                //恢复写source
                [self resumeWriteSource];
            }
        }
        
        // Check our results
        
        //判断是否完成
        BOOL done = NO;
        //判断已写大小
        if (bytesWritten > 0)
        {
            // Update total amount read for the current write
            //更新当前总共写的大小
            currentWrite->bytesDone += bytesWritten;
            LogVerbose(@"currentWrite->bytesDone = %lu", (unsigned long)currentWrite->bytesDone);
            
            // Is packet done?
            //判断当前写包是否写完
            done = (currentWrite->bytesDone == [currentWrite->buffer length]);
        }
        
        //如果完成了
        if (done)
        {
            //完成操作
            [self completeCurrentWrite];
            
            if (!error)
            {
                dispatch_async(socketQueue, ^{ @autoreleasepool{
                    //开始下一次的读取任务
                    [self maybeDequeueWrite];
                }});
            }
        }
        //未完成
        else
        {
            // We were unable to finish writing the data,
            // so we're waiting for another callback to notify us of available space in the lower-level output buffer.
            //如果不是等待 而且没有出错
            if (!waiting && !error)
            {
                // This would be the case if our write was able to accept some data, but not all of it.
                //这是我们写了一部分数据的情况。
                
                //去掉可接受数据的标记
                flags &= ~kSocketCanAcceptBytes;
                //再去等读source触发
                if (![self usingCFStreamForTLS])
                {
                    [self resumeWriteSource];
                }
            }
            
            //如果已写大于0
            if (bytesWritten > 0)
            {
                // We're not done with the entire write, but we have written some bytes
                
                __strong id theDelegate = delegate;
    
                //调用写的进度代理
                if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didWritePartialDataOfLength:tag:)])
                {
                    long theWriteTag = currentWrite->tag;
                    
                    dispatch_async(delegateQueue, ^{ @autoreleasepool {
                        
                        [theDelegate socket:self didWritePartialDataOfLength:bytesWritten tag:theWriteTag];
                    }});
                }
            }
        }
        
        // Check for errors
        //如果有错,则报错断开连接
        if (error)
        {
            [self closeWithError:[self errnoErrorWithReason:@"Error in write() function"]];
        }
        
        // Do not add any code here without first adding a return statement in the error case above.
    }
    
    • 这里不同doRead的是没有提前通过flush写入链路层
    • 如果socket中可接受写数据,防止反复触发写source,挂起
    • 如果当前socket无法在写数据了,则恢复写source,当有空间去写的时候,会触发回来
    • 如果正在进行TLS认证 如果是安全通道,并且I/O阻塞,那么重新去握手


    • 下面是写入的三种方式

      • CFStreamForTLS
    • SSL写的方式
    if (hasNewDataToWrite)
                {
                    //拿到buffer偏移位置
                    const uint8_t *buffer = (const uint8_t *)[currentWrite->buffer bytes]
                                                            + currentWrite->bytesDone
                                                            + bytesWritten;
                    
                    //得到需要读的长度
                    NSUInteger bytesToWrite = [currentWrite->buffer length] - currentWrite->bytesDone - bytesWritten;
                    //如果大于最大值,就等于最大值
                    if (bytesToWrite > SIZE_MAX) // NSUInteger may be bigger than size_t (write param 3)
                    {
                        bytesToWrite = SIZE_MAX;
                    }
                    
                    size_t bytesRemaining = bytesToWrite;
                    
                    //循环值
                    BOOL keepLooping = YES;
                    while (keepLooping)
                    {
                        //最大写的字节数?
                        const size_t sslMaxBytesToWrite = 32768;
                        //得到二者小的,得到需要写的字节数
                        size_t sslBytesToWrite = MIN(bytesRemaining, sslMaxBytesToWrite);
                        //已写字节数
                        size_t sslBytesWritten = 0;
                        
                        //将结果从buffer中写到socket上(经由了这个函数,数据就加密了)
                        result = SSLWrite(sslContext, buffer, sslBytesToWrite, &sslBytesWritten);
                        
                        //如果写成功
                        if (result == noErr)
                        {
                            //buffer指针偏移
                            buffer += sslBytesWritten;
                            //加上些的数量
                            bytesWritten += sslBytesWritten;
                            //减去仍需写的数量
                            bytesRemaining -= sslBytesWritten;
                            //判断是否需要继续循环
                            keepLooping = (bytesRemaining > 0);
                        }
                        else
                        {
                            //IO阻塞
                            if (result == errSSLWouldBlock)
                            {
                                waiting = YES;
                                //得到缓存的大小(后续长度会被自己写到SSL缓存去)
                                sslWriteCachedLength = sslBytesToWrite;
                            }
                            else
                            {
                                error = [self sslError:result];
                            }
                            
                            //跳出循环
                            keepLooping = NO;
                        }
                        
                    } // while (keepLooping)
    

    这里还有对残余数据的处理:是通过指针buffer获取我们的keepLooping循环值,循环进行写入

     //将结果从buffer中写到socket上(经由了这个函数,数据就加密了)
    result = SSLWrite(sslContext, buffer, sslBytesToWrite, &sslBytesWritten);
    
    
    • 普通socket写入
    • 也做了完成判断
    //判断是否完成
        BOOL done = NO;
        //判断已写大小
        if (bytesWritten > 0)
        {
            // Update total amount read for the current write
            //更新当前总共写的大小
            currentWrite->bytesDone += bytesWritten;
            LogVerbose(@"currentWrite->bytesDone = %lu", (unsigned long)currentWrite->bytesDone);
            
            // Is packet done?
            //判断当前写包是否写完
            done = (currentWrite->bytesDone == [currentWrite->buffer length]);
        }
    
    

    同样为的也是三种数据包:一次性包,粘包,断包

      //如果完成了
        if (done)
        {
            //完成操作
            [self completeCurrentWrite];
            
            if (!error)
            {
                dispatch_async(socketQueue, ^{ @autoreleasepool{
                    //开始下一次的读取任务
                    [self maybeDequeueWrite];
                }});
            }
        }
        //未完成
        else
        {
            // We were unable to finish writing the data,
            // so we're waiting for another callback to notify us of available space in the lower-level output buffer.
            //如果不是等待 而且没有出错
            if (!waiting && !error)
            {
                // This would be the case if our write was able to accept some data, but not all of it.
                //这是我们写了一部分数据的情况。
                
                //去掉可接受数据的标记
                flags &= ~kSocketCanAcceptBytes;
                //再去等读source触发
                if (![self usingCFStreamForTLS])
                {
                    [self resumeWriteSource];
                }
            }
            
            //如果已写大于0
            if (bytesWritten > 0)
            {
                // We're not done with the entire write, but we have written some bytes
                
                __strong id theDelegate = delegate;
    
                //调用写的进度代理
                if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didWritePartialDataOfLength:tag:)])
                {
                    long theWriteTag = currentWrite->tag;
                    
                    dispatch_async(delegateQueue, ^{ @autoreleasepool {
                        
                        [theDelegate socket:self didWritePartialDataOfLength:bytesWritten tag:theWriteTag];
                    }});
                }
            }
        }
        
    

    那么整个 CocoaAsyncSocket Wirte的解析就到这里完成了,当你读完前面几篇,再来看这篇就跟喝水一样,故:知识在于积累

    感谢大神涂耀辉

    相关文章

      网友评论

        本文标题:CocoaAsyncSocket源码分析---Write

        本文链接:https://www.haomeiwen.com/subject/qljdbftx.html