Rac迭代、绑定、Join
RACStream、RACSignal、RACSequence
1、RACSequence 序列
// State record handed to -countByEnumeratingWithState:objects:count: (see the
// NSFastEnumeration protocol quoted right after this struct).
// NOTE(review): the field roles below follow Apple's NSFastEnumeration
// documentation, not anything visible in this file — confirm against the SDK header.
typedef struct {
// Enumerator-private progress marker (presumably 0 on the first call).
unsigned long state;
// Points at the C array of objects the caller should iterate next.
id __unsafe_unretained _Nullable * _Nullable itemsPtr;
// Points at a value the caller watches to detect mutation during enumeration.
unsigned long * _Nullable mutationsPtr;
// Five spare words; presumably scratch space for the enumerated collection.
unsigned long extra[5];
} NSFastEnumerationState;
// Protocol behind `for ... in` iteration; these notes later use it to iterate a
// RACSequence.
@protocol NSFastEnumeration
// Exposes the receiver's elements as a C array (translated from the original note).
// `state` carries enumeration progress between calls, `buffer` is caller-provided
// storage with room for `len` objects.
// NOTE(review): presumably returns the number of objects made available, 0 when
// finished — confirm against Apple's reference.
- (NSUInteger)countByEnumeratingWithState:(NSFastEnumerationState *)state objects:(id __unsafe_unretained _Nullable [_Nonnull])buffer count:(NSUInteger)len;
@end
// Sequence obtained from the string @"12345" via -rac_sequence.
RACStringSequence *sequence2 = [@"12345" rac_sequence];

// Way 1: fast enumeration. `for ... in` goes through
// countByEnumeratingWithState:objects:count: (head/tail, per the original note).
for (NSString *element in sequence2) {
    NSLog(@"%@", element);
}

// Way 2: pull elements one at a time from an NSEnumerator with nextObject
// (head / tail / nextObject, per the original note).
NSEnumerator *enumerator = sequence2.objectEnumerator;
NSString *next = nil;
while ((next = enumerator.nextObject) != nil) {
    NSLog(@"%@", next);
}
// A block used to evaluate head. This should be set to nil after `_head` has been
// initialized.
//
// This is marked `strong` instead of `copy` because of some bizarre block
// copying bug. See https://github.com/ReactiveCocoa/ReactiveCocoa/pull/506.
//
// The signature of this block varies based on the value of `hasDependency`:
//
//  - If YES, this block is of type `id (^)(id)`.
//  - If NO, this block is of type `id (^)(void)`.
//
// This property should only be accessed while synchronized on self.
// Typed `id` rather than a concrete block type so it can hold either of the two
// block shapes written out below.
@property (nonatomic, strong) id headBlock;
// Illustration only: the two shapes headBlock can take, as variable declarations.
// block1 is the no-argument form (`id (^)(void)`), stored when hasDependency is NO.
id (^block1) (void ) ;
// block2 is the one-argument form (`id (^)(id)`), stored when hasDependency is YES.
id (^block2) (id ) ;
// Factory: builds a RACDynamicSequence whose head and tail are produced by the
// given blocks, with no dependency block.
//
// Declared as a class method (`+`): the body never touches `self`, and the
// sibling factory below is invoked on the class itself
// (`[RACDynamicSequence sequenceWithLazyDependency:...]`) elsewhere in this file.
//
// headBlock - Produces the head value. Must not be nil (asserted).
// tailBlock - Produces the remaining sequence. Not asserted here, so nil is
//             accepted.
//
// Returns the new sequence with hasDependency set to NO.
+ (RACSequence *)sequenceWithHeadBlock:(id (^)(void))headBlock tailBlock:(RACSequence<id> *(^)(void))tailBlock {
    NSCParameterAssert(headBlock != nil);

    RACDynamicSequence *seq = [[RACDynamicSequence alloc] init];
    seq.headBlock = [headBlock copy];
    seq.tailBlock = [tailBlock copy];
    seq.hasDependency = NO;
    return seq;
}

