concat and concat: in RACSignal

Author: 一只二进制编码的狗 | Published 2016-01-28 17:20 | Read 1058 times

    At first I did not notice that RACSignal has two similarly named methods. Let's look at them through the source code.

    - (RACSignal *)concat;
    - (RACSignal *)concat:(RACSignal *)signal;
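
For orientation, the two methods take different inputs: `concat` is called on a signal whose values are themselves signals, while `concat:` appends one signal after another. A minimal sketch of `concat:`, assuming ReactiveCocoa 2.x and its umbrella header (the import path and the function name `ConcatWithSignalDemo` are assumptions for illustration):

```objc
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical demo function, not part of ReactiveCocoa.
static void ConcatWithSignalDemo(void) {
    RACSignal *head = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:@"head 1"];
        [subscriber sendNext:@"head 2"];
        // `tail` is only subscribed to once `head` completes.
        [subscriber sendCompleted];
        return nil;
    }];
    RACSignal *tail = [RACSignal return:@"tail 1"];

    [[head concat:tail] subscribeNext:^(id x) {
        // Values arrive in the order: head 1, head 2, tail 1.
        NSLog(@"%@", x);
    }];
}
```

If `head` never sends completed, `tail` is never subscribed to, so nothing from `tail` is delivered.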
    

    Start with concat. Its source:

    - (RACSignal *)concat {
        return [[self flatten:1] setNameWithFormat:@"[%@] -concat", self.name];
    }
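
Because `concat` is `flatten:1`, only one inner signal is subscribed to at a time, and the next one starts when the current one completes. The following is a minimal sketch of that behavior, assuming ReactiveCocoa 2.x (the import path and the function name `ConcatDemo` are assumptions for illustration):

```objc
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical demo function, not part of ReactiveCocoa.
static void ConcatDemo(void) {
    RACSubject *first = [RACSubject subject];
    RACSubject *second = [RACSubject subject];

    // A signal of signals: every value sent here must be a RACSignal.
    RACSignal *outer = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:first];
        [subscriber sendNext:second];
        [subscriber sendCompleted];
        return nil;
    }];

    [[outer concat] subscribeNext:^(id x) {
        NSLog(@"%@", x);
    } completed:^{
        NSLog(@"completed");
    }];

    [first sendNext:@"first 1"];    // delivered: `first` is the active inner signal
    [second sendNext:@"second 1"];  // dropped: `second` is queued, not yet subscribed
    [first sendCompleted];          // `second` is subscribed to now
    [second sendNext:@"second 2"];  // delivered
    [second sendCompleted];         // outer and inner signals are all done, so completed fires
}
```

The value `second 1` is lost because a RACSubject does not replay earlier values to a subscriber that arrives later.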
    

    So the method simply forwards to flatten:. Let's look at how flatten: is used first.

        RACSubject *test1 = [RACSubject subject];
        RACSubject *test2 = [RACSubject subject];
        RACSubject *test3 = [RACSubject subject];
        
        RACSignal *sig = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
            [subscriber sendNext:test1];
            [subscriber sendNext:test2];
            [subscriber sendNext:test3];
            return nil;
        }];
        
        [[sig flatten:2] subscribeNext:^(id x) {
            NSLog(@"%@",x);
        }];
        
        [test1 sendNext:@"test1   1"];
        [test2 sendNext:@"test2   1"];
        [test3 sendNext:@"test3   1"];
    

    Log output:
    2016-01-28 16:17:34.207 demo[10439:133672] test1 1
    2016-01-28 16:17:34.208 demo[10439:133672] test2 1
    If the call is changed to [sig flatten:3], the log output becomes:
    2016-01-28 16:27:24.848 demo[17056:144114] test1 1
    2016-01-28 16:27:24.848 demo[17056:144114] test2 1
    2016-01-28 16:27:24.849 demo[17056:144114] test3 1
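    From these two runs we can guess that the argument to flatten: is the maximum number of inner signals subscribed to at the same time. With flatten:2, test3 is held back while test1 and test2 are still active, and because a RACSubject does not replay values to later subscribers, the value sent on test3 never reaches the log.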
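
One way to check this guess is to complete one of the active signals and see whether the queued one starts. The sketch below reuses the setup from the usage example and assumes ReactiveCocoa 2.x (the import path and the function name `FlattenQueueDemo` are assumptions for illustration):

```objc
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical demo function, not part of ReactiveCocoa.
static void FlattenQueueDemo(void) {
    RACSubject *test1 = [RACSubject subject];
    RACSubject *test2 = [RACSubject subject];
    RACSubject *test3 = [RACSubject subject];

    RACSignal *sig = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:test1];
        [subscriber sendNext:test2];
        [subscriber sendNext:test3];
        return nil;
    }];

    [[sig flatten:2] subscribeNext:^(id x) {
        NSLog(@"%@", x);
    }];

    [test3 sendNext:@"test3 early"];         // not logged: test3 is still queued
    [test1 sendCompleted];                   // frees a slot, so test3 is subscribed to
    [test3 sendNext:@"test3 late"];          // logged
    [test2 sendNext:@"test2 still active"];  // logged: test2 was never interrupted
}
```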

    The source of flatten: is as follows:

    - (RACSignal *)flatten:(NSUInteger)maxConcurrent {
        return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
            RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];
    
            // Contains disposables for the currently active subscriptions.
            //
            // This should only be used while synchronized on `subscriber`.
            NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];
    
            // Whether the signal-of-signals has completed yet.
            //
            // This should only be used while synchronized on `subscriber`.
            __block BOOL selfCompleted = NO;
    
            // Subscribes to the given signal.
            __block void (^subscribeToSignal)(RACSignal *);
    
            // Weak reference to the above, to avoid a leak.
            __weak __block void (^recur)(RACSignal *);
    
            // Sends completed to the subscriber if all signals are finished.
            //
            // This should only be used while synchronized on `subscriber`.
            void (^completeIfAllowed)(void) = ^{
                if (selfCompleted && activeDisposables.count == 0) {
                    [subscriber sendCompleted];
    
                    // A strong reference is held to `subscribeToSignal` until completion,
                    // preventing it from deallocating early.
                    subscribeToSignal = nil;
                }
            };
    
            // The signals waiting to be started.
            //
            // This array should only be used while synchronized on `subscriber`.
            NSMutableArray *queuedSignals = [NSMutableArray array];
    
            recur = subscribeToSignal = ^(RACSignal *signal) {
                // Handles each RACSignal received through sendNext.
                RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
    
                @synchronized (subscriber) {
                    [compoundDisposable addDisposable:serialDisposable];
                    [activeDisposables addObject:serialDisposable];
                }
                // Subscribe to this inner signal.
                serialDisposable.disposable = [signal subscribeNext:^(id x) {
                    [subscriber sendNext:x];
                } error:^(NSError *error) {
                    [subscriber sendError:error];
                } completed:^{
                    // When this signal completes, check queuedSignals for signals that were
                    // held back by the concurrency limit and start the next one.
                    __strong void (^subscribeToSignal)(RACSignal *) = recur;
                    RACSignal *nextSignal;
    
                    @synchronized (subscriber) {
                        [compoundDisposable removeDisposable:serialDisposable];
                        [activeDisposables removeObjectIdenticalTo:serialDisposable];
    
                        if (queuedSignals.count == 0) {
                            completeIfAllowed();
                            return;
                        }
    
                        nextSignal = queuedSignals[0];
                        [queuedSignals removeObjectAtIndex:0];
                    }
    
                    subscribeToSignal(nextSignal);
                }];
            };
            // Subscribe to self. Every value it sends through sendNext must be a RACSignal.
            [compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
                if (signal == nil) return;
    
                NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);
    
                @synchronized (subscriber) {
                    // activeDisposables tracks the active subscriptions. Once it holds
                    // maxConcurrent entries, new signals go into queuedSignals instead.
                    if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
                        [queuedSignals addObject:signal];
    
                        // If we need to wait, skip subscribing to this
                        // signal.
                        return;
                    }
                }
    
                subscribeToSignal(signal);
            } error:^(NSError *error) {
                [subscriber sendError:error];
            } completed:^{
                @synchronized (subscriber) {
                    selfCompleted = YES;
                    completeIfAllowed();
                }
            }]];
    
            return compoundDisposable;
        }] setNameWithFormat:@"[%@] -flatten: %lu", self.name, (unsigned long)maxConcurrent];
    }
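
One detail worth noting in the source is the `maxConcurrent > 0` check: when the argument is 0, the queueing branch is never taken, so every inner signal is subscribed to as soon as it arrives. A minimal sketch of that case, assuming ReactiveCocoa 2.x (the import path and the function name `FlattenUnlimitedDemo` are assumptions for illustration):

```objc
#import <ReactiveCocoa/ReactiveCocoa.h>

// Hypothetical demo function, not part of ReactiveCocoa.
static void FlattenUnlimitedDemo(void) {
    RACSubject *test1 = [RACSubject subject];
    RACSubject *test2 = [RACSubject subject];
    RACSubject *test3 = [RACSubject subject];

    RACSignal *sig = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:test1];
        [subscriber sendNext:test2];
        [subscriber sendNext:test3];
        return nil;
    }];

    // maxConcurrent == 0 skips the queue, so all three subjects become active.
    [[sig flatten:0] subscribeNext:^(id x) {
        NSLog(@"%@", x);
    }];

    [test1 sendNext:@"test1 1"];  // logged
    [test2 sendNext:@"test2 1"];  // logged
    [test3 sendNext:@"test3 1"];  // logged
}
```

At the other end of the range, `flatten:1` is the fully serialized case, which is exactly what `concat` relies on.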
    
