concat and concat: in RACSignal

Author: 一只二进制编码的狗 | Published: 2016-01-28 17:20 | Views: 1058

At first I did not notice that there are two separate methods here. Let's look at both of them through the source code.

- (RACSignal *)concat;
- (RACSignal *)concat:(RACSignal *)signal;

Let's start with concat. Its source:

- (RACSignal *)concat {
    return [[self flatten:1] setNameWithFormat:@"[%@] -concat", self.name];
}
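As a usage sketch (not taken from the original post), the snippet below calls -concat on a signal of signals. It assumes ReactiveCocoa 2.x imported via `<ReactiveCocoa/ReactiveCocoa.h>` and that the function runs on the main thread, where subscription happens synchronously. `ConcatDemo`, `first`, `second`, and `received` are hypothetical names chosen for illustration.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h> // assumption: ReactiveCocoa 2.x

// Hypothetical helper: collects everything the concatenated signal delivers.
static NSArray *ConcatDemo(void) {
    NSMutableArray *received = [NSMutableArray array];
    RACSubject *first = [RACSubject subject];
    RACSubject *second = [RACSubject subject];

    // A signal whose values are themselves signals.
    RACSignal *signals = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:first];
        [subscriber sendNext:second];
        [subscriber sendCompleted];
        return nil;
    }];

    [[signals concat] subscribeNext:^(id x) {
        [received addObject:x];
    } completed:^{
        [received addObject:@"completed"];
    }];

    // `second` is not subscribed yet, and a RACSubject does not replay,
    // so this value is lost.
    [second sendNext:@"second early"];

    [first sendNext:@"first 1"];
    [first sendCompleted];          // only now is `second` subscribed

    [second sendNext:@"second 1"];
    [second sendCompleted];         // outer and inner signals are all done

    return received;
}
```

Under those assumptions the returned array is `@[@"first 1", @"second 1", @"completed"]`: the inner signals are consumed strictly one after another, and the result completes only after the outer signal and every inner signal have completed.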

So the method simply forwards to flatten:. Let's look at how flatten: is used first.

    RACSubject *test1 = [RACSubject subject];
    RACSubject *test2 = [RACSubject subject];
    RACSubject *test3 = [RACSubject subject];
    
    RACSignal *sig = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:test1];
        [subscriber sendNext:test2];
        [subscriber sendNext:test3];
        return nil;
    }];
    
    [[sig flatten:2] subscribeNext:^(id x) {
        NSLog(@"%@",x);
    }];
    
    [test1 sendNext:@"test1   1"];
    [test2 sendNext:@"test2   1"];
    [test3 sendNext:@"test3   1"];

Log output:
2016-01-28 16:17:34.207 demo[10439:133672] test1 1
2016-01-28 16:17:34.208 demo[10439:133672] test2 1
Changing the call to [sig flatten:3] prints:
2016-01-28 16:27:24.848 demo[17056:144114] test1 1
2016-01-28 16:27:24.848 demo[17056:144114] test2 1
2016-01-28 16:27:24.849 demo[17056:144114] test3 1
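From this we can guess that the argument to flatten: is the maximum number of inner signals that may be subscribed to at the same time. With flatten:2, test3 has to wait for test1 or test2 to complete, and since neither ever does, its value is never delivered.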
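To check that guess, here is a small variation on the example above in which test1 completes. It is a sketch under the same assumptions as the original snippet (ReactiveCocoa 2.x, executed on the main thread so that subscription is synchronous). `FlattenQueueDemo` and `received` are hypothetical names added for illustration.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h> // assumption: ReactiveCocoa 2.x

// Hypothetical helper: records the values delivered by [sig flatten:2].
static NSArray *FlattenQueueDemo(void) {
    NSMutableArray *received = [NSMutableArray array];

    RACSubject *test1 = [RACSubject subject];
    RACSubject *test2 = [RACSubject subject];
    RACSubject *test3 = [RACSubject subject];

    RACSignal *sig = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
        [subscriber sendNext:test1];
        [subscriber sendNext:test2];
        [subscriber sendNext:test3]; // third signal: exceeds the limit of 2
        return nil;
    }];

    [[sig flatten:2] subscribeNext:^(id x) {
        [received addObject:x];
    }];

    // test3 is still waiting for a free slot, so nobody hears this value.
    [test3 sendNext:@"test3 early"];

    [test1 sendNext:@"test1 1"];
    [test1 sendCompleted];           // frees a slot, test3 gets subscribed

    [test3 sendNext:@"test3 late"];  // delivered now
    [test2 sendNext:@"test2 1"];

    return received;
}
```

Under those assumptions the result is `@[@"test1 1", @"test3 late", @"test2 1"]`. The early test3 value is lost because RACSubject is a hot signal and does not replay values to later subscribers.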

The source is as follows:

- (RACSignal *)flatten:(NSUInteger)maxConcurrent {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];

        // Contains disposables for the currently active subscriptions.
        //
        // This should only be used while synchronized on `subscriber`.
        NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];

        // Whether the signal-of-signals has completed yet.
        //
        // This should only be used while synchronized on `subscriber`.
        __block BOOL selfCompleted = NO;

        // Subscribes to the given signal.
        __block void (^subscribeToSignal)(RACSignal *);

        // Weak reference to the above, to avoid a leak.
        __weak __block void (^recur)(RACSignal *);

        // Sends completed to the subscriber if all signals are finished.
        //
        // This should only be used while synchronized on `subscriber`.
        void (^completeIfAllowed)(void) = ^{
            if (selfCompleted && activeDisposables.count == 0) {
                [subscriber sendCompleted];

                // A strong reference is held to `subscribeToSignal` until completion,
                // preventing it from deallocating early.
                subscribeToSignal = nil;
            }
        };

        // The signals waiting to be started.
        //
        // This array should only be used while synchronized on `subscriber`.
        NSMutableArray *queuedSignals = [NSMutableArray array];

        recur = subscribeToSignal = ^(RACSignal *signal) {
            // Handles each RACSignal that arrives through sendNext.
            RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];

            @synchronized (subscriber) {
                [compoundDisposable addDisposable:serialDisposable];
                [activeDisposables addObject:serialDisposable];
            }
            // Subscribe to this signal.
            serialDisposable.disposable = [signal subscribeNext:^(id x) {
                [subscriber sendNext:x];
            } error:^(NSError *error) {
                [subscriber sendError:error];
            } completed:^{
                // Once this signal has completed, check the signals that were parked in
                // queuedSignals because the concurrency limit had been reached.
                __strong void (^subscribeToSignal)(RACSignal *) = recur;
                RACSignal *nextSignal;

                @synchronized (subscriber) {
                    [compoundDisposable removeDisposable:serialDisposable];
                    [activeDisposables removeObjectIdenticalTo:serialDisposable];

                    if (queuedSignals.count == 0) {
                        completeIfAllowed();
                        return;
                    }

                    nextSignal = queuedSignals[0];
                    [queuedSignals removeObjectAtIndex:0];
                }

                subscribeToSignal(nextSignal);
            }];
        };
        // Subscribe to self; every value received via sendNext must be a RACSignal.
        [compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
            if (signal == nil) return;

            NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);

            @synchronized (subscriber) {
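                // activeDisposables tracks the active subscriptions. Their count may not
                // exceed maxConcurrent (0 means no limit); any further signal is added
                // to queuedSignals.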
                if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
                    [queuedSignals addObject:signal];

                    // If we need to wait, skip subscribing to this
                    // signal.
                    return;
                }
            }

            subscribeToSignal(signal);
        } error:^(NSError *error) {
            [subscriber sendError:error];
        } completed:^{
            @synchronized (subscriber) {
                selfCompleted = YES;
                completeIfAllowed();
            }
        }]];

        return compoundDisposable;
    }] setNameWithFormat:@"[%@] -flatten: %lu", self.name, (unsigned long)maxConcurrent];
}
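For comparison, the other method declared at the top, concat:, takes a single signal as its argument. The snippet below is only a usage sketch of its documented behavior (the argument is subscribed to once the receiver completes), not a walkthrough of its source. It assumes ReactiveCocoa 2.x and execution on the main thread. `ConcatWithSignalDemo`, `first`, `second`, and `received` are hypothetical names.

```objc
#import <Foundation/Foundation.h>
#import <ReactiveCocoa/ReactiveCocoa.h> // assumption: ReactiveCocoa 2.x

// Hypothetical helper: records the values delivered by [first concat:second].
static NSArray *ConcatWithSignalDemo(void) {
    NSMutableArray *received = [NSMutableArray array];
    RACSubject *first = [RACSubject subject];
    RACSubject *second = [RACSubject subject];

    [[first concat:second] subscribeNext:^(id x) {
        [received addObject:x];
    }];

    // `second` is not subscribed until `first` completes, so this is lost.
    [second sendNext:@"second early"];

    [first sendNext:@"first 1"];
    [first sendCompleted];          // subscription moves on to `second`

    [second sendNext:@"second 1"];

    return received;
}
```

Under those assumptions the result is `@[@"first 1", @"second 1"]`. The visible behavior matches -concat on a two-element signal of signals, but here the second signal is passed in directly instead of travelling through the flatten: queue.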
