Rac迭代

作者: 我是小胡胡分胡 | 来源:发表于2018-01-25 10:03 被阅读8次

    Rac迭代、绑定、Join
    RACStream、RACSignal、RACSequence

    1、RACSequence队列

    // Bookkeeping struct from Foundation that is passed by pointer to
    // -countByEnumeratingWithState:objects:count: on every batch of a
    // `for ... in` loop.
    //
    // NOTE(review): the per-field notes below paraphrase Apple's
    // NSFastEnumeration documentation; nothing in this excerpt shows how the
    // fields are used, so confirm against that reference.
    typedef struct {
    // Iterator-private progress marker (documented as starting at 0).
    unsigned long state;
    // C array holding the objects returned for the current batch.
    id __unsafe_unretained _Nullable * _Nullable itemsPtr;
    // Points at a value the caller re-reads to detect mutation during enumeration.
    unsigned long * _Nullable mutationsPtr;
    // Scratch space for the implementer.
    unsigned long extra[5];
    } NSFastEnumerationState;

    /// Protocol a collection adopts to be usable in a `for ... in` loop.
    @protocol NSFastEnumeration

    /// Exposes the next batch of elements to the caller as a C array.
    ///
    /// @param state   Enumeration bookkeeping carried across successive calls.
    /// @param buffer  Caller-provided C array the implementer may fill with objects.
    /// @param len     Capacity of `buffer`.
    /// @return The number of objects made available in this batch.
    ///         NOTE(review): Apple's reference documents 0 as "enumeration
    ///         finished"; that is not visible in this excerpt.
    - (NSUInteger)countByEnumeratingWithState:(NSFastEnumerationState *)state objects:(id __unsafe_unretained _Nullable [_Nonnull])buffer count:(NSUInteger)len;

    @end

    // -rac_sequence is declared to return RACSequence *, so hold the result as
    // that public type. (RACStringSequence is, as far as I can tell, the
    // private concrete class behind it; declaring the variable with that type
    // triggers an incompatible-pointer warning.)
    RACSequence *sequence2 = [@"12345" rac_sequence];

    // Iteration style 1: for-in. This goes through
    // -countByEnumeratingWithState:objects:count:, which the article describes
    // as walking the sequence via head/tail.
    for (NSString *str in sequence2) {
        NSLog(@"%@", str);
    }

    // Iteration style 2: NSEnumerator / -nextObject (per the article, also
    // driven by head/tail underneath). -nextObject returns nil once the
    // enumerator is exhausted, which ends the loop.
    NSEnumerator *enumerator = sequence2.objectEnumerator;
    NSString *next = nil;
    while ((next = enumerator.nextObject) != nil) {
        NSLog(@"%@", next);
    }
    

    // A block used to evaluate head. This should be set to nil after _head has been
    // initialized.
    //
    // This is marked strong instead of copy because of some bizarre block
    // copying bug. See https://github.com/ReactiveCocoa/ReactiveCocoa/pull/506.
    // Since the property itself does not copy, the code that assigns it copies
    // the block explicitly first (`seq.headBlock = [headBlock copy];`).
    //
    // The signature of this block varies based on the value of hasDependency:
    //
    // - If YES, this block is of type id (^)(id).
    // - If NO, this block is of type id (^)(void).
    //
    // The property is typed `id` so that it can hold either of those two block
    // shapes.
    //
    // This property should only be accessed while synchronized on self.
    @property (nonatomic, strong) id headBlock;

    // Illustration of the headBlock shape used when hasDependency is NO:
    // takes no arguments and returns an object.
    id (^block1)(void);

    // Illustration of the headBlock shape used when hasDependency is YES:
    // takes one object argument and returns an object.
    id (^block2)(id);

    /// Builds a dynamic sequence whose head and tail are produced by blocks.
    ///
    /// Neither block is invoked here; both are only copied and stored on the
    /// new RACDynamicSequence, which is flagged as having no dependency.
    ///
    /// @param headBlock  Block returning the head value. Must not be nil (asserted).
    /// @param tailBlock  Block returning the tail sequence. Not asserted, so nil
    ///                   is stored as nil.
    /// @return The newly created sequence.
    + (RACSequence *)sequenceWithHeadBlock:(id (^)(void))headBlock tailBlock:(RACSequence<id> *(^)(void))tailBlock {
        NSCParameterAssert(headBlock != nil);

        RACDynamicSequence *seq = [[RACDynamicSequence alloc] init];
        // The headBlock property is `strong`, so copy explicitly before storing.
        seq.headBlock = [headBlock copy];
        seq.tailBlock = [tailBlock copy];
        seq.hasDependency = NO;
        return seq;
    }

    /// Builds a dynamic sequence whose head and tail blocks each receive a
    /// `dependency` argument.
    ///
    /// No block is invoked here; all three are copied and stored on the new
    /// RACDynamicSequence, which is flagged as having a dependency.
    /// NOTE(review): presumably RACDynamicSequence evaluates `dependencyBlock`
    /// first and feeds its result to the other two blocks; that logic is not
    /// part of this excerpt.
    ///
    /// @param dependencyBlock  Block producing the dependency. Must not be nil (asserted).
    /// @param headBlock        Block returning the head value. Must not be nil (asserted).
    /// @param tailBlock        Block returning the tail sequence. Not asserted.
    /// @return The newly created sequence.
    + (RACSequence *)sequenceWithLazyDependency:(id (^)(void))dependencyBlock headBlock:(id (^)(id dependency))headBlock tailBlock:(RACSequence *(^)(id dependency))tailBlock {
        NSCParameterAssert(dependencyBlock != nil);
        NSCParameterAssert(headBlock != nil);

        RACDynamicSequence *seq = [[RACDynamicSequence alloc] init];
        // The headBlock property is `strong`, so copy explicitly before storing.
        seq.headBlock = [headBlock copy];
        seq.tailBlock = [tailBlock copy];
        seq.dependencyBlock = [dependencyBlock copy];
        seq.hasDependency = YES;
        return seq;
    }

    /// Binds the receiver: asks `block` for a bind block, then builds the bound
    /// sequence through -bind:passingThroughValuesFromSequence: with no
    /// passthrough sequence.
    ///
    /// @param block  Factory that returns the bind block to use. Must not be nil;
    ///               it is invoked immediately.
    /// @return The bound sequence, with its name set to "[<receiver name>] -bind:".
    - (RACSequence *)bind:(RACSequenceBindBlock (^)(void))block {
        // Invoking a nil block crashes; fail with a clear assertion instead.
        NSCParameterAssert(block != nil);

        RACSequenceBindBlock bindBlock = block();
        return [[self bind:bindBlock passingThroughValuesFromSequence:nil] setNameWithFormat:@"[%@] -bind:", self.name];
    }
    /// Worker behind -bind: that flattens the sequences `bindBlock` returns for
    /// each value of the receiver into a single dynamic sequence.
    ///
    /// @param bindBlock            Called with a value of the receiver and a pointer
    ///                             to a `stop` flag; returns the sequence for that
    ///                             value, or nil to end the result.
    /// @param passthroughSequence  Sequence whose values are emitted before the
    ///                             receiver is consulted again. -bind: passes nil;
    ///                             the tail block below passes `current.tail`.
    /// @return A dynamic sequence carrying the receiver's name.
    - (RACSequence *)bind:(RACSequenceBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence {
        // Store values calculated in the dependency here instead, avoiding any kind
        // of temporary collection and boxing.
        //
        // This relies on the implementation of RACDynamicSequence synchronizing
        // access to its head, tail, and dependency, and we're only doing it because
        // we really need the performance.
        __block RACSequence *valuesSeq = self;
        __block RACSequence *current = passthroughSequence;
        __block BOOL stop = NO;

        RACSequence *sequence = [RACDynamicSequence sequenceWithLazyDependency:^ id {
            // This loop is what skips over empty inner sequences: as long as
            // `current` has no head (nil, or e.g. a RACEmptySequence), move on to
            // the sequence produced for the next source value.
            while (current.head == nil) {
                if (stop) return nil;

                // We've exhausted the current sequence, create a sequence from the
                // next value.
                id value = valuesSeq.head;

                if (value == nil) {
                    // We've exhausted all the sequences.
                    stop = YES;
                    return nil;
                }

                current = (id)bindBlock(value, &stop);
                if (current == nil) {
                    stop = YES;
                    return nil;
                }

                // Advance the source sequence by one element (one iteration step).
                valuesSeq = valuesSeq.tail;
            }

            NSCAssert([current isKindOfClass:RACSequence.class], @"-bind: block returned an object that is not a sequence: %@", current);
            // The dependency value itself is unused; results travel through the
            // captured __block variables above.
            return nil;
        } headBlock:^(id _) {
            return current.head;
        } tailBlock:^ id (id _) {
            // Once `stop` is set the result ends after the current head.
            if (stop) return nil;

            // Continue with the remainder of `current` (one iteration step); if
            // that tail is empty, the loop above will move on to the next value.
            return [valuesSeq bind:bindBlock passingThroughValuesFromSequence:current.tail];
        }];

        sequence.name = self.name;
        return sequence;
    }

    /// Variant of -bind:passingThroughValuesFromSequence: that never calls
    /// `bindBlock` (the article's notes associate it with RACEmptySequence).
    ///
    /// @param bindBlock            Ignored.
    /// @param passthroughSequence  Pending values to pass through; may be nil.
    /// @return `passthroughSequence` when it is non-nil, otherwise the receiver.
    - (RACSequence *)bind:(RACStreamBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence {
        return passthroughSequence ?: self;
    }

    2、RACSignal

    /// Returns a new signal that maps every value of the receiver to a signal via
    /// the bind block and forwards all values of those signals to its subscriber.
    ///
    /// Naming used in the comments below: A = the receiver (original signal),
    /// B = a signal returned by the bind block, C = the signal returned here.
    ///
    /// @param block  Factory returning the bind block. Must not be NULL (asserted);
    ///               it is invoked once per subscription to C.
    /// @return Signal C, named "[<receiver name>] -bind:".
    - (RACSignal *)bind:(RACSignalBindBlock (^)(void))block {
        NSCParameterAssert(block != NULL);

        /*
         * -bind: should:
         *
         * 1. Subscribe to the original signal of values.
         * 2. Any time the original signal sends a value, transform it using the binding block.
         * 3. If the binding block returns a signal, subscribe to it, and pass all of its values through to the subscriber as they're received.
         * 4. If the binding block asks the bind to terminate, complete the original signal.
         * 5. When all signals complete, send completed to the subscriber.
         *
         * If any signal sends an error at any point, send that to the subscriber.
         */
        // Create the new signal, C.
        return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
            RACSignalBindBlock bindingBlock = block();

            // Number of signals still running; C completes when this reaches 0.
            __block volatile int32_t signalCount = 1; // indicates self

            RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

            // Called whenever A or one of the B signals finishes.
            void (^completeSignal)(RACDisposable *) = ^(RACDisposable *finishedDisposable) {
                if (OSAtomicDecrement32Barrier(&signalCount) == 0) {
                    [subscriber sendCompleted];
                    [compoundDisposable dispose];
                } else {
                    [compoundDisposable removeDisposable:finishedDisposable];
                }
            };

            // Starts tracking and subscribing to one B signal.
            void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
                OSAtomicIncrement32Barrier(&signalCount);

                RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
                [compoundDisposable addDisposable:selfDisposable];

                // B: subscribe to the signal returned by the bind block, which
                // triggers it to start producing values.
                RACDisposable *disposable = [signal subscribeNext:^(id x) {
                    // Forward to C's subscriber: values flow A -> B -> C.
                    [subscriber sendNext:x];
                } error:^(NSError *error) {
                    [compoundDisposable dispose];
                    [subscriber sendError:error];
                } completed:^{
                    @autoreleasepool {
                        completeSignal(selfDisposable);
                    }
                }];

                selfDisposable.disposable = disposable;
            };

            @autoreleasepool {
                RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
                [compoundDisposable addDisposable:selfDisposable];

                // A: subscribe to the original signal, which triggers it to start
                // producing values.
                RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
                    // Manually check disposal to handle synchronous errors.
                    if (compoundDisposable.disposed) return;

                    BOOL stop = NO;

                    // The bind block produces a new signal (B) for this value.
                    id signal = bindingBlock(x, &stop);

                    @autoreleasepool {
                        if (signal != nil) addSignal(signal);
                        // A nil signal or a set `stop` flag ends the subscription to A.
                        if (signal == nil || stop) {
                            [selfDisposable dispose];
                            completeSignal(selfDisposable);
                        }
                    }
                } error:^(NSError *error) {
                    [compoundDisposable dispose];
                    [subscriber sendError:error];
                } completed:^{
                    @autoreleasepool {
                        completeSignal(selfDisposable);
                    }
                }];

                selfDisposable.disposable = bindingDisposable;
            }

            return compoundDisposable;
        }] setNameWithFormat:@"[%@] -bind:", self.name];
    }

    3、RACStream的私有方法

    该方法（+join:block:）用于实现 RACSignal 的 +combineLatest:；RACSequence 没有 combineLatest 操作。

    /// Combines several streams into one stream of flat RACTuples by folding them
    /// together pairwise with `block`.
    ///
    /// @param streams  The streams to combine, in order.
    /// @param block    Combines the stream accumulated so far with the next
    ///                 stream. The unwrapping below expects each resulting value
    ///                 to be a two-element tuple (accumulated tuple, new value).
    /// @return A stream of tuples holding one value per input stream, or
    ///         `[self empty]` when `streams` contains nothing.
    + (__kindof RACStream *)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block {
        RACStream *current = nil;

        // Creates streams of successively larger tuples by combining the input
        // streams one-by-one.
        for (RACStream *stream in streams) {
            // For the first stream, just wrap its values in a RACTuple. That way,
            // if only one stream is given, the result is still a stream of tuples.
            if (current == nil) {
                current = [stream map:^(id x) {
                    return RACTuplePack(x);
                }];

                continue;
            }

            current = block(current, stream);
        }

        if (current == nil) return [self empty];

        return [current map:^(RACTuple *xs) {
            // Right now, each value is contained in its own tuple, sorta like:
            //
            // (((1), 2), 3)
            //
            // We need to unwrap all the layers and create a tuple out of the result.
            NSMutableArray *values = [[NSMutableArray alloc] init];

            while (xs != nil) {
                // The last element is this layer's value; nil values are stored as
                // the RACTupleNil placeholder. Prepending restores the original order.
                [values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];
                // Descend into the nested tuple; a one-element tuple is the innermost layer.
                xs = (xs.count > 1 ? xs.first : nil);
            }

            return [RACTuple tupleWithObjectsFromArray:values];
        }];
    }

    相关文章

      网友评论

          本文标题:Rac迭代

          本文链接:https://www.haomeiwen.com/subject/srwmaxtx.html