美文网首页iOS备忘录iOS 开发每天分享优质文章iOS 网络编程
iOS即时通讯进阶 - CocoaAsyncSocket源码解析

iOS即时通讯进阶 - CocoaAsyncSocket源码解析

作者: 涂耀辉 | 来源:发表于2017-01-24 16:22 被阅读4489次
    前言:

    本文承接自上篇:iOS即时通讯进阶 - CocoaAsyncSocket源码解析(Connect篇)

    注:文中涉及代码比较多,建议大家结合源码一起阅读比较容易能加深理解。这里有楼主标注好注释的源码,有需要的可以作为参照:CocoaAsyncSocket源码注释
    如果对该框架用法不熟悉的话,可以参考楼主之前这篇文章:iOS即时通讯,从入门到“放弃”?,或者自行查阅。

    上文我们提到了GCDAsyncSocket的初始化,以及最终connect之前的准备工作,包括一些错误检查;本机地址创建以及socket创建;服务端地址的创建;还有一些本机socket可选项的配置,例如禁止网络出错导致进程关闭的信号等。

    言归正传,继续上文往下讲
    上文讲到了本文方法八--创建Socket,其中有这么一行代码:
    //和connectInterface绑定
    if (![self bindSocket:socketFD toInterface:connectInterface error:errPtr])
    {
        //绑定失败,直接关闭返回
        [self closeSocket:socketFD];
        
        return SOCKET_NULL;
    }
    

    我们去用之前创建的本机地址去做socket绑定,接着会调用到如下方法中:

    本文方法九--给Socket绑定本机地址
    //绑定一个Socket的本地地址
    - (BOOL)bindSocket:(int)socketFD toInterface:(NSData *)connectInterface error:(NSError **)errPtr
    {
        // Bind the socket to the desired interface (if needed)
        //无接口就不绑定,connect会自动绑定到一个不冲突的端口上去。
        if (connectInterface)
        {
            LogVerbose(@"Binding socket...");
            
            //判断当前地址的Port是不是大于0
            if ([[self class] portFromAddress:connectInterface] > 0)
            {
                // Since we're going to be binding to a specific port,
                // we should turn on reuseaddr to allow us to override sockets in time_wait.
                
                int reuseOn = 1;
                
                
                //设置调用close(socket)后,仍可继续重用该socket。调用close(socket)一般不会立即关闭socket,而经历TIME_WAIT的过程。
                setsockopt(socketFD, SOL_SOCKET, SO_REUSEADDR, &reuseOn, sizeof(reuseOn));
            }
            
            //拿到地址
            const struct sockaddr *interfaceAddr = (const struct sockaddr *)[connectInterface bytes];
            //绑定这个地址
            int result = bind(socketFD, interfaceAddr, (socklen_t)[connectInterface length]);
            
            //绑定出错,返回NO
            if (result != 0)
            {
                if (errPtr)
                    *errPtr = [self errnoErrorWithReason:@"Error in bind() function"];
                
                return NO;
            }
        }
        
        //成功
        return YES;
    }
    

    这个方法也非常简单,如果没有connectInterface则直接返回YES,当socket进行连接的时候,会自动绑定一个端口,进行连接。
    如果有值,则我们开始绑定到我们一开始指定的地址上。
    这里调用了两个和scoket相关的函数:
    第一个是我们之前提到的配置scoket参数的函数:

    setsockopt(socketFD, SOL_SOCKET, SO_REUSEADDR, &reuseOn, sizeof(reuseOn));
    

    这里调用这个函数的主要目的是为了调用close的时候,不立即去关闭socket连接,而是经历一个TIME_WAIT过程。在这个过程中,socket是可以被复用的。我们注意到之前的connect流程并没有看到复用socket的代码。注意,我们现在走的连接流程是客户端的流程,等我们讲到服务端accept进行连接的时候,我们就能看到这个复用的作用了。

    第二个是bind函数

    int result = bind(socketFD, interfaceAddr, (socklen_t)[connectInterface length]);
    

    这个函数倒是很简单,就3个参数,socket、需要绑定的地址、地址大小。这样就把socket和这个地址(其实就是端口)捆绑在一起了。

    这样我们就做完了最终连接前所有准备工作,本机socket有了,服务端的地址也有了。接着我们就可以开始进行最终连接了:

    本文方法十 -- 建立连接的最终方法
    //连接最终方法 3 finnal。。。
    - (void)connectSocket:(int)socketFD address:(NSData *)address stateIndex:(int)aStateIndex
    {
        // If there already is a socket connected, we close socketFD and return
        //已连接,关闭连接返回
        if (self.isConnected)
        {
            [self closeSocket:socketFD];
            return;
        }
        
        // Start the connection process in a background queue
        //开始连接过程,在后台queue中
        __weak GCDAsyncSocket *weakSelf = self;
        
        //获取到全局Queue
        dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
        //新线程
        dispatch_async(globalConcurrentQueue, ^{
    #pragma clang diagnostic push
    #pragma clang diagnostic warning "-Wimplicit-retain-self"
            //调用connect方法,该函数阻塞线程,所以要异步新线程
            //客户端向特定网络地址的服务器发送连接请求,连接成功返回0,失败返回 -1。
            int result = connect(socketFD, (const struct sockaddr *)[address bytes], (socklen_t)[address length]);
            
            //老样子,安全判断
            __strong GCDAsyncSocket *strongSelf = weakSelf;
            if (strongSelf == nil) return_from_block;
            
            //在socketQueue中,开辟线程
            dispatch_async(strongSelf->socketQueue, ^{ @autoreleasepool {
                //如果状态为已经连接,关闭连接返回
                if (strongSelf.isConnected)
                {
                    [strongSelf closeSocket:socketFD];
                    return_from_block;
                }
                
                //说明连接成功
                if (result == 0)
                {
                    //关闭掉另一个没用的socket
                    [self closeUnusedSocket:socketFD];
                    //调用didConnect,生成stream,改变状态等等!
                    [strongSelf didConnect:aStateIndex];
                }
                //连接失败
                else
                {
                    //关闭当前socket
                    [strongSelf closeSocket:socketFD];
                    
                    // If there are no more sockets trying to connect, we inform the error to the delegate
                    //返回连接错误的error
                    if (strongSelf.socket4FD == SOCKET_NULL && strongSelf.socket6FD == SOCKET_NULL)
                    {
                        NSError *error = [strongSelf errnoErrorWithReason:@"Error in connect() function"];
                        [strongSelf didNotConnect:aStateIndex error:error];
                    }
                }
            }});
            
    #pragma clang diagnostic pop
        });
        //输出正在连接中
        LogVerbose(@"Connecting...");
    }
    
    

    这个方法主要就是做了一件事,调用下面一个函数进行连接:

    int result = connect(socketFD, (const struct sockaddr *)[address bytes], (socklen_t)[address length]);
    

    这里需要注意的是这个函数是阻塞,直到结果返回之前,线程会一直停在这行。所以这里用的是全局并发队列,开辟了一个新的线程进行连接,在得到结果之后,又调回socketQueue中进行后续操作。

    如果result为0,说明连接成功,我们会关闭掉另外一个没有用到的socket(如果有的话)。然后调用另外一个方法做一些连接成功的初始化操作。
    否则连接失败,我们会关闭socket,填充错误并且返回。

    我们接着来看看连接成功后,初始化的方法:

    本文方法十一 -- 连接成功后的初始化
    //连接成功后调用,设置一些连接成功的状态
    - (void)didConnect:(int)aStateIndex
    {
        LogTrace();
        
        NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
        
        //状态不同
        if (aStateIndex != stateIndex)
        {
            LogInfo(@"Ignoring didConnect, already disconnected");
            
            // The connect operation has been cancelled.
            // That is, socket was disconnected, or connection has already timed out.
            return;
        }
        
        //kConnected合并到当前flag中
        flags |= kConnected;
        //停止连接超时
        [self endConnectTimeout];
        
        #if TARGET_OS_IPHONE
        // The endConnectTimeout method executed above incremented the stateIndex.
        //上面的endConnectTimeout,会导致stateIndex增加,所以需要重新赋值
        aStateIndex = stateIndex;
        #endif
        
        // Setup read/write streams (as workaround for specific shortcomings in the iOS platform)
        // 
        // Note:
        // There may be configuration options that must be set by the delegate before opening the streams.
        //打开stream之前必须用相关配置设置代理
        // The primary example is the kCFStreamNetworkServiceTypeVoIP flag, which only works on an unopened stream.
        //主要的例子是kCFStreamNetworkServiceTypeVoIP标记,只能工作在未打开的stream中?
        // 
        // Thus we wait until after the socket:didConnectToHost:port: delegate method has completed.
        //所以我们要等待,连接完成的代理调用完
        // This gives the delegate time to properly configure the streams if needed.
        //这些给了代理时间,去正确的配置Stream,如果是必要的话
        
        //创建个Block来初始化Stream
        dispatch_block_t SetupStreamsPart1 = ^{
            
            NSLog(@"hello~");
            #if TARGET_OS_IPHONE
            //创建读写stream失败,则关闭并报对应错误
            if (![self createReadAndWriteStream])
            {
                [self closeWithError:[self otherError:@"Error creating CFStreams"]];
                return;
            }
            
            //参数是给NO的,就是有可读bytes的时候,不会调用回调函数
            if (![self registerForStreamCallbacksIncludingReadWrite:NO])
            {
                [self closeWithError:[self otherError:@"Error in CFStreamSetClient"]];
                return;
            }
            
            #endif
        };
        //part2设置stream
        dispatch_block_t SetupStreamsPart2 = ^{
            #if TARGET_OS_IPHONE
            //状态不一样直接返回
            if (aStateIndex != stateIndex)
            {
                // The socket has been disconnected.
                return;
            }
            //如果加到runloop上失败
            if (![self addStreamsToRunLoop])
            {
                //错误返回
                [self closeWithError:[self otherError:@"Error in CFStreamScheduleWithRunLoop"]];
                return;
            }
            
            //读写stream open
            if (![self openStreams])
            {
                //开启错误返回
                [self closeWithError:[self otherError:@"Error creating CFStreams"]];
                return;
            }
            
            #endif
        };
        
        // Notify delegate
        //通知代理
        //拿到server端的host port
        NSString *host = [self connectedHost];
        uint16_t port = [self connectedPort];
        //拿到unix域的 url
        NSURL *url = [self connectedUrl];
        //拿到代理
        __strong id theDelegate = delegate;
        
        //代理队列 和 Host不为nil 且响应didConnectToHost代理方法
        if (delegateQueue && host != nil && [theDelegate respondsToSelector:@selector(socket:didConnectToHost:port:)])
        {
            //调用初始化stream1
            SetupStreamsPart1();
            
            dispatch_async(delegateQueue, ^{ @autoreleasepool {
                
                //到代理队列调用连接成功的代理方法
                [theDelegate socket:self didConnectToHost:host port:port];
                
                //然后回到socketQueue中去执行初始化stream2
                dispatch_async(socketQueue, ^{ @autoreleasepool {
                    
                    SetupStreamsPart2();
                }});
            }});
        }
        //这个是unix domain 请求回调
        else if (delegateQueue && url != nil && [theDelegate respondsToSelector:@selector(socket:didConnectToUrl:)])
        {
            SetupStreamsPart1();
            
            dispatch_async(delegateQueue, ^{ @autoreleasepool {
                
                [theDelegate socket:self didConnectToUrl:url];
                
                dispatch_async(socketQueue, ^{ @autoreleasepool {
                    
                    SetupStreamsPart2();
                }});
            }});
        }
        //否则只初始化stream
        else
        {
            SetupStreamsPart1();
            SetupStreamsPart2();
        }
            
        // Get the connected socket
        
        int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
        
        //fcntl,功能描述:根据文件描述词来操作文件的特性。http://blog.csdn.net/pbymw8iwm/article/details/7974789
        // Enable non-blocking IO on the socket
        //使socket支持非阻塞IO
        int result = fcntl(socketFD, F_SETFL, O_NONBLOCK);
        if (result == -1)
        {
            //失败 ,报错
            NSString *errMsg = @"Error enabling non-blocking IO on socket (fcntl)";
            [self closeWithError:[self otherError:errMsg]];
            
            return;
        }
        
        // Setup our read/write sources
        //初始化读写source
        [self setupReadAndWriteSourcesForNewlyConnectedSocket:socketFD];
        
        // Dequeue any pending read/write requests
        //开始下一个任务
        [self maybeDequeueRead];
        [self maybeDequeueWrite];
    }
    
    

    这个方法很长一大串,其实做的东西也很简单,主要做了下面几件事:

    1. 把当前状态flags加上已连接,并且关闭掉我们一开始连接开启的,连接超时的定时器。
    2. 初始化了两个BlockSetupStreamsPart1SetupStreamsPart2,这两个Block做的事都和读写流有关。SetupStreamsPart1用来创建读写流,并且注册回调。另一个SetupStreamsPart2用来把流添加到当前线程的runloop上,并且打开流。
    3. 判断是否有代理queuehost或者url这些参数是否为空、是否代理响应didConnectToHostdidConnectToUrl代理,这两种分别对应了普通socket连接和unix domin socket连接。如果实现了对应的代理,则调用连接成功的代理。
    4. 在调用代理的同时,调用了我们之前初始化的两个读写流相关的Block。这里值得说下的是这两个Block和代理之间的调用顺序:
    • 先执行SetupStreamsPart1后执行SetupStreamsPart2,没什么好说的,问题是代理的执行时间,想想如果我们放在SetupStreamsPart2后面是不是会导致个问题,就是用户收到消息了,但是连接成功的代理还没有被调用,这显然是不合理的。所以我们的调用顺序是SetupStreamsPart1->代理->SetupStreamsPart2

      所以出现了如下代码:

      //调用初始化stream1
            SetupStreamsPart1();
            
            dispatch_async(delegateQueue, ^{ @autoreleasepool {
                
                //到代理队列调用连接成功的代理方法
                [theDelegate socket:self didConnectToHost:host port:port];
                
                //然后回到socketQueue中去执行初始化stream2
                dispatch_async(socketQueue, ^{ @autoreleasepool {
                    
                    SetupStreamsPart2();
                }});
            }});
    

    原因是为了线程安全和socket相关的操作必须在socketQueue中进行。而代理必须在我们设置的代理queue中被回调。

    1. 拿到当前的本机socket,调用如下函数:
    int result = fcntl(socketFD, F_SETFL, O_NONBLOCK);
    

    简单来说,这个函数类似我们之前提到的一个函数setsockopt(),都是给socket设置一些参数,以实现一些功能。而这个函数,能实现的功能更多。大家可以看看这篇文章参考参考:fcntl函数详解

    而在这里,就是为了把socket的IO模式设置为非阻塞。很多小伙伴又要疑惑什么是非阻塞了,先别急,关于这个我们下文会详细的来谈。

    1. 我们初始化了读写source(很重要,所有的消息都是由这个source来触发的,我们之后会详细分析这个方法)。

    2. 我们做完了streamsource的初始化处理,则开始做一次读写任务(这两个方法暂时不讲,会放到之后的ReadWrite篇中去讲)。

    我们接着来讲讲这个方法中对其他方法的调用,按照顺序来,先从第2条,两个Block中对stream的处理开始。和stream相关的函数一共有6个:

    Stream相关方法一 -- 创建读写stream
    //创建读写stream
    - (BOOL)createReadAndWriteStream
    {
        LogTrace();
        
        NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
        
        //如果有一个有值,就返回
        if (readStream || writeStream)
        {
            // Streams already created
            return YES;
        }
        //拿到socket,首选是socket4FD,其次socket6FD,都没有才是socketUN,socketUN应该是Unix的socket结构体
        int socketFD = (socket4FD != SOCKET_NULL) ? socket4FD : (socket6FD != SOCKET_NULL) ? socket6FD : socketUN;
        
        //如果都为空,返回NO
        if (socketFD == SOCKET_NULL)
        {
            // Cannot create streams without a file descriptor
            return NO;
        }
        
        //如果非连接,返回NO
        if (![self isConnected])
        {
            // Cannot create streams until file descriptor is connected
            return NO;
        }
        
        LogVerbose(@"Creating read and write stream...");
        
    #pragma mark - 绑定Socket和CFStream
        //下面的接口用于创建一对 socket stream,一个用于读取,一个用于写入:
        CFStreamCreatePairWithSocket(NULL, (CFSocketNativeHandle)socketFD, &readStream, &writeStream);
        
        // The kCFStreamPropertyShouldCloseNativeSocket property should be false by default (for our case).
        // But let's not take any chances.
        
        
        
        //读写stream都设置成不会随着绑定的socket一起close,release。 kCFBooleanFalse不一起,kCFBooleanTrue一起
        if (readStream)
            CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
        if (writeStream)
            CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
        
        //如果有一个为空
        if ((readStream == NULL) || (writeStream == NULL))
        {
            LogWarn(@"Unable to create read and write stream...");
            
            //关闭对应的stream
            if (readStream)
            {
                CFReadStreamClose(readStream);
                CFRelease(readStream);
                readStream = NULL;
            }
            if (writeStream)
            {
                CFWriteStreamClose(writeStream);
                CFRelease(writeStream);
                writeStream = NULL;
            }
            //返回创建失败
            return NO;
        }
        //创建成功
        return YES;
    }
    

    这个方法基本上很简单,就是关于两个stream函数的调用:

    1. 创建stream的函数:
    CFStreamCreatePairWithSocket(NULL, (CFSocketNativeHandle)socketFD, &readStream, &writeStream);
    

    这个函数创建了一对读写stream,并且把stream与这个scoket做了绑定。

    1. 设置stream属性:
    CFReadStreamSetProperty(readStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
    CFWriteStreamSetProperty(writeStream, kCFStreamPropertyShouldCloseNativeSocket, kCFBooleanFalse);
    

    这个函数可以给stream设置一个属性,这里是设置stream不会随着socket的生命周期(close,release)而变化。

    接着调用了registerForStreamCallbacksIncludingReadWrite来给stream注册读写回调。

    Stream相关方法二 -- 读写回调的注册:
    //注册Stream的回调
    - (BOOL)registerForStreamCallbacksIncludingReadWrite:(BOOL)includeReadWrite
    {
        LogVerbose(@"%@ %@", THIS_METHOD, (includeReadWrite ? @"YES" : @"NO"));
        
        NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
        //判断读写stream是不是都为空
        NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");
        
        //客户端stream上下文对象
        streamContext.version = 0;
        streamContext.info = (__bridge void *)(self);
        streamContext.retain = nil;
        streamContext.release = nil;
        streamContext.copyDescription = nil;
        
    //    The open has completed successfully.
    //    The stream has bytes to be read.
    //    The stream can accept bytes for writing.
    //        An error has occurred on the stream.
    //        The end of the stream has been reached.
        
        //设置一个CF的flag  两种,一种是错误发生的时候,一种是stream事件结束
        CFOptionFlags readStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered ;
        //如果包含读写
        if (includeReadWrite)
            //仍然有Bytes要读的时候     The stream has bytes to be read.
            readStreamEvents |= kCFStreamEventHasBytesAvailable;
        
        //给读stream设置客户端,会在之前设置的那些标记下回调函数 CFReadStreamCallback。设置失败的话直接返回NO
        if (!CFReadStreamSetClient(readStream, readStreamEvents, &CFReadStreamCallback, &streamContext))
        {
            return NO;
        }
        
        //写的flag,也一样
        CFOptionFlags writeStreamEvents = kCFStreamEventErrorOccurred | kCFStreamEventEndEncountered;
        if (includeReadWrite)
            writeStreamEvents |= kCFStreamEventCanAcceptBytes;
        
        if (!CFWriteStreamSetClient(writeStream, writeStreamEvents, &CFWriteStreamCallback, &streamContext))
        {
            return NO;
        }
        //走到最后说明读写都设置回调成功,返回YES
        return YES;
    }
    

    相信用过CFStream的朋友,应该会觉得很简单,这个方法就是调用了一些CFStream相关函数,其中最主要的这个设置读写回调函数:

    Boolean CFReadStreamSetClient(CFReadStreamRef stream, CFOptionFlags streamEvents, CFReadStreamClientCallBack clientCB, CFStreamClientContext *clientContext);
    Boolean CFWriteStreamSetClient(CFWriteStreamRef stream, CFOptionFlags streamEvents, CFWriteStreamClientCallBack clientCB, CFStreamClientContext *clientContext);
    

    这个函数共4个参数:
    第1个为我们需要设置的stream;
    第2个为需要监听的事件选项,包括以下事件:

    typedef CF_OPTIONS(CFOptionFlags, CFStreamEventType) {
        kCFStreamEventNone = 0,  //没有事件发生
        kCFStreamEventOpenCompleted = 1,  //成功打开流
        kCFStreamEventHasBytesAvailable = 2, //流中有数据可读
        kCFStreamEventCanAcceptBytes = 4,  //流中可以接受数据去写
        kCFStreamEventErrorOccurred = 8,  //流发生错误
        kCFStreamEventEndEncountered = 16  //到达流的结尾
    };
    

    其中具体用法,大家可以自行去试试,这里作者只监听了了两种事件kCFStreamEventErrorOccurredkCFStreamEventEndEncountered,再根据传过来的参数去决定是否监听kCFStreamEventCanAcceptBytes

    //如果包含读写
    if (includeReadWrite)
       //仍然有Bytes要读的时候     The stream has bytes to be read.
         readStreamEvents |= kCFStreamEventHasBytesAvailable;
    

    而这里我们传过来的参数为NO,导致它并不监听可读数据。显然,我们正常的连接,当有消息发送过来,并不是由stream回调来触发的。这个框架中,如果是TLS传输的socket是用stream来触发的,这个我们后续文章会讲到。

    那么有数据的时候,到底是什么来触发我们的读写呢,答案就是读写source,我们接下来就会去创建初始化它。

    这里绑定了两个函数,分别对应读和写的回调,分别为:

    //读的回调
    static void CFReadStreamCallback (CFReadStreamRef stream, CFStreamEventType type, void *pInfo)
    //写的回调
    static void CFWriteStreamCallback (CFWriteStreamRef stream, CFStreamEventType type, void *pInfo)
    

    关于这两个函数,同样这里暂时不做讨论,等后续文章再来分析。

    还有一点需要说一下的是streamContext这个属性,它是一个结构体,包含流的上下文信息,其结构如下:

    typedef struct {
        CFIndex version;
        void *info;
        void *(*retain)(void *info);
        void (*release)(void *info);
        CFStringRef (*copyDescription)(void *info);
    } CFStreamClientContext;
    
    

    这个流的上下文中info指针,其实就是前面所对应的读写回调函数中的pInfo指针,每次回调都会传过去。其它的version就是流的版本标识,之外的3个都需要的是一个函数指针,对应我们传递的pInfo的持有以及释放还有复制的描述信息,这里我们都赋值给nil

    接着我们来到流处理的第三步:addStreamsToRunLoop-添加到runloop上。

    Stream相关方法三 -- 加到当前线程的runloop上:
    //把stream添加到runloop上
    - (BOOL)addStreamsToRunLoop
    {
        LogTrace();
        
        NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
        NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");
        
        //判断flag里是否包含kAddedStreamsToRunLoop,没添加过则添加。
        if (!(flags & kAddedStreamsToRunLoop))
        {
            LogVerbose(@"Adding streams to runloop...");
            
            
            [[self class] startCFStreamThreadIfNeeded];
            //在开启的线程中去执行,阻塞式的
            [[self class] performSelector:@selector(scheduleCFStreams:)
                                 onThread:cfstreamThread
                               withObject:self
                            waitUntilDone:YES];
            
            //添加标识
            flags |= kAddedStreamsToRunLoop;
        }
        
        return YES;
    }
    

    这里方法做了两件事:

    1. 开启了一条用于CFStream读写回调的常驻线程,其中调用了好几个函数:
     + (void)startCFStreamThreadIfNeeded;
     + (void)cfstreamThread;
    

    在这两个函数中,添加了一个runloop,并且绑定了一个定时器事件,让它run起来,使得线程常驻。大家可以结合着github中demo的注释,自行查看这几个方法。如果有任何疑问可以看看楼主这篇文章:基于runloop的线程保活、销毁与通信,或者本文下评论,会一一解答。

    1. 在这个常驻线程中去调用注册方法:
    //注册CFStream
     + (void)scheduleCFStreams:(GCDAsyncSocket *)asyncSocket
    {
        LogTrace();
       
        //断言当前线程是cfstreamThread,不是则报错
        NSAssert([NSThread currentThread] == cfstreamThread, @"Invoked on wrong thread");
        
        //获取到runloop
        CFRunLoopRef runLoop = CFRunLoopGetCurrent();
        //如果有readStream
        if (asyncSocket->readStream)
            //注册readStream在runloop的kCFRunLoopDefaultMode上
            CFReadStreamScheduleWithRunLoop(asyncSocket->readStream, runLoop, kCFRunLoopDefaultMode);
        
        //一样
        if (asyncSocket->writeStream)
            CFWriteStreamScheduleWithRunLoop(asyncSocket->writeStream, runLoop, kCFRunLoopDefaultMode);
    }
    

    这里可以看到,我们流的回调都是在这条流的常驻线程中,至于为什么要这么做,相信大家楼主看过AFNetworking系列文章的会明白。我们之后文章也会就这个框架线程的问题详细讨论的,这里就暂时不详细说明了。
    这里主要用了CFReadStreamScheduleWithRunLoop函数完成了runloop的注册:

    CFReadStreamScheduleWithRunLoop(asyncSocket->readStream, runLoop, kCFRunLoopDefaultMode);
    CFWriteStreamScheduleWithRunLoop(asyncSocket->writeStream, runLoop, kCFRunLoopDefaultMode);
    

    这样,如果stream中有我们监听的事件发生了,就会在这个runloop中触发我们之前设置的读写回调函数。

    我们完成了注册,接下来我们就需要打开stream了:

    Stream相关方法四 -- 打开stream:
    //打开stream
    - (BOOL)openStreams
    {
        LogTrace();
        
        NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
        //断言读写stream都不会空
        NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");
        
        //返回stream的状态
    
        CFStreamStatus readStatus = CFReadStreamGetStatus(readStream);
        CFStreamStatus writeStatus = CFWriteStreamGetStatus(writeStream);
        
        //如果有任意一个没有开启
        if ((readStatus == kCFStreamStatusNotOpen) || (writeStatus == kCFStreamStatusNotOpen))
        {
            LogVerbose(@"Opening read and write stream...");
            
            //开启
            BOOL r1 = CFReadStreamOpen(readStream);
            BOOL r2 = CFWriteStreamOpen(writeStream);
            
            //有一个开启失败
            if (!r1 || !r2)
            {
                LogError(@"Error in CFStreamOpen");
                return NO;
            }
        }
        
        return YES;
    }
    
    

    方法也很简单,通过CFReadStreamGetStatus函数,获取到当前stream的状态,判断没开启则调用CFReadStreamOpen函数去开启,如果开启失败,错误返回。

    到这里stream初始化相关的工作就做完了,接着我们还是回到本文方法十一 -- 连接成功后的初始化中

    其中第5条,我们谈到了设置socket的I/O模式为非阻塞,相信很多朋友对socket的I/O:同步、异步、阻塞、非阻塞。这四个概念有所混淆。
    简单的来说,同步、异步是对于客户端而言的。比如我发起一个调用一个函数,我如果直接去调用,那么就是同步的,否则新开辟一个线程去做,那么对于当前线程而言就是异步的。
    阻塞和非阻塞是对于服务端而言。当服务端被客户端调用后,我如果立刻返回调用的结果(无论数据是否处理完)那么就是非阻塞的,又或者等待数据拿到并且处理完(总之一系列逻辑)再返回,那么这种情况就是阻塞的。

    好了,有了这个概念,我们接下来看看Linux下的5种I/O模型:
    1)阻塞I/O(blocking I/O)
    2)非阻塞I/O (nonblocking I/O)

    1. I/O复用(select 和poll) (I/O multiplexing)
      4)信号驱动I/O (signal driven I/O (SIGIO))
      5)异步I/O (asynchronous I/O (the POSIX aio_functions))

    我们来简单谈谈这5种模型:
    1)阻塞I/O:
    简单举个例子,比如我们调用read()去读取消息,如果是在阻塞模式下,我们会一直等待,直到有消息到来为止。
    很多小伙伴可能又要说了,这有什么不可以,我们新开辟一条线程,让它等着不就行了,看起来确实没什么不可以。
    那是因为你仅仅是站在客户端的角度上来看。试想如果我们服务端也这么做,那岂不是有多少个socket连接,我们得开辟多少个线程去做阻塞IO?
    2)非阻塞I/O
    于是就有了非阻塞的概念,当我们去read()的时候,直接返回结果,这样在很大概率下,是并没有消息给我们读的。这时候函数就会错误返回-1,并将errno设置为 EWOULDBLOCK,意为IO并没有数据。
    这时候就需要我们自己有一个机制,能知道什么时候有数据,在去调用read()。有一个很傻的方式就是不停的循环去调用这个函数,这样有数据来,我们第一时间就读到了。
    3)I/O复用模式
    I/O复用模式阻塞I/O的改进版,它在read之前,会先去调用select去遍历所有的socket,看哪一个有消息。当然这个过程是阻塞的,直到有消息返回为止。然后在去调用read,阻塞的方式去读取从系统内核中去读取这条消息到进程中来。
    4)信号驱动I/O
    信号驱动I/O是一个半异步的I/O模式,它首先会调用一个系统sginal相关的函数,把socket和信号绑定起来,然后不管有没有消息直接返回(这一步非阻塞)。这时候系统内核会去检查socket是否有可用数据。有的话则发送该信号给进程,然后进程在去调用read阻塞式的从系统内核读取数据到进程中来(这一步阻塞)。
    5)可能聪明的你已经想到了更好的解决方式,这就对了,这就是我们第5种IO模式:异步I/O ,它和第4步一样,也是调用sginal相关函数,把socket和信号绑定起来,同时绑定起来的还有一块数据缓冲区buffer。然后无论有没有数据直接返回(非阻塞)。而系统内核会去检查是否有可用数据,一旦有可用数据,则触发信号,并且把数据填充到我们之前提供的数据缓冲区buffer中。这样我们进程被信号触发,并且直接能从buffer中读取到数据,整个过程没有任何阻塞。
    很显然,我们CocoaAyncSocket框架用的就是第5种I/O模式。

    如果大家对I/O模式仍然感到疑惑,可以看看这篇文章:
    socket阻塞与非阻塞,同步与异步、I/O模型

    接着我们继续看本文方法十一 -- 连接成功后的初始化中第6条,读写source的初始化方法:

    本文方法十二 -- 初始化读写source:
    //初始化读写source
    - (void)setupReadAndWriteSourcesForNewlyConnectedSocket:(int)socketFD
    {
        //GCD source DISPATCH_SOURCE_TYPE_READ 会一直监视着 socketFD,直到有数据可读
        readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socketFD, 0, socketQueue);
        //_dispatch_source_type_write :监视着 socketFD,直到写数据了
        writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, socketFD, 0, socketQueue);
        
        // Setup event handlers
        
        __weak GCDAsyncSocket *weakSelf = self;
        
    #pragma mark readSource的回调
    
        //GCD事件句柄  读,当socket中有数据流出现,就会触发这个句柄,全自动,不需要手动触发
        dispatch_source_set_event_handler(readSource, ^{ @autoreleasepool {
        #pragma clang diagnostic push
        #pragma clang diagnostic warning "-Wimplicit-retain-self"
            
            __strong GCDAsyncSocket *strongSelf = weakSelf;
            if (strongSelf == nil) return_from_block;
            
            LogVerbose(@"readEventBlock");
            //从readSource中,获取到数据长度,
            strongSelf->socketFDBytesAvailable = dispatch_source_get_data(strongSelf->readSource);
            LogVerbose(@"socketFDBytesAvailable: %lu", strongSelf->socketFDBytesAvailable);
            
            //如果长度大于0,开始读数据
            if (strongSelf->socketFDBytesAvailable > 0)
                [strongSelf doReadData];
            else
                //因为触发了,但是却没有可读数据,说明读到当前包边界了。做边界处理
                [strongSelf doReadEOF];
            
        #pragma clang diagnostic pop
        }});
        
        //写事件句柄
        dispatch_source_set_event_handler(writeSource, ^{ @autoreleasepool {
        #pragma clang diagnostic push
        #pragma clang diagnostic warning "-Wimplicit-retain-self"
            
            __strong GCDAsyncSocket *strongSelf = weakSelf;
            if (strongSelf == nil) return_from_block;
            
            LogVerbose(@"writeEventBlock");
            //标记为接受数据
            strongSelf->flags |= kSocketCanAcceptBytes;
            //开始写
            [strongSelf doWriteData];
            
        #pragma clang diagnostic pop
        }});
        
        // Setup cancel handlers
        
        __block int socketFDRefCount = 2;
        
        #if !OS_OBJECT_USE_OBJC
        dispatch_source_t theReadSource = readSource;
        dispatch_source_t theWriteSource = writeSource;
        #endif
        
        //读写取消的句柄
        dispatch_source_set_cancel_handler(readSource, ^{
        #pragma clang diagnostic push
        #pragma clang diagnostic warning "-Wimplicit-retain-self"
            
            LogVerbose(@"readCancelBlock");
            
            #if !OS_OBJECT_USE_OBJC
            LogVerbose(@"dispatch_release(readSource)");
            dispatch_release(theReadSource);
            #endif
            
            if (--socketFDRefCount == 0)
            {
                LogVerbose(@"close(socketFD)");
                //关闭socket
                close(socketFD);
            }
            
        #pragma clang diagnostic pop
        });
        
        dispatch_source_set_cancel_handler(writeSource, ^{
        #pragma clang diagnostic push
        #pragma clang diagnostic warning "-Wimplicit-retain-self"
            
            LogVerbose(@"writeCancelBlock");
            
            #if !OS_OBJECT_USE_OBJC
            LogVerbose(@"dispatch_release(writeSource)");
            dispatch_release(theWriteSource);
            #endif
            
            if (--socketFDRefCount == 0)
            {
                LogVerbose(@"close(socketFD)");
                //关闭socket
                close(socketFD);
            }
            
        #pragma clang diagnostic pop
        });
        
        // We will not be able to read until data arrives.
        // But we should be able to write immediately.
        
        //设置未读数量为0
        socketFDBytesAvailable = 0;
        //把读挂起的状态移除
        flags &= ~kReadSourceSuspended;
        
        LogVerbose(@"dispatch_resume(readSource)");
        //开启读source
        dispatch_resume(readSource);
        
        //标记为当前可接受数据
        flags |= kSocketCanAcceptBytes;
        //先把写source标记为挂起
        flags |= kWriteSourceSuspended;
    }
    

    这个方法初始化了读写source,这个方法主要是GCD source运用,如果有对这部分知识有所疑问,可以看看宜龙大神这篇:GCD高级用法
    这里GCD Source相关的主要是下面这3个函数:

    //创建source
    dispatch_source_create(dispatch_source_type_t type,
        uintptr_t handle,
        unsigned long mask,
        dispatch_queue_t _Nullable queue);
    //为source设置事件句柄
    dispatch_source_set_event_handler(dispatch_source_t source,
        dispatch_block_t _Nullable handler);
    //为source设置取消句柄
    dispatch_source_set_cancel_handler(dispatch_source_t source,
        dispatch_block_t _Nullable handler);
    

    相信大家用至少用过GCD定时器,接触过这3个函数,这里创建source的函数,根据参数type的不同,可以处理不同的事件:

    这里我们用的是DISPATCH_SOURCE_TYPE_READDISPATCH_SOURCE_TYPE_WRITE这两个类型。标识如果handle如果有可读或者可写数据时,会触发我们的事件句柄。

    • 而这里初始化的读写事件句柄内容也很简单,就是去读写数据。
    • 而取消句柄也就是去关闭socket
    • 初始化完成后,我们开启了readSource,一旦有数据过来就触发了我们readSource事件句柄,就可以去监听的socket所分配的缓冲区中去读取数据了,而wirteSource初始化完是挂起的。
    • 除此之外我们还初始化了当前source的状态,用于我们后续的操作。

    至此我们客户端的整个Connect流程结束了,用一张图来概括总结一下吧:

    连接流程图

    整个客户端连接的流程大致如上图,当然远不及于此,这里我们对地址做了IPV4IPV6的兼容处理,对一些使用socket而产生的网络错误导致进程退出的容错处理。以及在这个过程中,socketQueue、代理queue、全局并发queuestream常驻线程的管理调度等等。

    当然其中绝大部分操作都是在socketQueue中进行的。而在socketQueue中,我们也分为两种操作dispatch_syncdispatch_async
    因为socketQueue本身就是一个串行queue,所以我们所有的操作都在这个queue中进行保证了线程安全,而需要阻塞后续行为的操作,我们用了sync的方式。其实这样使用sync是及其容易死锁的,但是作者每次在调用sync之前都调用了这么一行判断:

    if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
    

    判断当前队列是否就是这个socketQueue队列,如果是则直接调用,否则就用sync的方式提交到这个queue中去执行。这种防死锁的方式,你学到了么?

    接着我们来讲讲服务端Accept流程:

    整个流程还是相对Connect来说还是十分简单的,因为这个方法很长,而且大多数是我们直接连接讲到过得内容,所以我省略了一部分的代码,只把重要的展示出来,大家可以参照着源码看。

    //监听端口起点
    - (BOOL)acceptOnPort:(uint16_t)port error:(NSError **)errPtr
    {
         return [self acceptOnInterface:nil port:port error:errPtr];
    }
    
    - (BOOL)acceptOnInterface:(NSString *)inInterface port:(uint16_t)port error:(NSError **)errPtr
    {
         LogTrace();
         
         // Just in-case interface parameter is immutable.
        //防止参数被修改
         NSString *interface = [inInterface copy];
         
         __block BOOL result = NO;
         __block NSError *err = nil;
         
         // CreateSocket Block
         // This block will be invoked within the dispatch block below.
         //创建socket的Block
         int(^createSocket)(int, NSData*) = ^int (int domain, NSData *interfaceAddr) {
              
            //创建TCP的socket
              int socketFD = socket(domain, SOCK_STREAM, 0);
         
              //一系列错误判断
              ...
              // Bind socket
            //用本地地址去绑定
              status = bind(socketFD, (const struct sockaddr *)[interfaceAddr bytes], (socklen_t)[interfaceAddr length]);
         
              //监听这个socket
            //第二个参数是这个端口下维护的socket请求队列,最多容纳的用户请求数。
              status = listen(socketFD, 1024);
              return socketFD;
         };
         
         // Create dispatch block and run on socketQueue
         
         dispatch_block_t block = ^{ @autoreleasepool {
              
             //一系列错误判断
              ...
    
            //判断ipv4 ipv6是否支持
              ...
    
            //得到本机的IPV4 IPV6的地址
              [self getInterfaceAddress4:&interface4 address6:&interface6 fromDescription:interface port:port];
              ...
              
            //判断可以用IPV4还是6进行请求
              ...
              
              // Create accept sources
              //创建接受连接被触发的source
              if (enableIPv4)
              {
                //接受连接的source
                   accept4Source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socket4FD, 0, socketQueue);
                   
             
                   
                //事件句柄
                   dispatch_source_set_event_handler(accept4Source, ^{ @autoreleasepool {
                 
                    //拿到数据,连接数
                        unsigned long numPendingConnections = dispatch_source_get_data(acceptSource);
                        
                        LogVerbose(@"numPendingConnections: %lu", numPendingConnections);
                        
                    //循环去接受这些socket的事件(一次触发可能有多个连接)
                        while ([strongSelf doAccept:socketFD] && (++i < numPendingConnections));
                        
                   }});
                   
                   //取消句柄
                   dispatch_source_set_cancel_handler(accept4Source, ^{
                    //...
                    //关闭socket
                        close(socketFD);
                   
                   });
                   
                //开启source
                   dispatch_resume(accept4Source);
              }
              
            //ipv6一样
              ...
    
        //在scoketQueue中同步做这些初始化。
         if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
              block();
         else
              dispatch_sync(socketQueue, block);
         
         //...错误判断
         //返回结果
         return result;
    }
    

    这个方法省略完仍然有这么长,它主要做了这两件事(篇幅原因,尽量精简):

    1. 创建本机地址、创建socket、绑定端口、监听端口。
    2. 创建了一个GCD Source,来监听这个socket读source,这样连接事件一发生,就会触发我们的事件句柄。接着我们调用了doAccept:方法循环去接受所有的连接。

    接着我们来看这个接受连接的方法(同样省略了一部分不那么重要的代码):

    //连接接受的方法
    - (BOOL)doAccept:(int)parentSocketFD
    {
         LogTrace();
         
         int socketType;
         int childSocketFD;
         NSData *childSocketAddress;
         
        //IPV4
         if (parentSocketFD == socket4FD)
         {
              socketType = 0;
              
              struct sockaddr_in addr;
              socklen_t addrLen = sizeof(addr);
              //调用接受,得到接受的子socket
              childSocketFD = accept(parentSocketFD, (struct sockaddr *)&addr, &addrLen);
              //NO说明没有连接
              if (childSocketFD == -1)
              {
                   LogWarn(@"Accept failed with error: %@", [self errnoError]);
                   return NO;
              }
              //子socket的地址数据
              childSocketAddress = [NSData dataWithBytes:&addr length:addrLen];
         }
        //一样
         else if (parentSocketFD == socket6FD)
         {
              ...
         }
        //unix domin socket 一样
         else // if (parentSocketFD == socketUN)
         {
              ...
         }
         
         //socket 配置项的设置... 和connect一样
         
        //响应代理
         if (delegateQueue)
         {
              __strong id theDelegate = delegate;
              //代理队列中调用
              dispatch_async(delegateQueue, ^{ @autoreleasepool {
                   
                   // Query delegate for custom socket queue
                   
                   dispatch_queue_t childSocketQueue = NULL;
                   
                //判断是否实现了为socket 生成一个新的SocketQueue,是的话拿到新queue
                   if ([theDelegate respondsToSelector:@selector(newSocketQueueForConnectionFromAddress:onSocket:)])
                   {
                        childSocketQueue = [theDelegate newSocketQueueForConnectionFromAddress:childSocketAddress
                                                                                      onSocket:self];
                   }
                   
                   // Create GCDAsyncSocket instance for accepted socket
                   //新创建一个本类实例,给接受的socket
                   GCDAsyncSocket *acceptedSocket = [[[self class] alloc] initWithDelegate:theDelegate
                                                                                           delegateQueue:delegateQueue
                                                                                              socketQueue:childSocketQueue];
                   //IPV4 6 un
                   if (socketType == 0)
                        acceptedSocket->socket4FD = childSocketFD;
                   else if (socketType == 1)
                        acceptedSocket->socket6FD = childSocketFD;
                   else
                        acceptedSocket->socketUN = childSocketFD;
                   //标记开始 并且已经连接
                   acceptedSocket->flags = (kSocketStarted | kConnected);
                   
                   // Setup read and write sources for accepted socket
                   //初始化读写source
                   dispatch_async(acceptedSocket->socketQueue, ^{ @autoreleasepool {
                        
                        [acceptedSocket setupReadAndWriteSourcesForNewlyConnectedSocket:childSocketFD];
                   }});
                   
                //判断代理是否实现了didAcceptNewSocket方法,把我们新创建的socket返回出去
                   if ([theDelegate respondsToSelector:@selector(socket:didAcceptNewSocket:)])
                   {
                        [theDelegate socket:self didAcceptNewSocket:acceptedSocket];
                   }
               
              }});
         }
         return YES;
    }
    
    • 这个方法很简单,核心就是调用下面这个函数,去接受连接,并且拿到一个新的socket
    childSocketFD = accept(parentSocketFD, (struct sockaddr *)&addr, &addrLen);
    
    • 然后调用了newSocketQueueForConnectionFromAddress:onSocket:这个代理,可以为新的socket重新设置一个socketQueue
    • 接着我们用这个Socket重新创建了一个GCDAsyncSocket实例,然后调用我们的代理didAcceptNewSocket方法,把这个实例给传出去了。
    • 这里需要注意的是,我们调用didAcceptNewSocket代理方法传出去的实例我们需要自己保留,不然就会被释放掉,那么这个与客户端的连接也就断开了。
    • 同时我们还初始化了这个新socket的读写source,这一步完全和connect中一样,调用同一个方法,这样如果有读写数据,就会触发这个新的socketsource了。

    建立连接之后的无数个新的socket,都是独立的,它们处理读写连接断开的逻辑就和客户端socket完全一样了。
    而我们监听本机端口的那个socket始终只有一个,这个用来监听触发socket连接,并返回创建我们这无数个新的socket实例。

    作为服务端的Accept流程就这么结束了,因为篇幅原因,所以尽量精简了一些细节的处理,不过这些处理在Connect中也是反复出现的,所以基本无伤大雅。如果大家会感到困惑,建议下载github中的源码注释,对照着再看一遍,相信会有帮助的。

    接着我们来讲讲Unix Domin Socket建立本地进程通信流程:

    基本上这个流程,比上述任何流程还要简单,简单的到即使不简化代码,也没多少行(当然这是建立在客户端Connect流程已经实现了很多公用方法的基础上)。

    接着进入正题,我们来看看它发起连接的方法:

    //连接本机的url上,IPC,进程间通信
    - (BOOL)connectToUrl:(NSURL *)url withTimeout:(NSTimeInterval)timeout error:(NSError **)errPtr;
    {
        LogTrace();
        
        __block BOOL result = NO;
        __block NSError *err = nil;
        
        dispatch_block_t block = ^{ @autoreleasepool {
            
            //判断长度
            if ([url.path length] == 0)
            {
                NSString *msg = @"Invalid unix domain socket url.";
                err = [self badParamError:msg];
                
                return_from_block;
            }
            
            // Run through standard pre-connect checks
            //前置的检查
            if (![self preConnectWithUrl:url error:&err])
            {
                return_from_block;
            }
            
            // We've made it past all the checks.
            // It's time to start the connection process.
            
            flags |= kSocketStarted;
            
            // Start the normal connection process
            
            NSError *connectError = nil;
            //调用另一个方法去连接
            if (![self connectWithAddressUN:connectInterfaceUN error:&connectError])
            {
                [self closeWithError:connectError];
                
                return_from_block;
            }
    
            [self startConnectTimeout:timeout];
            
            result = YES;
        }};
        
        //在socketQueue中同步执行
        if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
            block();
        else
            dispatch_sync(socketQueue, block);
        
        if (result == NO)
        {
            if (errPtr)
                *errPtr = err;
        }
        
        return result;
    }
    

    连接方法非常简单,就只是做了一些错误的处理,然后调用了其他的方法,包括一个前置检查,这检查中会去判断各种参数是否正常,如果正常会返回YES,并且把url转换成Uinix domin socket地址的结构体,赋值给我们的属性connectInterfaceUN
    接着调用了connectWithAddressUN方法去发起连接。

    我们接着来看看这个方法:

    //连接Unix域服务器
    - (BOOL)connectWithAddressUN:(NSData *)address error:(NSError **)errPtr
    {
         LogTrace();
         
         NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
         
         // Create the socket
         
         int socketFD;
         
         LogVerbose(@"Creating unix domain socket");
         
        //创建本机socket
         socketUN = socket(AF_UNIX, SOCK_STREAM, 0);
         
         socketFD = socketUN;
         
         if (socketFD == SOCKET_NULL)
         {
              if (errPtr)
                   *errPtr = [self errnoErrorWithReason:@"Error in socket() function"];
              
              return NO;
         }
         
         // Bind the socket to the desired interface (if needed)
         
         LogVerbose(@"Binding socket...");
         
         int reuseOn = 1;
        //设置可复用
         setsockopt(socketFD, SOL_SOCKET, SO_REUSEADDR, &reuseOn, sizeof(reuseOn));
    
         // Prevent SIGPIPE signals
         
         int nosigpipe = 1;
        //进程终止错误信号禁止
         setsockopt(socketFD, SOL_SOCKET, SO_NOSIGPIPE, &nosigpipe, sizeof(nosigpipe));
         
         // Start the connection process in a background queue
         
         int aStateIndex = stateIndex;
         
         dispatch_queue_t globalConcurrentQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
         dispatch_async(globalConcurrentQueue, ^{
              
              const struct sockaddr *addr = (const struct sockaddr *)[address bytes];
            //并行队列调用连接
              int result = connect(socketFD, addr, addr->sa_len);
              if (result == 0)
              {
                   dispatch_async(socketQueue, ^{ @autoreleasepool {
                        //连接成功的一些状态初始化
                        [self didConnect:aStateIndex];
                   }});
              }
              else
              {
                   // 失败的处理
                   perror("connect");
                   NSError *error = [self errnoErrorWithReason:@"Error in connect() function"];
                   
                   dispatch_async(socketQueue, ^{ @autoreleasepool {
                        
                        [self didNotConnect:aStateIndex error:error];
                   }});
              }
         });
         
         LogVerbose(@"Connecting...");
         
         return YES;
    }
    

    主要部分基本和客户端连接相同,并且简化了很多,调用了这一行完成了连接:

    int result = connect(socketFD, addr, addr->sa_len);
    

    同样也和客户端一样,在连接成功之后去调用下面这个方法完成了一些资源的初始化:

     [self didConnect:aStateIndex];
    

    基本上连接就这么两个方法了(当然我们省略了一些细节),看完客户端的连接之后,到这就变得非常简单了。

    接着我们来看看uinix domin socket作为服务端Accept。

    这个Accpet,基本和我们普通Socket服务端的Accept相同。

    //接受一个Url,uniex domin socket 做为服务端
    - (BOOL)acceptOnUrl:(NSURL *)url error:(NSError **)errPtr;
    {
         LogTrace();
         
         __block BOOL result = NO;
         __block NSError *err = nil;
         
        //基本和正常的socket accept一模一样
         // CreateSocket Block
         // This block will be invoked within the dispatch block below.
         //生成一个创建socket的block,创建、绑定、监听
         int(^createSocket)(int, NSData*) = ^int (int domain, NSData *interfaceAddr) {
              
              //creat  socket 
              ...
              // Set socket options
       
              ...
              // Bind socket
              
              ...
              
              // Listen
              ...
         };
         
         // Create dispatch block and run on socketQueue
         //错误判断
         dispatch_block_t block = ^{ @autoreleasepool {
              
              //错误判断
              ...
              
             
              //判断是否有这个url路径是否正确
              ...
    
              //调用上面的Block创建socket,并且绑定监听。
              ...
              
              //创建接受连接的source
              acceptUNSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socketUN, 0, socketQueue);
              
              int socketFD = socketUN;
              dispatch_source_t acceptSource = acceptUNSource;
              //事件句柄,和accpept一样
              dispatch_source_set_event_handler(acceptUNSource, ^{ @autoreleasepool {
                   //循环去接受所有的每一个连接
                   ...
              }});
              
              //取消句柄
              dispatch_source_set_cancel_handler(acceptUNSource, ^{
                   
                   //关闭socket
                   close(socketFD);
              });
              
              LogVerbose(@"dispatch_resume(accept4Source)");
              dispatch_resume(acceptUNSource);
              
              flags |= kSocketStarted;
              
              result = YES;
         }};
         
         if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
              block();
         else
              dispatch_sync(socketQueue, block);
         //填充错误
         if (result == NO)
         {
              LogInfo(@"Error in accept: %@", err);
              
              if (errPtr)
                   *errPtr = err;
         }
         
         return result; 
    }
    

    因为代码基本雷同,所以我们省略了大部分代码,大家可以参照着之前的讲解或者源码去理解。这里和普通服务端socket唯一的区别就是,这里服务端绑定的地址是unix domin socket类型的地址,它是一个结构体,里面包含的是我们进行进程通信的纽带-一个本机文件路径。
    所以这里服务端简单来说就是绑定的这个文件路径,当这个文件路径有数据可读(即有客户端连接到达)的时候,会触发初始化的source事件句柄,我们会去循环的接受所有的连接,并且新生成一个socket实例,这里和普通的socket完全一样。

    就这样我们所有的连接方式已经讲完了,后面这两种方式,为了节省篇幅,确实讲的比较粗略,但是核心的部分都有提到。
    另外如果你有理解客户端的Connect流程,那么理解起来应该没有什么问题,这两个流程比前者可简化太多了。

    写在结尾:

    这个框架的Connect篇到此为止了,其实想一篇结束一块内容的,但是代码量实在太多,如果讲的太粗略,大家也很难去学习到真正的内容。但是楼主也不想写的太长,太琐碎,相信大家都很难看下去,不过万幸能两篇内总结完。
    之后的内容,等过完年会继续写。包括read篇和write篇等等,希望这个系列能让大家能对Socket编程有个新的认识和理解。以后也可以自己上手Socket,运用于项目中去。

    转眼过年了,回想这一年,许多地方都做的差强人意,希望2017有个更好的愿景吧。
    纸上学来终觉浅,绝知此事要躬行。

    相关文章

      网友评论

      • LoveCode:大神你好:
        我在使用这个框架的时候遇到了一个谜一样的问题,至今没有解决思路,若大神有何见解敬请告知。

        问题是这样的:我们使用这个框架通过WiFi与智能设备构建socket连接,在正常情况下都是可以正常使用的,但是当用户使用T-Mobile运营商的的时候socket就连接超时,美国其他的运营商还有中国的运营商的都可以正常连接,我们甚至把T-Mobile的卡拿到中国来使用(此时中国移动提供漫游服务)也可以正常连接。更让人不解的是同一台手机关闭蜂窝数据按钮或者开启飞行模式后又都可以正常连接。

        另外我们使用CFSocket写了个demo运行在上述连接不上的手机环境下却可以连接成功。

        问题至此描述就结束了,大神有什么看法么?
      • 35607579afd6:能否请教下,为何connect用socket,数据流的读写又被桥接到了cfstream。这样有何优势?
      • 852c98ad1b17:你好楼主,请教下 所有的操作都在一个队列里面是为了保证了线程安全,那else什么时候回执行到呢
        if (dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey))
        block();
        else
        dispatch_sync(socketQueue, block);
      • c4b1be2599c4:我很好奇为什么只有connect要用阻塞的?
      • Curry_J_X:你好,GCDAsyncSocket如何创建多个TCP长连接

        @interface ViewController : NSViewController
        {
        @private
        GCDAsyncSocket *asyncSocket1;
        GCDAsyncSocket *asyncSocket2;
        }

        这是第一个连接 :
        dispatch_queue_t mainQueue = dispatch_get_main_queue();

        asyncSocket1 = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:mainQueue];

        {
        NSString *host = @"192.168.1.101";
        uint16_t port = 8080;


        NSLog(@"Connecting to \"%@\" on port %hu...", host, port);

        NSError *error = nil;
        if (![asyncSocket1 connectToHost:host onPort:port error:&error])
        {
        NSLog(@"Error connecting: %@", error);
        }
        }
        第二个连接如下
        asyncSocket2 = [[GCDAsyncSocket alloc] initWithDelegate:self delegateQueue:mainQueue];
        {
        NSString *host = @"192.168.1.102";
        uint16_t port = 8080;


        NSLog(@"Connecting to \"%@\" on port %hu...", host, port);

        NSError *error = nil;
        if (![asyncSocket2 connectToHost:host onPort:port error:&error])
        {
        NSLog(@"Error connecting: %@", error);
        }
        }
        现在我想连的服务器不确定几个,我该怎么建立n个socket连接呢?即一个 GCDAsyncSocket client 实例如何建立多个TCP连接 ?
        涂耀辉:@故事在哪 创建个数组属性,for循环初始化5个socket,加到数组里去
        Curry_J_X:@涂耀辉 GCDAsyncSocket *asyncSocket = [[GCDAsyncSocket alloc]initWithDelegate:self delegateQueue:serialQueue ];这样初始化的话会调用socketDidDisconnect socket就断开了,
        我将 GCDAsyncSocket *asyncSocket写为成员变量就可以正常连接。
        现在我想连5个服务器,难道要写5个 成员变量吗?有没有其他做法呢?
        涂耀辉:想创建多少个连接,就初始化多少个GCDAsyncSocket,调用多少个connect就行。
      • 曲年:如果在文章前加一点socket基本原理就更好了。这样带着目的去读效果会好一些。
      • 四分热度:看完两篇有点懵逼
        四分热度:@涂耀辉 很清楚 我理解问题:sweat:
        涂耀辉:@四分热度 哪里不清楚吗:joy:
      • 十一岁的加重:收藏收藏
      • 瘦子书生:大神,还是有点看不懂:sweat: 这些东西有点混淆了
        涂耀辉:@egoCogito_panf 最近有时间就会陆续写完的~感谢打赏:yum:
        egoCogito_panf:学习了,希望大神出read篇和write篇:smile:
        涂耀辉:@瘦子书生 回头再看几遍就明白了:smile:

      本文标题:iOS即时通讯进阶 - CocoaAsyncSocket源码解析

      本文链接:https://www.haomeiwen.com/subject/zywzbttx.html