GCDAsyncSocket源码分析
1.初始化socket 源码提供了四种初始化方法
- (instancetype)init;
- (instancetype)initWithSocketQueue:(nullable dispatch_queue_t)sq;
- (instancetype)initWithDelegate:(nullable id<GCDAsyncSocketDelegate>)aDelegate delegateQueue:(nullable dispatch_queue_t)dq;
- (instancetype)initWithDelegate:(nullable id<GCDAsyncSocketDelegate>)aDelegate delegateQueue:(nullable dispatch_queue_t)dq socketQueue:(nullable dispatch_queue_t)sq;
最终实现方法:
- (id)initWithDelegate:(id)aDelegate delegateQueue:(dispatch_queue_t)dq socketQueue:(dispatch_queue_t)sq
{
if((self = [super init]))
{
delegate = aDelegate; // socket的代理
delegateQueue = sq; // delegate的线程
// You MUST set a delegate AND delegate dispatch queue before attempting to use the socket, or you will get an error
// 初始化socket这里的delegate是必须要有的,否则会出现错误
// socketQueue是可选的,如果socketQueue是空的话会创建一个
//这个宏是在sdk6.0之后才有的,如果是之前的,则OS_OBJECT_USE_OBJC为0,!0即执行if语句
//对6.0的适配,如果是6.0以下,则去retain release,6.0之后ARC也管理了GCD
#if !OS_OBJECT_USE_OBJC
if (dq) dispatch_retain(dq);
#endif
//创建socket,先都置为 -1
//本机的ipv4
socket4FD = SOCKET_NULL;
//ipv6
socket6FD = SOCKET_NULL;
//应该是UnixSocket
socketUN = SOCKET_NULL;
//url
socketUrl = nil;
//状态
stateIndex = 0;
if (sq)
{
//给定的socketQueue参数不能是一个并发队列。
NSAssert(sq != dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0),
@"The given socketQueue parameter must not be a concurrent queue.");
NSAssert(sq != dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_HIGH, 0),
@"The given socketQueue parameter must not be a concurrent queue.");
NSAssert(sq != dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0),
@"The given socketQueue parameter must not be a concurrent queue.");
//拿到scoketQueue
socketQueue = sq;
//iOS6之下retain
#if !OS_OBJECT_USE_OBJC
dispatch_retain(sq);
#endif
}
else
{
//没有的话创建一个, 名字为:GCDAsyncSocket,串行
socketQueue = dispatch_queue_create([GCDAsyncSocketQueueName UTF8String], NULL);
}
// The dispatch_queue_set_specific() and dispatch_get_specific() functions take a "void *key" parameter.
// From the documentation:
//
// > Keys are only compared as pointers and are never dereferenced.
// > Thus, you can use a pointer to a static variable for a specific subsystem or
// > any other value that allows you to identify the value uniquely.
//
// We're just going to use the memory address of an ivar.
// Specifically an ivar that is explicitly named for our purpose to make the code more readable.
//
// However, it feels tedious (and less readable) to include the "&" all the time:
// dispatch_get_specific(&IsOnSocketQueueOrTargetQueueKey)
//
// So we're going to make it so it doesn't matter if we use the '&' or not,
// by assigning the value of the ivar to the address of the ivar.
// Thus: IsOnSocketQueueOrTargetQueueKey == &IsOnSocketQueueOrTargetQueueKey;
//比如原来为 0X123 -> NULL 变成 0X222->0X123->NULL
//自己的指针等于自己原来的指针,成二级指针了 看了注释是为了以后省略&,让代码更可读?
IsOnSocketQueueOrTargetQueueKey = &IsOnSocketQueueOrTargetQueueKey;
void *nonNullUnusedPointer = (__bridge void *)self;
//dispatch_queue_set_specific给当前队里加一个标识 dispatch_get_specific当前线程取出这个标识,判断是不是在这个队列
//这个key的值其实就是一个一级指针的地址 ,第三个参数把自己传过去了,上下文对象?第4个参数,为销毁的时候用的,可以指定一个函数
dispatch_queue_set_specific(socketQueue, IsOnSocketQueueOrTargetQueueKey, nonNullUnusedPointer, NULL);
//读的数组 限制为5
readQueue = [[NSMutableArray alloc] initWithCapacity:5];
currentRead = nil;
//写的数组,限制5
writeQueue = [[NSMutableArray alloc] initWithCapacity:5];
currentWrite = nil;
//设置大小为 4kb
preBuffer = [[GCDAsyncSocketPreBuffer alloc] initWithCapacity:(1024 * 4)];
#pragma mark alternateAddressDelay??
//交替地址延时?? wtf
alternateAddressDelay = 0.3;
}
return self;
}
核心建立连接方法
- (BOOL)connectToHost:(NSString *)inHost
onPort:(uint16_t)port
viaInterface:(NSString *)inInterface
withTimeout:(NSTimeInterval)timeout
error:(NSError **)errPtr
{
//{} ??有什么意义? -- 跟踪当前行为
LogTrace();
// Just in case immutable objects were passed
//拿到host ,copy防止值被修改
NSString *host = [inHost copy];
//interface?接口?
NSString *interface = [inInterface copy];
//声明两个__block的
__block BOOL result = NO;
//error信息
__block NSError *preConnectErr = nil;
//在socketQueue中执行这个Block
if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
block();
//否则同步的调起这个queue去执行
else
dispatch_sync(socketQueue, block);
//如果有错误,赋值错误
if (errPtr) *errPtr = preConnectErr;
//把连接是否成功的result返回
return result;
}
说明下LogTrace();
#ifndef GCDAsyncSocketLoggingEnabled
#define GCDAsyncSocketLoggingEnabled 0
#endif
#if GCDAsyncSocketLoggingEnabled
#import "DDLog.h"
#define LogTrace() LogObjc(LOG_FLAG_VERBOSE, @"%@: %@", THIS_FILE, THIS_METHOD)
#else
#define LogTrace() {}
#endif
这个
LogTrace
根据GCDAsyncSocketLoggingEnabled
这个宏定义来是否打开的,会跟踪到文件名和方法名,但是GCDAsyncSocket这个库里面没有使用到。
现在着重说下block()里面干了什么操作:
//gcdBlock ,都包裹在自动释放池中 :
// 什么情况下使用自动释放池
// 1: 大量临时变量 connect : 重连
// 2: 自定义线程管理 : nsoperation
// 3: 非UI 命令 工具
dispatch_block_t block = ^{ @autoreleasepool {
// Check for problems with host parameter
if ([host length] == 0)
{
NSString *msg = @"Invalid host parameter (nil or \"\"). Should be a domain name or IP address string.";
preConnectErr = [self badParamError:msg];
//其实就是return,大牛的代码真是充满逼格
return_from_block;
}
// Run through standard pre-connect checks
//一个前置的检查,如果没通过返回,这个检查里,如果interface有值,则会将本机的IPV4 IPV6的 address设置上。
// 参数 : 指针 操作同一片内存空间
if (![self preConnectWithInterface:interface error:&preConnectErr])
{
return_from_block;
}
// We've made it past all the checks.
// It's time to start the connection process.
//flags 做或等运算。 flags标识为开始Socket连接
flags |= kSocketStarted;
//又是一个{}? 只是为了标记么?
LogVerbose(@"Dispatching DNS lookup...");
// It's possible that the given host parameter is actually a NSMutableString.
//很可能给我们的服务端的参数是一个可变字符串
// So we want to copy it now, within this block that will be executed synchronously.
//所以我们需要copy,在Block里同步的执行
// This way the asynchronous lookup block below doesn't have to worry about it changing.
//这种基于Block的异步查找,不需要担心它被改变
//copy,防止改变
NSString *hostCpy = [host copy];
//拿到状态
int aStateIndex = stateIndex;
__weak GCDAsyncSocket *weakSelf = self;
//全局Queue ---> 服务器
// client <---> server
// 这个globalConcurrentQueue主要是用来查找服务器链接
dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
//异步执行
dispatch_async(globalConcurrentQueue, ^{ @autoreleasepool {
//忽视循环引用
#pragma clang diagnostic push
#pragma clang diagnostic warning "-Wimplicit-retain-self"
//查找错误
NSError *lookupErr = nil;
//server地址数组(包含IPV4 IPV6的地址 sockaddr_in6、sockaddr_in类型)
NSMutableArray *addresses = [[self class] lookupHost:hostCpy port:port error:&lookupErr];
//strongSelf
__strong GCDAsyncSocket *strongSelf = weakSelf;
//完整Block安全形态,在加个if
if (strongSelf == nil) return_from_block;
//如果有错
if (lookupErr)
{
//用cocketQueue
dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
//一些错误处理,清空一些数据等等
[strongSelf lookup:aStateIndex didFail:lookupErr];
}});
}
//正常
else
{
NSData *address4 = nil;
NSData *address6 = nil;
//遍历地址数组
for (NSData *address in addresses)
{
//判断address4为空,且address为IPV4
if (!address4 && [[self class] isIPv4Address:address])
{
address4 = address;
}
//判断address6为空,且address为IPV6
else if (!address6 && [[self class] isIPv6Address:address])
{
address6 = address;
}
}
//异步去发起连接
dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
[strongSelf lookup:aStateIndex didSucceedWithAddress4:address4 address6:address6];
}});
}
#pragma clang diagnostic pop
}});
//开启连接超时
[self startConnectTimeout:timeout];
result = YES;
}};
block()connect流程 :
1.
preConnectWithInterface
这个方法先检查interface是否有值,不过开发者一般都是传nil,如果interface有值是本机地址。
- 将
flags
的状态变成kSocketStarted
- 开启了一个异步全局并行队列来连接开启连接任务。
lookupHost:port:error:
这个方法根据host和port获取地址的信息,是返回一个数组,数组包括(IPV4 IPV6的地址 sockaddr_in6、sockaddr_in类型)lookup: didSucceedWithAddress4: address6:
去创建连接socket。- 连接完后再开辟一条异步线程,会把
flags |= kConnected
,连接成功后会停止超时连接.- 这时会创建读写stream后,添加到runloop上,最后openStreams。
接着到读/写数据函数
- (void)readDataWithTimeout:(NSTimeInterval)timeout
buffer:(NSMutableData *)buffer
bufferOffset:(NSUInteger)offset
maxLength:(NSUInteger)length
tag:(long)tag
{
if (offset > [buffer length]) {
LogWarn(@"Cannot read: offset > [buffer length]");
return;
}
GCDAsyncReadPacket *packet = [[GCDAsyncReadPacket alloc] initWithData:buffer
startOffset:offset
maxLength:length
timeout:timeout
readLength:0
terminator:nil
tag:tag];
dispatch_async(socketQueue, ^{ @autoreleasepool {
LogTrace();
if ((flags & kSocketStarted) && !(flags & kForbidReadsWrites))
{
//往读的队列添加任务,任务是包的形式
[readQueue addObject:packet];
[self maybeDequeueRead];
}
}});
// Do not rely on the block being run in order to release the packet,
// as the queue might get released without the block completing.
}
//让读任务离队,开始执行这条读任务
- (void)maybeDequeueRead
{
LogTrace();
NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
// If we're not currently processing a read AND we have an available read stream
//如果当前读的包为空,而且flag为已连接
if ((currentRead == nil) && (flags & kConnected))
{
//如果读的queue大于0 (里面装的是我们封装的GCDAsyncReadPacket数据包)
if ([readQueue count] > 0)
{
// Dequeue the next object in the write queue
//使得下一个对象从写的queue中离开
//从readQueue中拿到第一个写的数据
currentRead = [readQueue objectAtIndex:0];
//移除
[readQueue removeObjectAtIndex:0];
//我们的数据包,如果是GCDAsyncSpecialPacket这种类型,这个包里装了TLS的一些设置
//如果是这种类型的数据,那么我们就进行TLS
if ([currentRead isKindOfClass:[GCDAsyncSpecialPacket class]])
{
LogVerbose(@"Dequeued GCDAsyncSpecialPacket");
// Attempt to start TLS
//标记flag为正在读取TLS
flags |= kStartingReadTLS;
// This method won't do anything unless both kStartingReadTLS and kStartingWriteTLS are set
//只有读写都开启了TLS,才会做TLS认证
[self maybeStartTLS];
}
else
{
LogVerbose(@"Dequeued GCDAsyncReadPacket");
// Setup read timer (if needed)
//设置读的任务超时,每次延时的时候还会调用 [self doReadData];
[self setupReadTimerWithTimeout:currentRead->timeout];
// Immediately read, if possible
//读取数据 主要对数据是否有一次性的数据读取或者粘包的处理
[self doReadData];
}
}
//读的队列没有数据,标记flag为,读了没有数据则断开连接状态
else if (flags & kDisconnectAfterReads)
{
//如果标记有写然后断开连接
if (flags & kDisconnectAfterWrites)
{
//如果写的队列为0,而且写为空
if (([writeQueue count] == 0) && (currentWrite == nil))
{
//断开连接
[self closeWithError:nil];
}
}
else
{
//断开连接
[self closeWithError:nil];
}
}
//如果有安全socket。
else if (flags & kSocketSecure)
{
//
[self flushSSLBuffers];
// Edge case:
//
// We just drained all data from the ssl buffers,
// and all known data from the socket (socketFDBytesAvailable).
//
// If we didn't get any data from this process,
// then we may have reached the end of the TCP stream.
//
// Be sure callbacks are enabled so we're notified about a disconnection.
//如果可读字节数为0
if ([preBuffer availableBytes] == 0)
{
//CFStream形式TLS
if ([self usingCFStreamForTLS]) {
// Callbacks never disabled
}
else {
//重新恢复读的source。因为每次开始读数据的时候,都会挂起读的source
[self resumeReadSource];
}
}
}
}
}
//可能开启TLS
- (void)maybeStartTLS
{
// We can't start TLS until:
// - All queued reads prior to the user calling startTLS are complete
// - All queued writes prior to the user calling startTLS are complete
//
// We'll know these conditions are met when both kStartingReadTLS and kStartingWriteTLS are set
//只有读和写TLS都开启
if ((flags & kStartingReadTLS) && (flags & kStartingWriteTLS))
{
//需要安全传输
BOOL useSecureTransport = YES;
#if TARGET_OS_IPHONE
{
//拿到当前读的数据
GCDAsyncSpecialPacket *tlsPacket = (GCDAsyncSpecialPacket *)currentRead;
//得到设置字典
NSDictionary *tlsSettings = tlsPacket->tlsSettings;
//拿到Key为CFStreamTLS的 value
NSNumber *value = [tlsSettings objectForKey:GCDAsyncSocketUseCFStreamForTLS];
if (value && [value boolValue])
//如果是用CFStream的,则安全传输为NO
useSecureTransport = NO;
}
#endif
//如果使用安全通道
if (useSecureTransport)
{
//开启TLS
[self ssl_startTLS];
}
//CFStream形式的Tls
else
{
#if TARGET_OS_IPHONE
[self cf_startTLS];
#endif
}
}
}
- 如果能进入
maybeStartTLS
这个方法后,证明GCDAsyncSpecialPacket
这个包要对TLS认证。- 这里的TLS分了两种情况:
2.1 ssl_startTLS认证
2.2 cf_startTLS认证- 但是最终都会去到prebuff,再根据代理方法代理出去的
ssl_startTLS认证流程:
- client -- '发送一个信号' --> server
- server先弄个公开密钥将客户端发送过来的信号生成主密钥所需要信息 ---> client
- client将主密钥加密的信息 ---> server
- server用公开密钥对主密钥解密,然后认证
- 认证完后初始化SSL提前缓冲(4kb)
- 然后根据preBuffer可读大小进行读写
- 开始SSL握手过程,调用socket已经开启安全通道的代理方法
cf_startTLS流程:
这是CF流形式的TLS ---》createReadAndWriteStream ---》addStreamsToRunLoop ---》拿到当前的GCDAsyncSpecialPacket包 ---》再去拿ssl配置 ---》直接设置给读写stream ---》openStreams
Close流程:
1.缓冲区的prebuff进行reset
2.相应的事件流关闭、释放、滞空
3.SSL上下文关闭、释放
4.针对三种不同类型socket进行关闭释放
5.取消相关的souce
6.代理回调关闭
GCDAsyncSocketDelegate代理方法:
//已经连接到服务器
- (void)socket:(GCDAsyncSocket *)sock didConnectToHost:(nonnull NSString *)host port:(uint16_t)port{
NSLog(@"连接成功 : %@---%d",host,port);
//连接成功或者收到消息,必须开始read,否则将无法收到消息,
//不read的话,缓存区将会被关闭
// -1 表示无限时长 ,永久不失效
[self.socket readDataWithTimeout:-1 tag:10086];
}
// 连接断开
- (void)socketDidDisconnect:(GCDAsyncSocket *)sock withError:(NSError *)err{
NSLog(@"断开 socket连接 原因:%@",err);
}
//已经接收服务器返回来的数据
- (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag{
NSLog(@"接收到tag = %ld : %ld 长度的数据",tag,data.length);
//连接成功或者收到消息,必须开始read,否则将无法收到消息
//不read的话,缓存区将会被关闭
// -1 表示无限时长 , tag
[self.socket readDataWithTimeout:-1 tag:10086];
}
//消息发送成功 代理函数 向服务器 发送消息
- (void)socket:(GCDAsyncSocket *)sock didWriteDataWithTag:(long)tag{
NSLog(@"%ld 发送数据成功",tag);
}
因为存在粘包和分包的情况,所以接收方需要对接收的数据进行一定的处理,主要解决的问题有两个:
1.在粘包产生时,要可以在同一个包内获取出多个包的内容。
2.在分包产生时,要保留上一个包的部分内容,与下一个包的部分内容组合。
处理方式:
#pragma mark - 发送数据格式化
- (void)sendData:(NSData *)data dataType:(unsigned int)dataType{
NSMutableData *mData = [NSMutableData data];
// 1.计算数据总长度 data
unsigned int dataLength = 4+4+(int)data.length;
// 将长度转成data
NSData *lengthData = [NSData dataWithBytes:&dataLength length:4];
// mData 拼接长度data
[mData appendData:lengthData];
// 数据类型 data
// 2.拼接指令类型(4~7:指令)
NSData *typeData = [NSData dataWithBytes:&dataType length:4];
// mData 拼接数据类型data
[mData appendData:typeData];
// 3.最后拼接真正的数据data
[mData appendData:data];
NSLog(@"发送数据的总字节大小:%ld",mData.length);
// 发数据
[self.socket writeData:mData withTimeout:-1 tag:10086];
}
接收数据
- (void)recvData:(NSData *)data{
//直接就给他缓存起来
[self.cacheData appendData:data];
// 获取总的数据包大小
// 整段数据长度(不包含长度跟类型)
NSData *totalSizeData = [data subdataWithRange:NSMakeRange(0, 4)];
unsigned int totalSize = 0;
[totalSizeData getBytes:&totalSize length:4];
//包含长度跟类型的数据长度
unsigned int completeSize = totalSize + 8;
//必须要大于8 才会进这个循环
while (self.cacheData.length>8) {
if (self.cacheData.length < completeSize) {
//如果缓存的长度 还不如 我们传过来的数据长度,就让socket继续接收数据
[self.socket readDataWithTimeout:-1 tag:10086];
break;
}
//取出数据
NSData *resultData = [self.cacheData subdataWithRange:NSMakeRange(8, completeSize)];
//处理数据
[self handleRecvData:resultData];
//清空刚刚缓存的data
[self.cacheData replaceBytesInRange:NSMakeRange(0, completeSize) withBytes:nil length:0];
//如果缓存的数据长度还是大于8,再执行一次方法
if (self.cacheData.length > 8) {
[self recvData:nil];
}
}
}
socket的消息如何准确送达
1.当连接的情况下是根据接口来接受消息,如果没有连接情况是走apns推送。
2.消息有序情况把数据封装成一个包一个包发送这样就可以保证消息有序。
websocket
websocket是解决粘包的问题,根据pingpong来保持连接,如果断了也是会走apns推送,保证消息接受。
网友评论