// Factory: builds a RACDynamicSequence whose head and tail blocks each receive
// one `dependency` argument; the sequence also stores `dependencyBlock`.
// NOTE(review): presumably the dependency block's result is what gets passed
// as that argument — the code that invokes these blocks is not in this excerpt.
//
// dependencyBlock - Stored as the sequence's dependency block. Must not be nil
//                   (asserted).
// headBlock       - Produces the head value from the dependency. Must not be
//                   nil (asserted).
// tailBlock       - Produces the remaining sequence from the dependency. Not
//                   asserted here.
//
// Returns the new sequence with hasDependency set to YES.
+ (RACSequence *)sequenceWithLazyDependency:(id (^)(void))dependencyBlock headBlock:(id (^)(id dependency))headBlock tailBlock:(RACSequence *(^)(id dependency))tailBlock {
    NSCParameterAssert(dependencyBlock != nil);
    NSCParameterAssert(headBlock != nil);

    RACDynamicSequence *seq = [[RACDynamicSequence alloc] init];
    seq.headBlock = [headBlock copy];
    seq.tailBlock = [tailBlock copy];
    seq.dependencyBlock = [dependencyBlock copy];
    seq.hasDependency = YES;
    return seq;
}
// Public bind entry point for sequences: calls `block` once to obtain the bind
// block, runs the bind with no pass-through sequence, and names the result
// "[<receiver name>] -bind:".
- (RACSequence *)bind:(RACSequenceBindBlock (^)(void))block {
    RACSequence *bound = [self bind:block() passingThroughValuesFromSequence:nil];
    return [bound setNameWithFormat:@"[%@] -bind:", self.name];
}
// Lazily flattens the sequences produced by applying `bindBlock` to each value
// of the receiver.
//
// bindBlock           - Maps a value to a sequence; may set `*stop` to YES to
//                       end the bind.
// passthroughSequence - A sequence whose values are emitted before any further
//                       value of the receiver is bound. May be nil.
//
// Returns a dynamic sequence that carries the receiver's name.
- (RACSequence *)bind:(RACSequenceBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence {
    // Store values calculated in the dependency here instead, avoiding any kind
    // of temporary collection and boxing.
    //
    // This relies on the implementation of RACDynamicSequence synchronizing
    // access to its head, tail, and dependency, and we're only doing it because
    // we really need the performance.
    __block RACSequence *valuesSeq = self;
    __block RACSequence *current = passthroughSequence;
    __block BOOL stop = NO;

    RACSequence *sequence = [RACDynamicSequence sequenceWithLazyDependency:^ id {
        // Skip over inner sequences that have no head (e.g. when `current` is
        // an empty sequence such as RACEmptySequence, or nil on the first call).
        while (current.head == nil) {
            if (stop) return nil;

            // We've exhausted the current sequence, create a sequence from the
            // next value.
            id value = valuesSeq.head;

            if (value == nil) {
                // We've exhausted all the sequences.
                stop = YES;
                return nil;
            }

            current = (id)bindBlock(value, &stop);
            if (current == nil) {
                stop = YES;
                return nil;
            }

            // Outer iteration step: advance to the rest of the source values.
            valuesSeq = valuesSeq.tail;
        }

        NSCAssert([current isKindOfClass:RACSequence.class], @"-bind: block returned an object that is not a sequence: %@", current);
        return nil;
    } headBlock:^(id _) {
        return current.head;
    } tailBlock:^ id (id _) {
        if (stop) return nil;

        // Inner iteration step: continue with the rest of the current inner
        // sequence (which may be an empty sequence), then the remaining values.
        return [valuesSeq bind:bindBlock passingThroughValuesFromSequence:current.tail];
    }];

    sequence.name = self.name;
    return sequence;
}
// Bind variant that never invokes `bindBlock`: returns the pass-through
// sequence when one was supplied, otherwise the receiver itself.
// NOTE(review): presumably the RACEmptySequence override of this method — the
// enclosing class is not visible in this excerpt; confirm.
- (RACSequence *)bind:(RACStreamBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence {
    if (passthroughSequence != nil) {
        return passthroughSequence;
    }
    return self;
}
2、RACSignal
// Binds the receiver (signal A) through the block returned by `block`,
// producing a new signal (C) that forwards the values of every signal (B) the
// binding block returns.
//
// block - Called once per subscription to obtain the binding block. Must not
//         be NULL (asserted).
//
// Returns a new signal named "[<receiver name>] -bind:".
- (RACSignal *)bind:(RACSignalBindBlock (^)(void))block {
    NSCParameterAssert(block != NULL);

    /*
     * -bind: should:
     *
     * 1. Subscribe to the original signal of values.
     * 2. Any time the original signal sends a value, transform it using the binding block.
     * 3. If the binding block returns a signal, subscribe to it, and pass all of its values through to the subscriber as they're received.
     * 4. If the binding block asks the bind to terminate, complete the original signal.
     * 5. When all signals complete, send completed to the subscriber.
     *
     * If any signal sends an error at any point, send that to the subscriber.
     */

    // Create the new signal C.
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
        RACSignalBindBlock bindingBlock = block();

        // Number of signals still running; starts at 1 to account for self.
        // NOTE(review): OSAtomic* is deprecated on recent Apple SDKs in favour
        // of <stdatomic.h>; left unchanged here to keep behaviour identical.
        __block volatile int32_t signalCount = 1; // indicates self

        RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

        // Called when one signal finishes: completes C once the last one is
        // done, otherwise just drops that signal's disposable.
        void (^completeSignal)(RACDisposable *) = ^(RACDisposable *finishedDisposable) {
            if (OSAtomicDecrement32Barrier(&signalCount) == 0) {
                [subscriber sendCompleted];
                [compoundDisposable dispose];
            } else {
                [compoundDisposable removeDisposable:finishedDisposable];
            }
        };

        void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
            OSAtomicIncrement32Barrier(&signalCount);

            RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
            [compoundDisposable addDisposable:selfDisposable];

            // B: subscribe to the signal returned by the binding block, which
            // triggers it to start sending values.
            RACDisposable *disposable = [signal subscribeNext:^(id x) {
                // Forward to C's subscriber: values flow A -> B -> C.
                [subscriber sendNext:x];
            } error:^(NSError *error) {
                [compoundDisposable dispose];
                [subscriber sendError:error];
            } completed:^{
                @autoreleasepool {
                    completeSignal(selfDisposable);
                }
            }];

            selfDisposable.disposable = disposable;
        };

        @autoreleasepool {
            RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
            [compoundDisposable addDisposable:selfDisposable];

            // A: subscribe to the original signal, triggering it to send values.
            RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
                // Manually check disposal to handle synchronous errors.
                if (compoundDisposable.disposed) return;

                BOOL stop = NO;
                // The binding block creates the new signal (B) here.
                id signal = bindingBlock(x, &stop);

                @autoreleasepool {
                    if (signal != nil) addSignal(signal);
                    if (signal == nil || stop) {
                        [selfDisposable dispose];
                        completeSignal(selfDisposable);
                    }
                }
            } error:^(NSError *error) {
                [compoundDisposable dispose];
                [subscriber sendError:error];
            } completed:^{
                @autoreleasepool {
                    completeSignal(selfDisposable);
                }
            }];

            selfDisposable.disposable = bindingDisposable;
        }

        return compoundDisposable;
    }] setNameWithFormat:@"[%@] -bind:", self.name];
}
-
3、RACStream的私有方法
用于 RACSignal 的 combineLatest 方法；RACSequence 没有此操作
// Combines `streams` pairwise with `block`, then flattens the resulting nested
// tuples so each value of the returned stream is one flat RACTuple.
//
// streams - The streams to combine, in order.
// block   - Combines the stream accumulated so far with the next stream.
//
// Returns `[self empty]` when `streams` yields nothing.
// NOTE(review): `[self empty]` suggests this is a class method (`+`) upstream;
// the marker is kept as it appears in this file — confirm.
- (__kindof RACStream *)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block {
    RACStream *combined = nil;

    // Build streams of successively larger (nested) tuples, one input at a time.
    for (RACStream *stream in streams) {
        if (combined != nil) {
            combined = block(combined, stream);
        } else {
            // First stream: wrap each value in a one-element tuple, so a
            // single input still yields a stream of tuples.
            combined = [stream map:^(id x) {
                return RACTuplePack(x);
            }];
        }
    }

    if (combined == nil) return [self empty];

    return [combined map:^(RACTuple *xs) {
        // Values arrive nested, e.g. (((1), 2), 3). Peel one layer per pass:
        // the last element is a value, the first element is the next layer.
        NSMutableArray *flattened = [[NSMutableArray alloc] init];
        RACTuple *layer = xs;

        while (layer != nil) {
            [flattened insertObject:layer.last ?: RACTupleNil.tupleNil atIndex:0];
            layer = (layer.count > 1 ? layer.first : nil);
        }

        return [RACTuple tupleWithObjectsFromArray:flattened];
    }];
}
网友评论