Write核心逻辑本文为
CocoaAsyncSocket
Write,主要介绍GCDAsyncSpecialPacket
和GCDAsyncWritePacket
类型数据的处理,还有核心写入方法doWriteData
三种不同方式的写入
注:由于该框架源码篇幅过大,且有大部分相对抽象的数据操作逻辑,尽管楼主竭力想要简单的去陈述相关内容,但是阅读起来仍会有一定的难度。如果不是诚心想学习IM
相关知识,在这里就可以离场了...
iOS- CocoaAsyncSocket源码解析(Connect 上)
iOS- CocoaAsyncSocket源码解析(Connect 下)
iOS- CocoaAsyncSocket源码解析(Read 上)
iOS- CocoaAsyncSocket源码解析(Read 下)
注:文中涉及代码比较多,建议大家结合源码一起阅读比较容易能加深理解。这里有楼主标注好注释的源码,有需要的可以作为参照:CocoaAsyncSocket源码注释
我们切入口
//写数据对外方法
- (void)writeData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)tag
{
if ([data length] == 0) return;
//初始化写包
GCDAsyncWritePacket *packet = [[GCDAsyncWritePacket alloc] initWithData:data timeout:timeout tag:tag];
dispatch_async(socketQueue, ^{ @autoreleasepool {
LogTrace();
if ((flags & kSocketStarted) && !(flags & kForbidReadsWrites))
{
[writeQueue addObject:packet];
//离队执行
[self maybeDequeueWrite];
}
}});
// Do not rely on the block being run in order to release the packet,
// as the queue might get released without the block completing.
}
写法类似Read
- 初始化写包 :
GCDAsyncWritePacket
- 写入包放入我们的写入队列(数组)
[writeQueue addObject:packet];
- 离队执行
[self maybeDequeueWrite];
写入包,添加队列没什么讲的了,不太清楚的小伙伴可以参考
iOS- CocoaAsyncSocket源码解析(Read 上)
iOS- CocoaAsyncSocket源码解析(Read 下)
下面重点解析maybeDequeueWrite
- (void)maybeDequeueWrite
{
LogTrace();
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
// If we're not currently processing a write AND we have an available write stream
if ((currentWrite == nil) && (flags & kConnected))
{
if ([writeQueue count] > 0)
{
// Dequeue the next object in the write queue
currentWrite = [writeQueue objectAtIndex:0];
[writeQueue removeObjectAtIndex:0];
//TLS
if ([currentWrite isKindOfClass:[GCDAsyncSpecialPacket class]])
{
LogVerbose(@"Dequeued GCDAsyncSpecialPacket");
// Attempt to start TLS
flags |= kStartingWriteTLS;
// This method won't do anything unless both kStartingReadTLS and kStartingWriteTLS are set
[self maybeStartTLS];
}
else
{
LogVerbose(@"Dequeued GCDAsyncWritePacket");
// Setup write timer (if needed)
[self setupWriteTimerWithTimeout:currentWrite->timeout];
// Immediately write, if possible
[self doWriteData];
}
}
//写超时导致的错误
else if (flags & kDisconnectAfterWrites)
{
//如果没有可读任务,直接关闭socket
if (flags & kDisconnectAfterReads)
{
if (([readQueue count] == 0) && (currentRead == nil))
{
[self closeWithError:nil];
}
}
else
{
[self closeWithError:nil];
}
}
}
}
- 我们首先做了一些是否连接,写入队列任务是否大于0等等一些判断
- 接着我们从全局的
writeQueue
中,拿到第一条任务,去做读取,我们来判断这个任务的类型,如果是GCDAsyncSpecialPacket
类型的,我们将开启TLS认证 - 如果是是我们之前加入队列中的
GCDAsyncWritePacket
类型,我们则开始读取操作,调用doWriteData
- 如果没有可读任务,直接关闭socket
其中 maybeStartTLS
我们解析过了,我们就只要来看看核心写入方法:doWriteData
- (void)doWriteData
{
LogTrace();
// This method is called by the writeSource via the socketQueue
//错误,不写
if ((currentWrite == nil) || (flags & kWritesPaused))
{
LogVerbose(@"No currentWrite or kWritesPaused");
// Unable to write at this time
//
if ([self usingCFStreamForTLS])
{
// CFWriteStream only fires once when there is available data.
// It won't fire again until we've invoked CFWriteStreamWrite.
}
else
{
// If the writeSource is firing, we need to pause it
// or else it will continue to fire over and over again.
//如果socket中可接受写数据,防止反复触发写source,挂起
if (flags & kSocketCanAcceptBytes)
{
[self suspendWriteSource];
}
}
return;
}
//如果当前socket无法在写数据了
if (!(flags & kSocketCanAcceptBytes))
{
LogVerbose(@"No space available to write...");
// No space available to write.
//如果不是cfstream
if (![self usingCFStreamForTLS])
{
// Need to wait for writeSource to fire and notify us of
// available space in the socket's internal write buffer.
//则恢复写source,当有空间去写的时候,会触发回来
[self resumeWriteSource];
}
return;
}
//如果正在进行TLS认证
if (flags & kStartingWriteTLS)
{
LogVerbose(@"Waiting for SSL/TLS handshake to complete");
// The writeQueue is waiting for SSL/TLS handshake to complete.
if (flags & kStartingReadTLS)
{
//如果是安全通道,并且I/O阻塞,那么重新去握手
if ([self usingSecureTransportForTLS] && lastSSLHandshakeError == errSSLWouldBlock)
{
// We are in the process of a SSL Handshake.
// We were waiting for available space in the socket's internal OS buffer to continue writing.
[self ssl_continueSSLHandshake];
}
}
//说明不走`TLS`了,因为只支持写的TLS
else
{
// We are still waiting for the readQueue to drain and start the SSL/TLS process.
// We now know we can write to the socket.
//挂起写source
if (![self usingCFStreamForTLS])
{
// Suspend the write source or else it will continue to fire nonstop.
[self suspendWriteSource];
}
}
return;
}
// Note: This method is not called if currentWrite is a GCDAsyncSpecialPacket (startTLS packet)
//开始写数据
BOOL waiting = NO;
NSError *error = nil;
size_t bytesWritten = 0;
//安全连接
if (flags & kSocketSecure)
{
//CFStreamForTLS
if ([self usingCFStreamForTLS])
{
#if TARGET_OS_IPHONE
//
// Writing data using CFStream (over internal TLS)
//
const uint8_t *buffer = (const uint8_t *)[currentWrite->buffer bytes] + currentWrite->bytesDone;
//写的长度为buffer长度-已写长度
NSUInteger bytesToWrite = [currentWrite->buffer length] - currentWrite->bytesDone;
if (bytesToWrite > SIZE_MAX) // NSUInteger may be bigger than size_t (write param 3)
{
bytesToWrite = SIZE_MAX;
}
//往writeStream中写入数据, bytesToWrite写入的长度
CFIndex result = CFWriteStreamWrite(writeStream, buffer, (CFIndex)bytesToWrite);
LogVerbose(@"CFWriteStreamWrite(%lu) = %li", (unsigned long)bytesToWrite, result);
//写错误
if (result < 0)
{
error = (__bridge_transfer NSError *)CFWriteStreamCopyError(writeStream);
}
else
{
//拿到已写字节数
bytesWritten = (size_t)result;
// We always set waiting to true in this scenario.
//我们经常设置等待来信任这个方案
// CFStream may have altered our underlying socket to non-blocking.
//CFStream很可能修改socket为非阻塞
// Thus if we attempt to write without a callback, we may end up blocking our queue.
//因此,我们尝试去写,而不用回调。 我们可能终止我们的队列。
waiting = YES;
}
#endif
}
//SSL写的方式
else
{
// We're going to use the SSLWrite function.
//
// OSStatus SSLWrite(SSLContextRef context, const void *data, size_t dataLength, size_t *processed)
//
// Parameters:
// context - An SSL session context reference.
// data - A pointer to the buffer of data to write.
// dataLength - The amount, in bytes, of data to write.
// processed - On return, the length, in bytes, of the data actually written.
//
// It sounds pretty straight-forward,
//看起来相当直观,但是这里警告你应注意。
// but there are a few caveats you should be aware of.
//
// The SSLWrite method operates in a non-obvious (and rather annoying) manner.
// According to the documentation:
// 这个SSLWrite方法使用着一个不明显的方法(相当讨厌)导致了下面这些事。
// Because you may configure the underlying connection to operate in a non-blocking manner,
//因为你要辨别出下层连接 操纵 非阻塞的方法,一个写的操作将返回errSSLWouldBlock,表明需要写的数据少了。
// a write operation might return errSSLWouldBlock, indicating that less data than requested
// was actually transferred. In this case, you should repeat the call to SSLWrite until some
//在这种情况下你应该重复调用SSLWrite,直到一些其他结果被返回
// other result is returned.
// This sounds perfect, but when our SSLWriteFunction returns errSSLWouldBlock,
//这样听起来很完美,但是当SSLWriteFunction返回errSSLWouldBlock,SSLWrite返回但是却设置了进度长度?
// then the SSLWrite method returns (with the proper errSSLWouldBlock return value),
// but it sets processed to dataLength !!
//
// In other words, if the SSLWrite function doesn't completely write all the data we tell it to,
//另外,SSLWrite方法没有完整的写完我们给的所有数据,因此它没有告诉我们到底写了多少数据,
// then it doesn't tell us how many bytes were actually written. So, for example, if we tell it to
//因此。举个例子,如果我们告诉它去写256个字节,它可能只写了128个字节,但是告诉我们写了0个字节
// write 256 bytes then it might actually write 128 bytes, but then report 0 bytes written.
//
// You might be wondering:
//你可能会觉得奇怪,如果这个方法不告诉我们写了多少字节,那么该如何去更新参数来应对下一次的SSLWrite?
// If the SSLWrite function doesn't tell us how many bytes were written,
// then how in the world are we supposed to update our parameters (buffer & bytesToWrite)
// for the next time we invoke SSLWrite?
//
// The answer is that SSLWrite cached all the data we told it to write,
//答案就是,SSLWrite缓存了所有的数据我们要它写的。并且拉出这些数据,只要我们下次调用SSLWrite。
// and it will push out that data next time we call SSLWrite.
// If we call SSLWrite with new data, it will push out the cached data first, and then the new data.
//如果我们用新的data调用SSLWrite,它会拉出这些缓存的数据,然后才轮到新数据
// If we call SSLWrite with empty data, then it will simply push out the cached data.
// 如果我们调用SSLWrite用一个空的数据,则它仅仅会拉出缓存数据。
// For this purpose we're going to break large writes into a series of smaller writes.
//为了这个目的,我们去分开一个大数据写成一连串的小数据,它允许我们去报告进度给代理。
// This allows us to report progress back to the delegate.
OSStatus result;
//SSL缓存的写的数据
BOOL hasCachedDataToWrite = (sslWriteCachedLength > 0);
//是否有新数据要写
BOOL hasNewDataToWrite = YES;
if (hasCachedDataToWrite)
{
size_t processed = 0;
//去写空指针,就是拉取了所有的缓存SSL数据
result = SSLWrite(sslContext, NULL, 0, &processed);
//如果写成功
if (result == noErr)
{
//拿到写的缓存长度
bytesWritten = sslWriteCachedLength;
//置空缓存长度
sslWriteCachedLength = 0;
//判断当前需要写的buffer长度,是否和已写的大小+缓存 大小相等
if ([currentWrite->buffer length] == (currentWrite->bytesDone + bytesWritten))
{
// We've written all data for the current write.
//相同则不需要再写新数据了
hasNewDataToWrite = NO;
}
}
//有错
else
{
//IO阻塞,等待
if (result == errSSLWouldBlock)
{
waiting = YES;
}
//报错
else
{
error = [self sslError:result];
}
// Can't write any new data since we were unable to write the cached data.
//如果读写cache出错,我们暂时不能去读后面的数据
hasNewDataToWrite = NO;
}
}
//如果还有数据去读
if (hasNewDataToWrite)
{
//拿到buffer偏移位置
const uint8_t *buffer = (const uint8_t *)[currentWrite->buffer bytes]
+ currentWrite->bytesDone
+ bytesWritten;
//得到需要读的长度
NSUInteger bytesToWrite = [currentWrite->buffer length] - currentWrite->bytesDone - bytesWritten;
//如果大于最大值,就等于最大值
if (bytesToWrite > SIZE_MAX) // NSUInteger may be bigger than size_t (write param 3)
{
bytesToWrite = SIZE_MAX;
}
size_t bytesRemaining = bytesToWrite;
//循环值
BOOL keepLooping = YES;
while (keepLooping)
{
//最大写的字节数?
const size_t sslMaxBytesToWrite = 32768;
//得到二者小的,得到需要写的字节数
size_t sslBytesToWrite = MIN(bytesRemaining, sslMaxBytesToWrite);
//已写字节数
size_t sslBytesWritten = 0;
//将结果从buffer中写到socket上(经由了这个函数,数据就加密了)
result = SSLWrite(sslContext, buffer, sslBytesToWrite, &sslBytesWritten);
//如果写成功
if (result == noErr)
{
//buffer指针偏移
buffer += sslBytesWritten;
//加上些的数量
bytesWritten += sslBytesWritten;
//减去仍需写的数量
bytesRemaining -= sslBytesWritten;
//判断是否需要继续循环
keepLooping = (bytesRemaining > 0);
}
else
{
//IO阻塞
if (result == errSSLWouldBlock)
{
waiting = YES;
//得到缓存的大小(后续长度会被自己写到SSL缓存去)
sslWriteCachedLength = sslBytesToWrite;
}
else
{
error = [self sslError:result];
}
//跳出循环
keepLooping = NO;
}
} // while (keepLooping)
} // if (hasNewDataToWrite)
}
}
//普通socket
else
{
//
// Writing data directly over raw socket
//
//拿到当前socket
int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
//得到指针偏移
const uint8_t *buffer = (const uint8_t *)[currentWrite->buffer bytes] + currentWrite->bytesDone;
NSUInteger bytesToWrite = [currentWrite->buffer length] - currentWrite->bytesDone;
if (bytesToWrite > SIZE_MAX) // NSUInteger may be bigger than size_t (write param 3)
{
bytesToWrite = SIZE_MAX;
}
//直接写
ssize_t result = write(socketFD, buffer, (size_t)bytesToWrite);
LogVerbose(@"wrote to socket = %zd", result);
// Check results
if (result < 0)
{
//IO阻塞
if (errno == EWOULDBLOCK)
{
waiting = YES;
}
else
{
error = [self errnoErrorWithReason:@"Error in write() function"];
}
}
else
{
//得到写的大小
bytesWritten = result;
}
}
// We're done with our writing.
// If we explictly ran into a situation where the socket told us there was no room in the buffer,
// then we immediately resume listening for notifications.
//
// We must do this before we dequeue another write,
// as that may in turn invoke this method again.
//
// Note that if CFStream is involved, it may have maliciously put our socket in blocking mode.
//注意,如果用CFStream,很可能会被恶意的放置数据 阻塞socket
//如果等待,则恢复写source
if (waiting)
{
//把socket可接受数据的标记去掉
flags &= ~kSocketCanAcceptBytes;
if (![self usingCFStreamForTLS])
{
//恢复写source
[self resumeWriteSource];
}
}
// Check our results
//判断是否完成
BOOL done = NO;
//判断已写大小
if (bytesWritten > 0)
{
// Update total amount read for the current write
//更新当前总共写的大小
currentWrite->bytesDone += bytesWritten;
LogVerbose(@"currentWrite->bytesDone = %lu", (unsigned long)currentWrite->bytesDone);
// Is packet done?
//判断当前写包是否写完
done = (currentWrite->bytesDone == [currentWrite->buffer length]);
}
//如果完成了
if (done)
{
//完成操作
[self completeCurrentWrite];
if (!error)
{
dispatch_async(socketQueue, ^{ @autoreleasepool{
//开始下一次的读取任务
[self maybeDequeueWrite];
}});
}
}
//未完成
else
{
// We were unable to finish writing the data,
// so we're waiting for another callback to notify us of available space in the lower-level output buffer.
//如果不是等待 而且没有出错
if (!waiting && !error)
{
// This would be the case if our write was able to accept some data, but not all of it.
//这是我们写了一部分数据的情况。
//去掉可接受数据的标记
flags &= ~kSocketCanAcceptBytes;
//再去等读source触发
if (![self usingCFStreamForTLS])
{
[self resumeWriteSource];
}
}
//如果已写大于0
if (bytesWritten > 0)
{
// We're not done with the entire write, but we have written some bytes
__strong id theDelegate = delegate;
//调用写的进度代理
if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didWritePartialDataOfLength:tag:)])
{
long theWriteTag = currentWrite->tag;
dispatch_async(delegateQueue, ^{ @autoreleasepool {
[theDelegate socket:self didWritePartialDataOfLength:bytesWritten tag:theWriteTag];
}});
}
}
}
// Check for errors
//如果有错,则报错断开连接
if (error)
{
[self closeWithError:[self errnoErrorWithReason:@"Error in write() function"]];
}
// Do not add any code here without first adding a return statement in the error case above.
}
- 这里不同
doRead
的是没有提前通过flush写入链路层 - 如果socket中可接受写数据,防止反复触发写source,挂起
- 如果当前socket无法在写数据了,则恢复写source,当有空间去写的时候,会触发回来
-
如果正在进行TLS认证 如果是安全通道,并且I/O阻塞,那么重新去握手
-
下面是写入的三种方式
CFStreamForTLS
- SSL写的方式
if (hasNewDataToWrite)
{
//拿到buffer偏移位置
const uint8_t *buffer = (const uint8_t *)[currentWrite->buffer bytes]
+ currentWrite->bytesDone
+ bytesWritten;
//得到需要读的长度
NSUInteger bytesToWrite = [currentWrite->buffer length] - currentWrite->bytesDone - bytesWritten;
//如果大于最大值,就等于最大值
if (bytesToWrite > SIZE_MAX) // NSUInteger may be bigger than size_t (write param 3)
{
bytesToWrite = SIZE_MAX;
}
size_t bytesRemaining = bytesToWrite;
//循环值
BOOL keepLooping = YES;
while (keepLooping)
{
//最大写的字节数?
const size_t sslMaxBytesToWrite = 32768;
//得到二者小的,得到需要写的字节数
size_t sslBytesToWrite = MIN(bytesRemaining, sslMaxBytesToWrite);
//已写字节数
size_t sslBytesWritten = 0;
//将结果从buffer中写到socket上(经由了这个函数,数据就加密了)
result = SSLWrite(sslContext, buffer, sslBytesToWrite, &sslBytesWritten);
//如果写成功
if (result == noErr)
{
//buffer指针偏移
buffer += sslBytesWritten;
//加上些的数量
bytesWritten += sslBytesWritten;
//减去仍需写的数量
bytesRemaining -= sslBytesWritten;
//判断是否需要继续循环
keepLooping = (bytesRemaining > 0);
}
else
{
//IO阻塞
if (result == errSSLWouldBlock)
{
waiting = YES;
//得到缓存的大小(后续长度会被自己写到SSL缓存去)
sslWriteCachedLength = sslBytesToWrite;
}
else
{
error = [self sslError:result];
}
//跳出循环
keepLooping = NO;
}
} // while (keepLooping)
这里还有对残余数据的处理:是通过指针buffer获取我们的keepLooping
循环值,循环进行写入
//将结果从buffer中写到socket上(经由了这个函数,数据就加密了)
result = SSLWrite(sslContext, buffer, sslBytesToWrite, &sslBytesWritten);
- 普通socket写入
- 也做了完成判断
//判断是否完成
BOOL done = NO;
//判断已写大小
if (bytesWritten > 0)
{
// Update total amount read for the current write
//更新当前总共写的大小
currentWrite->bytesDone += bytesWritten;
LogVerbose(@"currentWrite->bytesDone = %lu", (unsigned long)currentWrite->bytesDone);
// Is packet done?
//判断当前写包是否写完
done = (currentWrite->bytesDone == [currentWrite->buffer length]);
}
同样为的也是三种数据包:一次性包,粘包,断包
//如果完成了
if (done)
{
//完成操作
[self completeCurrentWrite];
if (!error)
{
dispatch_async(socketQueue, ^{ @autoreleasepool{
//开始下一次的读取任务
[self maybeDequeueWrite];
}});
}
}
//未完成
else
{
// We were unable to finish writing the data,
// so we're waiting for another callback to notify us of available space in the lower-level output buffer.
//如果不是等待 而且没有出错
if (!waiting && !error)
{
// This would be the case if our write was able to accept some data, but not all of it.
//这是我们写了一部分数据的情况。
//去掉可接受数据的标记
flags &= ~kSocketCanAcceptBytes;
//再去等读source触发
if (![self usingCFStreamForTLS])
{
[self resumeWriteSource];
}
}
//如果已写大于0
if (bytesWritten > 0)
{
// We're not done with the entire write, but we have written some bytes
__strong id theDelegate = delegate;
//调用写的进度代理
if (delegateQueue && [theDelegate respondsToSelector:@selector(socket:didWritePartialDataOfLength:tag:)])
{
long theWriteTag = currentWrite->tag;
dispatch_async(delegateQueue, ^{ @autoreleasepool {
[theDelegate socket:self didWritePartialDataOfLength:bytesWritten tag:theWriteTag];
}});
}
}
}
那么整个 CocoaAsyncSocket Wirte的解析就到这里完成了,当你读完前面几篇,再来看这篇就跟喝水一样,故:知识在于积累
网友评论