一开始没注意有两个方法,下面从源码看两个方法
- (RACSignal *)concat;
- (RACSignal *)concat:(RACSignal *)signal;
先说concat,源码:
- (RACSignal *)concat {
return [[self flatten:1] setNameWithFormat:@"[%@] -concat", self.name];
}
方法还是走到了flatten:先看用法吧
RACSubject *test1 = [RACSubject subject];
RACSubject *test2 = [RACSubject subject];
RACSubject *test3 = [RACSubject subject];
RACSignal *sig = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:test1];
[subscriber sendNext:test2];
[subscriber sendNext:test3];
return nil;
}];
[[sig flatten:2] subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
[test1 sendNext:@"test1 1"];
[test2 sendNext:@"test2 1"];
[test3 sendNext:@"test3 1"];
打印的日志:
2016-01-28 16:17:34.207 demo[10439:133672] test1 1
2016-01-28 16:17:34.208 demo[10439:133672] test2 1
如果改成[sig flatten:3] 打印日志:
2016-01-28 16:27:24.848 demo[17056:144114] test1 1
2016-01-28 16:27:24.848 demo[17056:144114] test2 1
2016-01-28 16:27:24.849 demo[17056:144114] test3 1
可以大概猜出flatten:带的参数为最大的并发数量,下面开始看源码
源码如下
- (RACSignal *)flatten:(NSUInteger)maxConcurrent {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];
// Contains disposables for the currently active subscriptions.
//
// This should only be used while synchronized on `subscriber`.
NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];
// Whether the signal-of-signals has completed yet.
//
// This should only be used while synchronized on `subscriber`.
__block BOOL selfCompleted = NO;
// Subscribes to the given signal.
__block void (^subscribeToSignal)(RACSignal *);
// Weak reference to the above, to avoid a leak.
__weak __block void (^recur)(RACSignal *);
// Sends completed to the subscriber if all signals are finished.
//
// This should only be used while synchronized on `subscriber`.
void (^completeIfAllowed)(void) = ^{
if (selfCompleted && activeDisposables.count == 0) {
[subscriber sendCompleted];
// A strong reference is held to `subscribeToSignal` until completion,
// preventing it from deallocating early.
subscribeToSignal = nil;
}
};
// The signals waiting to be started.
//
// This array should only be used while synchronized on `subscriber`.
NSMutableArray *queuedSignals = [NSMutableArray array];
recur = subscribeToSignal = ^(RACSignal *signal) {
//处理sendnext得到的racsignal
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
@synchronized (subscriber) {
[compoundDisposable addDisposable:serialDisposable];
[activeDisposables addObject:serialDisposable];
}
//订阅这个signal
serialDisposable.disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
//如果这个信号已经结束,那么就去判断刚才那些因为不能超过最大并发而存到queuedSignals里面的信号
__strong void (^subscribeToSignal)(RACSignal *) = recur;
RACSignal *nextSignal;
@synchronized (subscriber) {
[compoundDisposable removeDisposable:serialDisposable];
[activeDisposables removeObjectIdenticalTo:serialDisposable];
if (queuedSignals.count == 0) {
completeIfAllowed();
return;
}
nextSignal = queuedSignals[0];
[queuedSignals removeObjectAtIndex:0];
}
subscribeToSignal(nextSignal);
}];
};
//订阅自己,只接受sendnext类型为RACSignal的值
[compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
if (signal == nil) return;
NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);
@synchronized (subscriber) {
//维护了一个数组,并发的数量不能超过maxConcurrent,如果超过就加入queuedSignals数组
if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
[queuedSignals addObject:signal];
// If we need to wait, skip subscribing to this
// signal.
return;
}
}
subscribeToSignal(signal);
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (subscriber) {
selfCompleted = YES;
completeIfAllowed();
}
}]];
return compoundDisposable;
}] setNameWithFormat:@"[%@] -flatten: %lu", self.name, (unsigned long)maxConcurrent];
}
网友评论