美文网首页
ReactiveCocoa 中 RACSignal 所有变换操作

ReactiveCocoa 中 RACSignal 所有变换操作

作者: Karos_凯 | 来源:发表于2017-02-08 16:55 被阅读46次

    前言

    紧接着上篇的源码实现分析,继续分析RACSignal的变换操作的底层实现。

    目录

    1.过滤操作
    2.组合操作

    一. 过滤操作

    过滤操作也属于一种变换,根据过滤条件,过滤出符合条件的值。变换出来的新的信号是原信号的一个子集。

    1. filter: (在父类RACStream中定义的)

    这个filter:操作在any:的实现中用到过了。

    - (instancetype)filter:(BOOL (^)(id value))block {
        NSCParameterAssert(block != nil);
    
        Class class = self.class;
    
        return [[self flattenMap:^ id (id value) {  
            if (block(value)) {
                return [class return:value];
            } else {
                return class.empty;
            }
        }] setNameWithFormat:@"[%@] -filter:", self.name];
    }
    

    filter:中传入一个闭包,是用筛选的条件。如果满足筛选条件的即返回原信号的值,否则原信号的值被“吞”掉,返回空的信号。这个变换主要是用flattenMap的。

    1. ignoreValues
    - (RACSignal *)ignoreValues {
        return [[self filter:^(id _) {
            return NO;
        }] setNameWithFormat:@"[%@] -ignoreValues", self.name];
    }
    

    由上面filter的实现,这里把筛选判断条件永远的传入NO,那么原信号的值都会被变换成empty信号,故变换之后的信号为空信号。

    1. ignore: (在父类RACStream中定义的)
    - (instancetype)ignore:(id)value {
        return [[self filter:^ BOOL (id innerValue) {
            return innerValue != value && ![innerValue isEqual:value];
        }] setNameWithFormat:@"[%@] -ignore: %@", self.name, [value rac_description]];
    }
    

    ignore:的实现还是由filter:实现的。传入的筛选判断条件是一个值,当原信号发送的值中是这个值的时候,就替换成空信号。

    1. distinctUntilChanged (在父类RACStream中定义的)
    - (instancetype)distinctUntilChanged {
        Class class = self.class;
    
        return [[self bind:^{
            __block id lastValue = nil;
            __block BOOL initial = YES;
    
            return ^(id x, BOOL *stop) {
                if (!initial && (lastValue == x || [x isEqual:lastValue])) return [class empty];
    
                initial = NO;
                lastValue = x;
                return [class return:x];
            };
        }] setNameWithFormat:@"[%@] -distinctUntilChanged", self.name];
    }
    

    distinctUntilChanged的实现是用bind来完成的。每次变换中都记录一下原信号上一次发送过来的值,并与这一次进行比较,如果是相同的值,就“吞”掉,返回empty信号。只有和原信号上一次发送的值不同,变换后的新信号才把这个值发送出来。

    关于distinctUntilChanged,这里关注的是两两信号之间的值是否不同,有时候我们可能需要一个类似于NSSet的信号集,distinctUntilChanged就无法满足了。在ReactiveCocoa 2.5的这个版本也并没有向我们提供distinct的变换函数。

    我们可以自己实现类似的变换。实现思路也不难,可以把之前每次发送过来的信号都用数组存起来,新来的信号都去数组里面查找一遍,如果找不到,就把这个值发送出去,如果找到了,就返回empty信号。效果如上图。

    1. take: (在父类RACStream中定义的)
    - (instancetype)take:(NSUInteger)count {
        Class class = self.class;
    
        if (count == 0) return class.empty;
    
        return [[self bind:^{
            __block NSUInteger taken = 0;
    
            return ^ id (id value, BOOL *stop) {
                if (taken < count) {
                    ++taken;
                    if (taken == count) *stop = YES;
                    return [class return:value];
                } else {
                    return nil;
                }
            };
        }] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
    }
    

    take:实现也非常简单,借助bind函数来实现的。入参的count是原信号取值的个数。在bind的闭包中,taken计数从0开始取原信号的值,当taken取到count个数的时候,就停止取值。

    在take:的基础上我们还可以继续改造出新的变换方式。比如说,想取原信号中执行的第几个值。类似于elementAt的操作。这个操作在ReactiveCocoa 2.5的这个版本也并没有直接向我们提供出来。

    其实实现很简单,只需要判断taken是否等于我们要取的那个位置就可以了,等于的时候把原信号的值发送出来,并*stop = YES。

    // 我自己增加实现的方法

    - (instancetype)elementAt:(NSUInteger)index {
        Class class = self.class;
    
        return [[self bind:^{
            __block NSUInteger taken = 0;
    
            return ^ id (id value, BOOL *stop) {
                if (index == 0) {
                    *stop = YES;
                    return [class return:value];
                }
                if (taken == index) {
                    *stop = YES;
                    return [class return:value];
                } else if (taken < index){
                    taken ++;
                    return [class empty];
                }else {
                    return nil;
                }
            };
        }] setNameWithFormat:@"[%@] -elementAt: %lu", self.name, (unsigned long)index];
    }
    
    1. takeLast:
    - (RACSignal *)takeLast:(NSUInteger)count {
        return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
            NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
            return [self subscribeNext:^(id x) {
                [valuesTaken addObject:x ? : RACTupleNil.tupleNil];
    
                while (valuesTaken.count > count) {
                    [valuesTaken removeObjectAtIndex:0];
                }
            } error:^(NSError *error) {
                [subscriber sendError:error];
            } completed:^{
                for (id value in valuesTaken) {
                    [subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
                }
    
                [subscriber sendCompleted];
            }];
        }] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];
    }
    

    takeLast:的实现也是按照套路来。先创建一个新信号,return的时候订阅原信号。在函数内部用一个valuesTaken来保存原信号发送过来的值,原信号发多少,就存多少,直到个数溢出入参给定的count,就溢出数组第0位。这样能保证数组里面始终都装着最后count个原信号的值。

    当原信号发送completed信号的时候,把数组里面存的值都sendNext出去。这里要注意的也是该变换发送信号的时机。如果原信号一直没有completed,那么takeLast:就一直没法发出任何信号来。

    1. takeUntilBlock: (在父类RACStream中定义的)
    - (instancetype)takeUntilBlock:(BOOL (^)(id x))predicate {
        NSCParameterAssert(predicate != nil);
    
        Class class = self.class;
    
        return [[self bind:^{
            return ^ id (id value, BOOL *stop) {
                if (predicate(value)) return nil;
    
                return [class return:value];
            };
        }] setNameWithFormat:@"[%@] -takeUntilBlock:", self.name];
    }
    

    takeUntilBlock:是根据传入的predicate闭包作为筛选条件的。一旦predicate( )闭包满足条件,那么新信号停止发送新信号,因为它被置为nil了。和函数名的意思是一样的,take原信号的值,Until直到闭包满足条件。

    1. takeWhileBlock: (在父类RACStream中定义的)
    - (instancetype)takeWhileBlock:(BOOL (^)(id x))predicate {
        NSCParameterAssert(predicate != nil);
    
        return [[self takeUntilBlock:^ BOOL (id x) {
            return !predicate(x);
        }] setNameWithFormat:@"[%@] -takeWhileBlock:", self.name];
    }
    

    takeWhileBlock:的信号集是takeUntilBlock:的信号集的补集。全集是原信号。takeWhileBlock:底层还是调用takeUntilBlock:,只不过判断条件的是不满足predicate( )闭包的集合。

    1. takeUntil:
    - (RACSignal *)takeUntil:(RACSignal *)signalTrigger {
        return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
            RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
            void (^triggerCompletion)(void) = ^{
                [disposable dispose];
                [subscriber sendCompleted];
            };
    
            RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
                triggerCompletion();
            } completed:^{
                triggerCompletion();
            }];
    
            [disposable addDisposable:triggerDisposable];
    
            if (!disposable.disposed) {
                RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
                    [subscriber sendNext:x];
                } error:^(NSError *error) {
                    [subscriber sendError:error];
                } completed:^{
                    [disposable dispose];
                    [subscriber sendCompleted];
                }];
    
                [disposable addDisposable:selfDisposable];
            }
    
            return disposable;
        }] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
    }
    

    takeUntil:的实现也是“经典套路”——return一个新信号,在新信号中订阅原信号。入参是一个信号signalTrigger,这个信号是一个Trigger。一旦signalTrigger发出第一个信号,就会触发triggerCompletion( )闭包,在这个闭包中,会调用triggerCompletion( )闭包。

      void (^triggerCompletion)(void) = ^{
       [disposable dispose];
       [subscriber sendCompleted];
      };
    

    一旦调用了triggerCompletion( )闭包,就会把原信号取消订阅,并给变换的新的信号订阅者sendCompleted。

    如果入参signalTrigger一直没有sendNext,那么原信号就会一直sendNext:。

    1. takeUntilReplacement:
    - (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {
        return [RACSignal createSignal:^(id<RACSubscriber> subscriber) {
            RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
    
            RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) {
                [selfDisposable dispose];
                [subscriber sendNext:x];
            } error:^(NSError *error) {
                [selfDisposable dispose];
                [subscriber sendError:error];
            } completed:^{
                [selfDisposable dispose];
                [subscriber sendCompleted];
            }];
    
            if (!selfDisposable.disposed) {
                selfDisposable.disposable = [[self
                                              concat:[RACSignal never]]
                                             subscribe:subscriber];
            }
    
            return [RACDisposable disposableWithBlock:^{
                [selfDisposable dispose];
                [replacementDisposable dispose];
            }];
        }];
    }
    

    原信号concat:了一个[RACSignal never]信号,这样原信号就一直不会disposed,会一直等待replacement信号的到来。
    控制selfDisposable是否被dispose,控制权来自于入参的replacement信号,一旦replacement信号sendNext,那么原信号就会取消订阅,接下来的事情就会交给replacement信号了。
    变换后的新信号sendNext,sendError,sendCompleted全部都由replacement信号来发送,最终新信号完成的时刻也是replacement信号完成的时刻。

    1. skip: (在父类RACStream中定义的)
    - (instancetype)skip:(NSUInteger)skipCount {
        Class class = self.class;
    
        return [[self bind:^{
            __block NSUInteger skipped = 0;
    
            return ^(id value, BOOL *stop) {
                if (skipped >= skipCount) return [class return:value];
    
                skipped++;
                return class.empty;
            };
        }] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
    }
    

    skip:信号集和take:信号集是补集关系,全集是原信号。take:是取原信号的前count个信号,而skip:是从原信号第count + 1位开始取信号。

    skipped是一个游标,每次原信号发送一个值,就比较它和入参skipCount的大小。如果不比skipCount大,说明还需要跳过,所以就返回empty信号,否则就把原信号的值发送出来。

    通过类比take系列方法,可以发现在ReactiveCocoa 2.5的这个版本也并没有向我们提供skipLast:的变换函数。这个变换函数的实现过程也不难,我们可以类比takeLast:来实现。

    实现的思路也不难,原信号每次发送过来的值,都用一个数组存储起来。skipLast:是想去掉原信号最末尾的count个信号。

    我们先来分析一下:假设原信号有n个信号,从0 - (n-1),去掉最后的count个,前面还剩n - count个信号。那么从 原信号的第 count + 1位的信号开始发送,到原信号结束,这样中间就正好是发送了 n - count 个信号。

    分析清楚后,代码就很容易了:

    // 我自己增加实现的方法
    - (RACSignal *)skipLast:(NSUInteger)count {
        return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
            NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
            return [self subscribeNext:^(id x) {
                [valuesTaken addObject:x ? : RACTupleNil.tupleNil];
    
                while (valuesTaken.count > count) {
                    [subscriber sendNext:valuesTaken[0] == RACTupleNil.tupleNil ? nil : valuesTaken[0]];
                    [valuesTaken removeObjectAtIndex:0];
                }
            } error:^(NSError *error) {
                [subscriber sendError:error];
            } completed:^{            
                [subscriber sendCompleted];
            }];
        }] setNameWithFormat:@"[%@] -skipLast: %lu", self.name, (unsigned long)count];
    }
    

    原信号每发送过来一个信号就存入数组,当数组里面的个数大于count的时候,就是需要我们发送信号的时候,这个时候每次都把数组里面第0位发送出去即可,数组维护了一个FIFO的队列。这样就实现了skipLast:的效果了。

    1. skipUntilBlock: (在父类RACStream中定义的)
    - (instancetype)skipUntilBlock:(BOOL (^)(id x))predicate {
        NSCParameterAssert(predicate != nil);
    
        Class class = self.class;
    
        return [[self bind:^{
            __block BOOL skipping = YES;
    
            return ^ id (id value, BOOL *stop) {
                if (skipping) {
                    if (predicate(value)) {
                        skipping = NO;
                    } else {
                        return class.empty;
                    }
                }
    
                return [class return:value];
            };
        }] setNameWithFormat:@"[%@] -skipUntilBlock:", self.name];
    }
    

    skipUntilBlock: 的实现可以类比takeUntilBlock: 的实现。

    skipUntilBlock: 是根据传入的predicate闭包作为筛选条件的。一旦predicate( )闭包满足条件,那么skipping = NO。skipping为NO,以后原信号发送的每个值都原封不动的发送出去。predicate( )闭包不满足条件的时候,即会一直skip原信号的值。和函数名的意思是一样的,skip原信号的值,Until直到闭包满足条件,就不再skip了。

    1. skipWhileBlock: (在父类RACStream中定义的)
    - (instancetype)skipWhileBlock:(BOOL (^)(id x))predicate {
        NSCParameterAssert(predicate != nil);
    
        return [[self skipUntilBlock:^ BOOL (id x) {
            return !predicate(x);
        }] setNameWithFormat:@"[%@] -skipWhileBlock:", self.name];
    }
    

    skipWhileBlock:的信号集是skipUntilBlock:的信号集的补集。全集是原信号。skipWhileBlock:底层还是调用skipUntilBlock:,只不过判断条件的是不满足predicate( )闭包的集合。

    到这里skip系列方法就结束了,对比take系列的方法,少了2个方法,在ReactiveCocoa 2.5的这个版本中 takeUntil: 和 takeUntilReplacement:这两个方法没有与之对应的skip方法。

    // 我自己增加实现的方法
    - (RACSignal *)skipUntil:(RACSignal *)signalTrigger {
        return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
            RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    
            __block BOOL sendTrigger = NO;
    
            void (^triggerCompletion)(void) = ^{
                sendTrigger = YES;
            };
    
            RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
                triggerCompletion();
            } completed:^{
                triggerCompletion();
            }];
    
            [disposable addDisposable:triggerDisposable];
    
            if (!disposable.disposed) {
                RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
    
                    if (sendTrigger) {
                        [subscriber sendNext:x];
                    }
    
                } error:^(NSError *error) {
                    [subscriber sendError:error];
                } completed:^{
                    [disposable dispose];
                    [subscriber sendCompleted];
                }];
    
                [disposable addDisposable:selfDisposable];
            }
    
            return disposable;
        }] setNameWithFormat:@"[%@] -skipUntil: %@", self.name, signalTrigger];
    }
    

    skipUntil实现方法也很简单,当入参的signalTrigger开发发送信号的时候,就让原信号sendNext把值发送出来,否则就把原信号的值“吞”掉。

    skipUntilReplacement:就没什么意义了,把原信号经过skipUntilReplacement:变换之后得到的新的信号就是Replacement信号。所以说这个操作也就没意义了。

    1. groupBy:transform:
    - (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock transform:(id (^)(id object))transformBlock {
        NSCParameterAssert(keyBlock != NULL);
    
        return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
            NSMutableDictionary *groups = [NSMutableDictionary dictionary];
            NSMutableArray *orderedGroups = [NSMutableArray array];
    
            return [self subscribeNext:^(id x) {
                id<NSCopying> key = keyBlock(x);
                RACGroupedSignal *groupSubject = nil;
                @synchronized(groups) {
                    groupSubject = groups[key];
                    if (groupSubject == nil) {
                        groupSubject = [RACGroupedSignal signalWithKey:key];
                        groups[key] = groupSubject;
                        [orderedGroups addObject:groupSubject];
                        [subscriber sendNext:groupSubject];
                    }
                }
    
                [groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
            } error:^(NSError *error) {
                [subscriber sendError:error];
    
                [orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error];
            } completed:^{
                [subscriber sendCompleted];
    
                [orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)];
            }];
        }] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];
    }
    

    看groupBy:transform:的实现,依旧是老“套路”。return 一个新的RACSignal,在新的信号里面订阅原信号。

    groupBy:transform:的重点就在subscribeNext中了。

    首先解释一下两个入参。两个入参都是闭包,keyBlock返回值是要作为字典的key,transformBlock的返回值是对原信号发出来的值x进行变换。
    先创建一个NSMutableDictionary字典groups,和NSMutableArray数组orderedGroups。
    从字典里面取出key对应的value,这里的key对应着keyBlock返回值。value的值是一个RACGroupedSignal信号。如果找不到对应的key值,就新建一个RACGroupedSignal信号,并存入字典对应的key值,与之对应。
    新变换之后的信号,订阅之后,RACGroupedSignal进行sendNext,这是一个信号,如果transformBlock不为空,就发送transformBlock变换之后的值。
    sendError和sendCompleted都要分别对数组orderedGroups里面每个RACGroupedSignal都要进行sendError或者sendCompleted。因为要对数组里面每个信号都执行一个操作,所以需要调用makeObjectsPerformSelector:withObject:方法。
    经过groupBy:transform:变换之后,原信号会根据keyBlock进行分组。

    写出测试代码,来看看平时应该怎么用。

        RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber)
                             {
                                 [subscriber sendNext:@1];
                                 [subscriber sendNext:@2];
                                 [subscriber sendNext:@3];
                                 [subscriber sendNext:@4];
                                 [subscriber sendNext:@5];
                                 [subscriber sendCompleted];
                                 return [RACDisposable disposableWithBlock:^{
                                     NSLog(@"signal dispose");
                                 }];
                             }];
    
        RACSignal *signalGroup = [signalA groupBy:^id<NSCopying>(NSNumber *object) {
            return object.integerValue > 3 ? @"good" : @"bad";
        } transform:^id(NSNumber * object) {
            return @(object.integerValue * 10);
        }];
    
        [[[signalGroup filter:^BOOL(RACGroupedSignal *value) {
            return [(NSString *)value.key isEqualToString:@"good"];
        }] flatten]subscribeNext:^(id x) {
            NSLog(@"subscribeNext: %@", x);
        }];
    

    假设原信号发送的1,2,3,4,5是代表的成绩的5个等级。当成绩大于3的都算“good”,小于3的都算“bad”。

    signalGroup是原信号signalA经过groupBy:transform:得到的新的信号,这个信号是一个高阶的信号,因为它里面并不是直接装的是值,signalGroup这个信号里面装的还是信号。signalGroup里面有两个分组,分别是“good”分组和“bad”分组。

    想从中取出这两个分组里面的值,需要进行一次filter:筛选。筛选之后得到对应分组的高阶信号。这时还要再进行一个flatten操作,把高阶信号变成低阶信号,再次订阅才能取到其中的值。

    订阅新信号的值,输出如下:

    subscribeNext: 40
    subscribeNext: 50
    

    关于flatten的实现:

    - (instancetype)flatten {
        __weak RACStream *stream __attribute__((unused)) = self;
        return [[self flattenMap:^(id value) {
            return value;
        }] setNameWithFormat:@"[%@] -flatten", self.name];
    }
    

    flatten操作就是调用了flattenMap:把值传进去了。

    - (instancetype)flattenMap:(RACStream * (^)(id value))block {
        Class class = self.class;
    
        return [[self bind:^{
            return ^(id value, BOOL *stop) {
                id stream = block(value) ?: [class empty];
                NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
    
                return stream;
            };
        }] setNameWithFormat:@"[%@] -flattenMap:", self.name];
    }
    

    flatten是把高阶信号变换成低阶信号的常用操作。flattenMap:具体实现上篇文章分析过了,这里不再赘述。

    1. groupBy:
    - (RACSignal *)groupBy:(id<NSCopying> (^)(id object))keyBlock {
        return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name];
    }
    groupBy:操作就是groupBy:transform:的缩减版,transform传入的为nil。
    
    关于groupBy:可以干的事情很多,可以进行很高级的分组操作。这里可以举一个例子:
    
    
    // 简单算法题,分离数组中的相同的元素,如果元素个数大于2,则组成一个新的数组,结果得到多个包含相同元素的数组,
    // 例如[1,2,3,1,2,3]分离成[1,1],[2,2],[3,3]
    RACSignal *signal = @[@1, @2, @3, @4,@2,@3,@3,@4,@4,@4].rac_sequence.signal;
    
      NSArray * array = [[[[signal groupBy:^NSString *(NSNumber *object) {
          return [NSString stringWithFormat:@"%@",object];
      }] map:^id(RACGroupedSignal *value) {
          return [value sequence];
      }] sequence] map:^id(RACSignalSequence * value) {
          return value.array;
      }].array;
    
    for (NSNumber * num in array) {
        NSLog(@"最后的数组%@",num);
    }
    

    // 最后输出 [1,2,3,4,2,3,3,4,4,4]变成[1],[2,2],[3,3,3],[4,4,4,4]

    二. 组合操作
    
    
    1. startWith: (在父类RACStream中定义的)
    
    
    • (instancetype)startWith:(id)value {

      return [[[self.class return:value]
      concat:self]
      setNameWithFormat:@"[%@] -startWith: %@", self.name, [value rac_description]];
      }

    startWith:的实现很简单,就是先构造一个只发送一个value的信号,然后这个信号发送完毕之后接上原信号。得到的新的信号就是在原信号前面新加了一个值。
    
    2. concat: (在父类RACStream中定义的)
    
    这里说的concat:是在父类RACStream中定义的。
    
    
    • (instancetype)concat:(RACStream *)stream {
      return nil;
      }
    父类中定义的这个方法就返回一个nil,具体的实现还要子类去重写。
    
    3. concat: (在父类RACStream中定义的)
    
    
    • (instancetype)concat:(id<NSFastEnumeration>)streams {
      RACStream *result = self.empty;
      for (RACStream *stream in streams) {
      result = [result concat:stream];
      }

      return [result setNameWithFormat:@"+concat: %@", streams];
      }

    这个concat:后面跟着一个数组,数组里面包含这很多信号,concat:依次把这些信号concat:连接串起来。
    
    4. merge:
    
    
    
    • (RACSignal *)merge:(id<NSFastEnumeration>)signals {
      NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
      for (RACSignal *signal in signals) {
      [copiedSignals addObject:signal];
      }

      return [[[RACSignal
      createSignal:^ RACDisposable * (id<RACSubscriber> subscriber) {
      for (RACSignal *signal in copiedSignals) {
      [subscriber sendNext:signal];
      }

                  [subscriber sendCompleted];
                  return nil;
              }]
             flatten]
            setNameWithFormat:@"+merge: %@", copiedSignals];
      

    }

    merge:后面跟一个数组。先会新建一个数组copiedSignals,把传入的信号都装到数组里。然后依次发送数组里面的信号。由于新信号也是一个高阶信号,因为sendNext会把各个信号都依次发送出去,所以需要flatten操作把这个信号转换成值发送出去。
    
    从上图上看,上下两个信号就像被拍扁了一样,就成了新信号的发送顺序。
    
    5. merge:
    
    
    
    • (RACSignal *)merge:(RACSignal *)signal {
      return [[RACSignal
      merge:@[ self, signal ]]
      setNameWithFormat:@"[%@] -merge: %@", self.name, signal];
      }
    merge:后面参数也可以跟一个信号,那么merge:就是合并这两个信号。具体实现和merge:多个信号是一样的原理。
    
    6. zip: (在父类RACStream中定义的)
    
    
    
    • (instancetype)zip:(id<NSFastEnumeration>)streams {
      return [[self join:streams block:^(RACStream *left, RACStream *right) {
      return [left zipWith:right];
      }] setNameWithFormat:@"+zip: %@", streams];
      }
    zip:后面可以跟一个数组,数组里面装的是各种信号流。
    
    它的实现是调用了join: block: 实现的。
    
    
    • (instancetype)join:(id<NSFastEnumeration>)streams block:(RACStream * (^)(id, id))block {
      RACStream *current = nil;
      // 第一步
      for (RACStream *stream in streams) {

        if (current == nil) {
            current = [stream map:^(id x) {
                return RACTuplePack(x);
            }];
      
            continue;
        }
      
        current = block(current, stream);
      

      }
      // 第二步
      if (current == nil) return [self empty];

      return [current map:^(RACTuple *xs) {

        NSMutableArray *values = [[NSMutableArray alloc] init];
        // 第三步
        while (xs != nil) {
            [values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];
            xs = (xs.count > 1 ? xs.first : nil);
        }
        // 第四步
        return [RACTuple tupleWithObjectsFromArray:values];
      

      }];
      }

    join: block: 的实现可以分为4步:
    
    依次打包各个信号流,把每个信号流都打包成元组RACTuple。首先第一个信号流打包成一个元组,这个元组里面就一个信号。接着把第一个元组和第二个信号执行block( )闭包里面的操作。传入的block( )闭包执行的是zipWith:的操作。这个操作是把两个信号“压”在一起。具体实现分析请看第一篇文章里面分析过的,这里就不再赘述了。得到第二个元组,里面装着是第一个元组和第二个信号。之后每次循环都执行类似的操作,再把第二个元组和第三个信号进行zipWith:操作,以此类推下去,直到所有的信号流都循环一遍。
    
    经过第一步的循环操作之后,还是nil,那么肯定就是空信号了,就返回empty信号。
    
    这一步是把之前第一步打包出来的结果,还原回原信号的过程。经过第一步的循环之后,current会是类似这个样子,(((1), 2), 3),第三步就是为了把这种多重元组解出来,每个信号流都依次按照顺序放在数组里。注意观察current的特点,最外层的元组,是一个值和一个元组,所以从最外层的元组开始,一层一层往里“剥”。while循环每次都取最外层元组的last,即那个单独的值,插入到数组的第0号位置,然后取出first即是里面一层的元组。然后依次循环。由于每次都插入到数组0号的位置,类似于链表的头插法,最终数组里面的顺序肯定也保证是原信号的顺序。
    
    第四步就是把还原成原信号的顺序的数组包装成元组,返回给map操作的闭包。
    
    
    • (instancetype)tupleWithObjectsFromArray:(NSArray *)array {
      return [self tupleWithObjectsFromArray:array convertNullsToNils:NO];
      }

    • (instancetype)tupleWithObjectsFromArray:(NSArray *)array convertNullsToNils:(BOOL)convert {
      RACTuple *tuple = [[self alloc] init];

      if (convert) {
      NSMutableArray *newArray = [NSMutableArray arrayWithCapacity:array.count];
      for (id object in array) {
      [newArray addObject:(object == NSNull.null ? RACTupleNil.tupleNil : object)];
      }
      tuple.backingArray = newArray;
      } else {
      tuple.backingArray = [array copy];
      }

      return tuple;
      }

    在转换过程中,入参convertNullsToNils的含义是,是否把数组里面的NSNull转换成RACTupleNil。
    
    这里转换传入的是NO,所以就是把数组原封不动的copy一份。
    
    测试代码:
    
    
    RACSignal *signalD = [RACSignal interval:3 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
    RACSignal *signalO = [RACSignal interval:1 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
    RACSignal *signalE = [RACSignal interval:4 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
    RACSignal *signalB = [RACStream zip:@[signalD,signalO,signalE]];
    
    [signalB subscribeNext:^(id x) {
        NSLog(@"最后接收到的值 = %@",x);
    }];
    
    打印输出:
    
    

    2016-11-29 13:07:57.349 最后接收到的值 = <RACTuple: 0x608000011440> (
    "2016-11-29 05:07:56 +0000",
    "2016-11-29 05:07:54 +0000",
    "2016-11-29 05:07:57 +0000"
    )

    2016-11-29 13:08:01.350 最后接收到的值 = <RACTuple: 0x608000010c60> (
    "2016-11-29 05:07:59 +0000",
    "2016-11-29 05:07:55 +0000",
    "2016-11-29 05:08:01 +0000"
    )

    2016-11-29 13:08:05.352 最后接收到的值 = <RACTuple: 0x60000001a350> (
    "2016-11-29 05:08:02 +0000",
    "2016-11-29 05:07:56 +0000",
    "2016-11-29 05:08:05 +0000"
    )

    最后输出的信号以时间最长的为主,最后接到的信号是一个元组,里面依次包含zip:数组里每个信号在一次“压”缩周期里面的值。
    
    7. zip: reduce: (在父类RACStream中定义的)
    
    
    • (instancetype)zip:(id<NSFastEnumeration>)streams reduce:(id (^)())reduceBlock {
      NSCParameterAssert(reduceBlock != nil);
      RACStream *result = [self zip:streams];
      if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
      return [result setNameWithFormat:@"+zip: %@ reduce:", streams];
      }
    zip: reduce:是一个组合的方法。具体实现可以拆分成两部分,第一部分是先执行zip:,把数组里面的信号流依次都进行组合。这一过程的实现在上一个变换实现中分析过了。zip:完成之后,紧接着进行reduceEach:操作。
    
    这里有一个判断reduceBlock是否为nil的判断,这个判断是针对老版本的“历史遗留问题”。在ReactiveCocoa 2.5之前的版本,是允许reduceBlock传入nil,这里为了防止崩溃,所以加上了这个reduceBlock是否为nil的判断。
    
    
    • (instancetype)reduceEach:(id (^)())reduceBlock {
      NSCParameterAssert(reduceBlock != nil);

      __weak RACStream *stream attribute((unused)) = self;
      return [[self map:^(RACTuple *t) {
      NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
      return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
      }] setNameWithFormat:@"[%@] -reduceEach:", self.name];
      }

    reduceEach:操作在上篇已经分析过了。它会动态的构造闭包,对原信号每个元组,执行reduceBlock( )闭包里面的方法。具体分析见上篇。一般用法如下:
    
    

    [RACStream zip:@[ stringSignal, intSignal ] reduce:^(NSString *string, NSNumber *number) {
    return [NSString stringWithFormat:@"%@: %@", string, number];
    }];

    8. zipWith: (在父类RACStream中定义的)
    
    
    • (instancetype)zipWith:(RACStream *)stream {
      return nil;
      }
    这个方法就是在父类的RACStream中定义了,具体实现还要看RACStream各个子类的实现。
    
    它就可以类比concat:在父类中的实现,也是直接返回一个nil。
    
    
    - (instancetype)concat:(RACStream *)stream { return nil;}
    在第一篇中分析了concat:和zipWith:在RACSignal子类中具体实现。忘记了具体实现的可以回去看看。
    
    9. combineLatestWith:
    
    
    • (RACSignal *)combineLatestWith:(RACSignal *)signal {
      NSCParameterAssert(signal != nil);

      return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
      RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];

        // 初始化第一个信号的一些标志变量
        __block id lastSelfValue = nil;
        __block BOOL selfCompleted = NO;
      
        // 初始化第二个信号的一些标志变量
        __block id lastOtherValue = nil;
        __block BOOL otherCompleted = NO;
      
        // 这里是一个判断是否sendNext的闭包
        void (^sendNext)(void) = ^{ };
      
        // 订阅第一个信号
        RACDisposable *selfDisposable = [self subscribeNext:^(id x) { }];
        [disposable addDisposable:selfDisposable];
      
        // 订阅第二个信号
        RACDisposable *otherDisposable = [signal subscribeNext:^(id x) { }];
        [disposable addDisposable:otherDisposable];
      
        return disposable;
      

      }] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal];
      }

    大体实现思路比较简单,在新信号里面分别订阅原信号和入参signal信号。
    
    
    

    RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
    @synchronized (disposable) {
    lastSelfValue = x ?: RACTupleNil.tupleNil;
    sendNext();
    }
    } error:^(NSError *error) {
    [subscriber sendError:error];
    } completed:^{
    @synchronized (disposable) {
    selfCompleted = YES;
    if (otherCompleted) [subscriber sendCompleted];
    }
    }];

    先来看看原信号订阅的具体实现:
    
    在subscribeNext闭包中,记录下原信号最新发送的x值,并保存到lastSelfValue中。从此lastSelfValue变量每次都保存原信号发送过来的最新的值。然后再调用sendNext( )闭包。
    
    在completed闭包中,selfCompleted中记录下原信号发送完成。这是还要判断otherCompleted是否完成,即入参信号signal是否发送完成,只有两者都发送完成了,组合的新信号才能算全部发送完成。
    
    

    RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
    @synchronized (disposable) {
    lastOtherValue = x ?: RACTupleNil.tupleNil;
    sendNext();
    }
    } error:^(NSError *error) {
    [subscriber sendError:error];
    } completed:^{
    @synchronized (disposable) {
    otherCompleted = YES;
    if (selfCompleted) [subscriber sendCompleted];
    }
    }];

    这是对入参信号signal的处理实现。和原信号的处理方式完全一致。现在重点就要看看sendNext( )闭包中都做了些什么。
    
    

    void (^sendNext)(void) = ^{
    @synchronized (disposable) {
    if (lastSelfValue == nil || lastOtherValue == nil) return;
    [subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];
    }
    };

    在sendNext( )闭包中,如果lastSelfValue 或者 lastOtherValue 其中之一有一个为nil,就return,因为这个时候无法结合在一起。当两个信号都有值,那么就把这两个信号的最新的值打包成元组发送出来。
    
    
    可以看到,每个信号每发送出来一个新的值,都会去找另外一个信号上一个最新的值进行结合。
    
    这里可以对比一下类似的zip:操作
    
    
    zip:操作是会把新来的信号的值存起来,放在数组里,然后另外一个信号发送一个值过来就和数组第0位的值相互结合成新的元组信号发送出去,并分别移除数组里面第0位的两个值。zip:能保证每次结合的值都是唯一的,不会一个原信号的值被多次结合到新的元组信号中。但是combineLatestWith:是不能保证这一点的,在原信号或者另外一个信号新信号发送前,每次发送信号都会结合当前最新的信号,这里就会有反复结合的情况。
    
    10. combineLatest:
    
    
    • (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals {
      return [[self join:signals block:^(RACSignal *left, RACSignal *right) {
      return [left combineLatestWith:right];
      }] setNameWithFormat:@"+combineLatest: %@", signals];
      }
    combineLatest:的实现就是把入参数组里面的每个信号都调用一次join: block:方法。传入的闭包是把两个信号combineLatestWith:一下。combineLatest:的实现就是2个操作的组合。具体实现上面也都分析过,这里不再赘述。
    
    11. combineLatest: reduce:
    
    
    • (RACSignal *)combineLatest:(id<NSFastEnumeration>)signals reduce:(id (^)())reduceBlock {
      NSCParameterAssert(reduceBlock != nil);
      RACSignal *result = [self combineLatest:signals];
      if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
      return [result setNameWithFormat:@"+combineLatest: %@ reduce:", signals];
      }
    combineLatest: reduce: 的实现可以类比zip: reduce:的实现。
    
    具体实现可以拆分成两部分,第一部分是先执行combineLatest:,把数组里面的信号流依次都进行组合。这一过程的实现在上一个变换实现中分析过了。combineLatest:完成之后,紧接着进行reduceEach:操作。
    
    这里有一个判断reduceBlock是否为nil的判断,这个判断是针对老版本的“历史遗留问题”。在ReactiveCocoa 2.5之前的版本,是允许reduceBlock传入nil,这里为了防止崩溃,所以加上了这个reduceBlock是否为nil的判断。
    
    12. combinePreviousWithStart: reduce:(在父类RACStream中定义的)
    
    这个方法的实现也是多个变换操作组合在一起的。
    
    
    • (instancetype)combinePreviousWithStart:(id)start reduce:(id (^)(id previous, id next))reduceBlock {
      NSCParameterAssert(reduceBlock != NULL);
      return [[[self
      scanWithStart:RACTuplePack(start)
      reduce:^(RACTuple *previousTuple, id next) {
      id value = reduceBlock(previousTuple[0], next);
      return RACTuplePack(next, value);
      }]
      map:^(RACTuple *tuple) {
      return tuple[1];
      }]
      setNameWithFormat:@"[%@] -combinePreviousWithStart: %@ reduce:", self.name, [start rac_description]];
      }
    combinePreviousWithStart: reduce:的实现完全可以类比scanWithStart:reduce:的实现。举个例子来说明他们俩的不同。
    
    
      RACSequence *numbers = @[ @1, @2, @3, @4 ].rac_sequence;
    
      RACSignal *signalA = [numbers combinePreviousWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) {
          return @(previous.integerValue + next.integerValue);
      }].signal;
    
    RACSignal *signalB = [numbers scanWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) {
        return @(previous.integerValue + next.integerValue);
    }].signal;
    
    signalA输出如下:
    
    

    1
    3
    5
    7

    signalB输出如下:
    
    

    1
    3
    6
    10

    现在应该不同点应该很明显了。combinePreviousWithStart: reduce:实现的是两两之前的加和,而scanWithStart:reduce:实现的累加。
    
    为什么会这样呢,具体看看combinePreviousWithStart: reduce:的实现。
    
    虽然combinePreviousWithStart: reduce:也是调用了scanWithStart:reduce:,但是初始值是RACTuplePack(start)元组,聚合reduce的过程也有所不同:
    
    

    id value = reduceBlock(previousTuple[0], next);
    return RACTuplePack(next, value);

    依次调用reduceBlock( )闭包,传入previousTuple[0], next,这里reduceBlock( )闭包是进行累加的操作,所以就是把前一个元组的第0位加上后面新来的信号的值。得到的值拼成新的元组,新的元组由next和value值构成。
    
    如果打印出上述例子中combinePreviousWithStart: reduce:的加合过程中每个信号的值,如下:
    
    

    <RACTuple: 0x608000200010> (
    1,
    1
    )

    <RACTuple: 0x60000001fe70> (
    2,
    3
    )
    <RACTuple: 0x60000001fe90> (
    3,
    5
    )
    <RACTuple: 0x60000001feb0> (
    4,
    7
    )

    由于这样拆成元组之后,下次再进行操作的时候,还可以拿到前一个信号的值,这样就不会形成累加的效果。
    
    13. sample:
    
    
    • (RACSignal *)sample:(RACSignal *)sampler {
      NSCParameterAssert(sampler != nil);

      return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
      NSLock *lock = [[NSLock alloc] init];
      __block id lastValue;
      __block BOOL hasValue = NO;

        RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
        RACDisposable *sourceDisposable = [self subscribeNext:^(id x) { // 暂时省略 }];
      
        samplerDisposable.disposable = [sampler subscribeNext:^(id _) { // 暂时省略 }];
      
        return [RACDisposable disposableWithBlock:^{
            [samplerDisposable dispose];
            [sourceDisposable dispose];
        }];
      

      }] setNameWithFormat:@"[%@] -sample: %@", self.name, sampler];
      }

    sample:内部实现也是对原信号和入参信号sampler分别进行订阅。具体实现就是这两个信号订阅内部都干了些什么。
    
    

    RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
    RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
    [lock lock];
    hasValue = YES;
    lastValue = x;
    [lock unlock];
    } error:^(NSError *error) {
    [samplerDisposable dispose];
    [subscriber sendError:error];
    } completed:^{
    [samplerDisposable dispose];
    [subscriber sendCompleted];
    }];

    这是对原信号的操作,原信号的操作在subscribeNext中就记录了两个变量的值,hasValue记录原信号有值,lastValue记录了原信号的最新的值。这里加了一层NSLock锁进行保护。
    
    在发生error的时候,先把sampler信号取消订阅,然后再sendError:。当原信号完成的时候,同样是先把sampler信号取消订阅,然后再sendCompleted。
    
    

    samplerDisposable.disposable = [sampler subscribeNext:^(id _) {
    BOOL shouldSend = NO;
    id value;
    [lock lock];
    shouldSend = hasValue;
    value = lastValue;
    [lock unlock];

    if (shouldSend) {
        [subscriber sendNext:value];
    }
    

    } error:^(NSError *error) {
    [sourceDisposable dispose];
    [subscriber sendError:error];
    } completed:^{
    [sourceDisposable dispose];
    [subscriber sendCompleted];
    }];

    这是对入参信号sampler的操作。shouldSend默认值是NO,这个变量控制着是否sendNext:值。只有当原信号有值的时候,hasValue = YES,所以shouldSend = YES,这个时候才能发送原信号的值。这里我们并不关心入参信号sampler的值,从subscribeNext:^(id _)这里可以看出, _代表并不需要它的值。
    
    在发生error的时候,先把原信号取消订阅,然后再sendError:。当sampler信号完成的时候,同样是先把原信号取消订阅,然后再sendCompleted。
    
    
    经过sample:变换就会变成这个样子。只是把原信号的值都移动到了sampler信号发送信号的时刻,值还是和原信号的值一样。
    
    最后
    
    关于RACSignal的变换操作还剩下 冷热信号转换操作,高阶信号操作,下篇接着继续分析。最后请大家多多指教。
    
    转载自:https://gold.xitu.io/post/583e096461ff4b007edddbc8

    相关文章

      网友评论

          本文标题:ReactiveCocoa 中 RACSignal 所有变换操作

          本文链接:https://www.haomeiwen.com/subject/kefbittx.html