When subclassing RACStream, only the methods in the main @interface body need to be overridden.
以上是RACStream类的简要介绍中的一段话,大意为只有主类中声明的方法需要在子类重写。查看类实现文件发现主类中声明的方法全部都很厚道的 return nil ,因此我们在介绍以下方法时会使用其子类RACSignal做演示。
+ (instancetype)empty;
empty函数返回一个名为 +empty 的stream,这个stream会被立即结束,不做任何操作。
+ (instancetype)return:(id)value;
创建一个名为 +return: value 的stream,这个stream返回给定值后结束。
//value:值 stop:如果设置为YES则终止后续值
typedef RACStream * (^RACStreamBindBlock)(id value, BOOL *stop);
- (instancetype)bind:(RACStreamBindBlock (^)(void))block;
原始 stream 每产生一个 value 将 value 传入 RACStreamBindBlock 产生一个新的 stream 并订阅这个 stream 。 当 stop 被设置为 YES 时终止后续值发送。一旦任意一个由 RACStreamBindBlock 产生的 stream sendError 则原始 stream sendError,当所有由 RACStreamBindBlock 产生的 stream sendComplete 则原始 stream sendComplete。
例:
创建一个signal对象发送十次值,并且在其中随机发送@" "。我们要做的是过滤掉@" "并且在接收到@"5"时终止后续发送返回一个新的signal并且发送@"a",@"b"。
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
//发送10次值
for (int i = 0; i < 10; i ++) {
//随机发送 @" "
BOOL empty = arc4random_uniform(2);
if (!empty || i == 5) {
[subscriber sendNext:[NSString stringWithFormat:@"%zd",i]];
}
else {
[subscriber sendNext:@" "];
}
}
[subscriber sendCompleted];
return nil;
}] bind:^RACStreamBindBlock{
return ^id (NSString * value, BOOL * stop) {
if ([value isEqualToString:@" "]) { //过滤@" "
return [RACSignal empty];
}
else if ([value isEqualToString:@"5"]){ //接收到 @"5" 终止
* stop = YES;
RACSignal * result = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"a"];
[subscriber sendNext:@"b"];
[subscriber sendCompleted];
return nil;
}];
return result;
}
else {
return [RACSignal return:value];
}
};
}];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
[signal subscribeCompleted:^{
NSLog(@"complete");
}];
/* 某次执行结果
2018-03-27 15:14:02.334078+0800 ReactCocoaTest[91664:2403921] 1
2018-03-27 15:14:02.334390+0800 ReactCocoaTest[91664:2403921] 3
2018-03-27 15:14:02.334562+0800 ReactCocoaTest[91664:2403921] 4
2018-03-27 15:14:02.334836+0800 ReactCocoaTest[91664:2403921] a
2018-03-27 15:14:02.334957+0800 ReactCocoaTest[91664:2403921] b
2018-03-27 15:14:02.335673+0800 ReactCocoaTest[91664:2403921] complete
*/
- (instancetype)concat:(RACStream *)stream;
将 stream a与 stream b 联结起来并产生一个新的 stream c,只有当前stream a sendComplete 之后 stream b 才能发送值,当 stream a 和 stream b 都 sendComplete 之后,联结产生的stream c 才会 sendComplete。
注意:如果联结的两个stream中某个stream sendError 则联结产生的stream立即 sendError 。
例:
创建两个stream,并且各自延迟5秒 sendComplete
RACSignal * signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"1"];
[[RACScheduler mainThreadScheduler]afterDelay:5 schedule:^{
[subscriber sendCompleted];
}];
return nil;
}];
RACSignal * signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"2"];
[[RACScheduler mainThreadScheduler]afterDelay:5 schedule:^{
[subscriber sendCompleted];
}];
return nil;
}];
RACSignal * concat = [signal1 concat:signal2];
[concat subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
[concat subscribeCompleted:^{
NSLog(@"complete");
}];
[concat subscribeError:^(NSError *error) {
NSLog(@"error");
}];
/*
2018-03-27 16:37:18.546755+0800 ReactCocoaTest[94470:2467226] 1
2018-03-27 16:37:24.040041+0800 ReactCocoaTest[94470:2467226] 2
2018-03-27 16:37:29.513247+0800 ReactCocoaTest[94470:2467226] complete
*/
可以看到控制台三次输出间隔时间为5秒,大家可以将signal1 或者signal2中任意一个 sendComplete 修改为 sendError 验证signal sendError的情况。
//- (instancetype)concat:(RACStream *)stream 的升级版 针对多个stream的情况
+ (instancetype)concat:(id<NSFastEnumeration>)streams;
- (instancetype)zipWith:(RACStream *)stream;
将 stream a 与 stream b 结合起来产生一个新的 stream c,当 stream a 和 stream b 都发送过一次值之后 stream c 才会将这两个值通过RACTuple对象发送。
注意:
1.stream c 发送的 RACTuple 中的值为 stream a 与 stream b 同次发送的值。
2.stream a 与 stream b 中任意一个 sendComplete 则 stream c sendComplete,当 stream a 与 stream b 任意一个 sendError 则 stream c sendError。
例:
RACSignal * signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendNext:@"3"];
[subscriber sendNext:@"4"];
[subscriber sendNext:@"5"];
[subscriber sendNext:@"6"];
[[RACScheduler mainThreadScheduler]afterDelay:7 schedule:^{
[subscriber sendError:nil];
}];
return nil;
}];
RACSignal * signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"a"];
[subscriber sendNext:@"b"];
[[RACScheduler mainThreadScheduler]afterDelay:5 schedule:^{
[subscriber sendNext:@"d"];
[subscriber sendCompleted];
}];
return nil;
}];
RACSignal * zip = [signal1 zipWith:signal2];
[zip subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
[zip subscribeCompleted:^{
NSLog(@"complete");
}];
[zip subscribeError:^(NSError *error) {
NSLog(@"error");
}];
/*
2018-03-27 17:20:15.922457+0800 ReactCocoaTest[96661:2499724] <RACTuple: 0x604000206030> (
1,
a
)
2018-03-27 17:20:15.922787+0800 ReactCocoaTest[96661:2499724] <RACTuple: 0x604000206010> (
2,
b
)
2018-03-27 17:20:21.414965+0800 ReactCocoaTest[96661:2499724] <RACTuple: 0x604000207110> (
3,
d
)
2018-03-27 17:20:21.415268+0800 ReactCocoaTest[96661:2499724] complete
*/
//- (instancetype)zipWith:(RACStream *)stream 的升级版 针对多个stream的情况
+ (instancetype)zip:(id<NSFastEnumeration>)streams;
- (instancetype)flattenMap:(RACStream * (^)(id value))block;
产生一个新的stream 每当stream发送一个值时会先询问block并且订阅block中返回的stream。
flattenMap 适用于stream中嵌套stream的场景,例如当用户点击登录按钮后我们需要将用户名密码等发送至服务器效验,由服务器返回效验结果以判断用户名密码等是否正确。
RACSignal * loginSignal = [self.loginButton rac_signalForControlEvents:UIControlEventTouchUpInside];
RACSignal * checkUserSignal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
// 延时模拟请求
[[RACScheduler mainThreadScheduler]afterDelay:3 schedule:^{
[subscriber sendNext:@"success"];
[subscriber sendCompleted];
}];
return nil;
}];
RACSignal * loginResultSignal = [loginSignal flattenMap:^RACStream *(id value) {
return checkUserSignal;
}];
[loginResultSignal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
- (instancetype)map:(id (^)(id value))block;
每当stream发送一个value时会将value传入block产生一个新的value并发送。
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"1"];
[subscriber sendNext:@" "];
[subscriber sendNext:@"1"];
[subscriber sendCompleted];
return nil;
}] map:^id(NSString * value) {
if ([value isEqualToString:@" "]) {
return @"0";
}
return value;
}];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
- (instancetype)mapReplace:(id)object;
将 stream 发送的值全部替换为为object
- (instancetype)filter:(BOOL (^)(id value))block;
stream 每产生一个值都传入block进行测试,当block返回NO时该值将不会被订阅者接收到。
- (instancetype)ignore:(id)innerValue;
stream 每产生一个value进行 value != innerValue && [value -isEqual:innerValue] 运算,运算结果为YES时订阅者不会接收到value。innerValue 可以被设置为 nil 。
例
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"1"];
[subscriber sendNext:nil];
[subscriber sendNext:@"1"];
[subscriber sendCompleted];
return nil;
}]ignore:nil];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
/*
2018-03-28 17:39:59.041 ReactCocoaTest[3000:1590755] 1
2018-03-28 17:39:59.041 ReactCocoaTest[3000:1590755] 1
*/
- (instancetype)reduceEach:(id (^)())reduceBlock;
将stream发送的RACTuple中的值使用reduceBlock处理为一个值。在方法声明中并没有说明reduceBlock的参数个数,使用时注意reduceBlock的参数个数要与RACTuple中包含的值的个数相同。
例
RACSignal * signal1 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"1"];
[subscriber sendNext:@"11"];
[subscriber sendCompleted];
return nil;
}];
RACSignal * signal2 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"2"];
[subscriber sendNext:@"22"];
[subscriber sendCompleted];
return nil;
}];
RACSignal * signal3 = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"3"];
[[RACScheduler mainThreadScheduler]afterDelay:2 schedule:^{
[subscriber sendNext:@"33"];
[subscriber sendCompleted];
}];
return nil;
}];
[[[RACSignal zip:@[signal1,signal2,signal3]] reduceEach:^id(NSString * a, NSString * b, NSString * c){
return [NSString stringWithFormat:@"%@ %@ %@",a,b,c];
}] subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
+ (instancetype)zip:(id<NSFastEnumeration>)streams reduce:(id (^)())reduceBlock;
+zip: 与 -reduceEach:的组合。
- (instancetype)startWith:(id)value;
指定stream的起始值,当订阅者订阅stream时会立即收到value所指定的值。
例
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[[RACScheduler mainThreadScheduler]afterDelay:2 schedule:^{
[subscriber sendNext:@"1"];
[subscriber sendCompleted];
}];
return nil;
}] startWith:@"0"];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
2018-04-12 11:39:17.538975+0800 ReactiveCocoaTest[31885:730340] 0
2018-04-12 11:39:19.540758+0800 ReactiveCocoaTest[31885:730340] 1
- (instancetype)skip:(NSUInteger)skipCount;
跳过指定次数的值,订阅者只能接收到第skipCount次之后的值。
例
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[[RACScheduler mainThreadScheduler]afterDelay:2 schedule:^{
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendNext:@"3"];
[subscriber sendNext:@"4"];
[subscriber sendNext:@"5"];
[subscriber sendCompleted];
}];
return nil;
}] skip:3];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
2018-04-12 15:26:04.206220+0800 ReactiveCocoaTest[36205:852172] 4
2018-04-12 15:26:04.206473+0800 ReactiveCocoaTest[36205:852172] 5
- (instancetype)take:(NSUInteger)count;
只发送前count次的值。
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[[RACScheduler mainThreadScheduler]afterDelay:2 schedule:^{
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendNext:@"3"];
[subscriber sendNext:@"4"];
[subscriber sendNext:@"5"];
[subscriber sendCompleted];
}];
return nil;
}] take:3];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
2018-04-12 15:27:04.557102+0800 ReactiveCocoaTest[36338:854896] 1
2018-04-12 15:27:04.557391+0800 ReactiveCocoaTest[36338:854896] 2
2018-04-12 15:27:04.557603+0800 ReactiveCocoaTest[36338:854896] 3
- (instancetype)scanWithStart:(id)startingValue reduce:(id (^)(id running, id next))block;
startingValue:block中running的起始值。
block:依次将stream中发送的值作为next传入block并且将上一次block的返回值作为running传入block。每一次block返回一个值后会将原来的值替换。
官方给出的例子
RACSequence *numbers = @[ @1, @2, @3, @4 ].rac_sequence;
// Contains 1, 3, 6, 10
RACSequence *sums = [numbers scanWithStart:@0 reduce:^(NSNumber *sum, NSNumber *next) {
return @(sum.integerValue + next.integerValue);
}];
例
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[[RACScheduler mainThreadScheduler]afterDelay:2 schedule:^{
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendNext:@"3"];
[subscriber sendNext:@"4"];
[subscriber sendNext:@"5"];
[subscriber sendCompleted];
}];
return nil;
}] scanWithStart:@"a" reduce:^id(id running, id next) {
return [NSString stringWithFormat:@"%@,%@",running,next];
}];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
2018-04-12 16:35:38.272950+0800 ReactiveCocoaTest[38313:919048] a,1
2018-04-12 16:35:38.273245+0800 ReactiveCocoaTest[38313:919048] a,1,2
2018-04-12 16:35:38.273708+0800 ReactiveCocoaTest[38313:919048] a,1,2,3
2018-04-12 16:35:38.274369+0800 ReactiveCocoaTest[38313:919048] a,1,2,3,4
2018-04-12 16:35:38.274632+0800 ReactiveCocoaTest[38313:919048] a,1,2,3,4,5
- (instancetype)combinePreviousWithStart:(id)start reduce:(id (^)(id previous, id current))reduceBlock;
start:reduceBlock中previous的起始值。
reduceBlock:依次将stream中上一次发送的值与当前发送的值分别作为previous和current传入reduceBlock,并得到一个新的值。每一次reduceBlock返回一个值后会将原来的值替换。
官方给出的例子
RACSequence *numbers = @[ @1, @2, @3, @4 ].rac_sequence;
// Contains 1, 3, 5, 7
RACSequence *sums = [numbers combinePreviousWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) {
return @(previous.integerValue + next.integerValue);
}];
例
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[[RACScheduler mainThreadScheduler]afterDelay:2 schedule:^{
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendNext:@"3"];
[subscriber sendNext:@"4"];
[subscriber sendNext:@"5"];
[subscriber sendCompleted];
}];
return nil;
}] combinePreviousWithStart:@"a" reduce:^id(id previous, id current) {
return [NSString stringWithFormat:@"%@,%@",previous,current];
}];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
2018-04-12 16:41:54.005897+0800 ReactiveCocoaTest[38662:929686] a,1
2018-04-12 16:41:54.006375+0800 ReactiveCocoaTest[38662:929686] 1,2
2018-04-12 16:41:54.006617+0800 ReactiveCocoaTest[38662:929686] 2,3
2018-04-12 16:41:54.006957+0800 ReactiveCocoaTest[38662:929686] 3,4
2018-04-12 16:41:54.008215+0800 ReactiveCocoaTest[38662:929686] 4,5
- (instancetype)takeUntilBlock:(BOOL (^)(id x))predicate;
每当stream产生一个值都会询问predicate,当predicate返回YES当前值以及后续值不会被订阅者接收到。
例
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendNext:@"3"];
[subscriber sendNext:@"4"];
[subscriber sendNext:@"5"];
[subscriber sendCompleted];
return nil;
}] takeUntilBlock:^BOOL(id x) {
return [x isEqualToString:@"3"];
}];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
2018-04-12 16:54:04.421888+0800 ReactiveCocoaTest[39002:945018] 1
2018-04-12 16:54:04.422144+0800 ReactiveCocoaTest[39002:945018] 2
- (instancetype)takeWhileBlock:(BOOL (^)(id x))predicate;
每当stream产生一个值都会询问predicate,当predicate返回NO当前值以及后续值不会被订阅者接收到。
例
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendNext:@"3"];
[subscriber sendNext:@"4"];
[subscriber sendNext:@"5"];
[subscriber sendCompleted];
return nil;
}] takeWhileBlock:^BOOL(id x) {
return [x isEqualToString:@"1"];
}];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
2018-04-12 16:57:01.353910+0800 ReactiveCocoaTest[39611:952168] 1
- (instancetype)skipUntilBlock:(BOOL (^)(id x))predicate;
每当stream产生一个值都会询问predicate,当predicate返回YES当前值以及后续值才会被订阅者接收到。
例
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendNext:@"3"];
[subscriber sendNext:@"4"];
[subscriber sendNext:@"5"];
[subscriber sendCompleted];
return nil;
}] skipUntilBlock:^BOOL(id x) {
return [x isEqualToString:@"3"];
}];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
2018-04-12 17:00:28.968933+0800 ReactiveCocoaTest[39904:958633] 3
2018-04-12 17:00:28.969232+0800 ReactiveCocoaTest[39904:958633] 4
2018-04-12 17:00:28.969613+0800 ReactiveCocoaTest[39904:958633] 5
- (instancetype)skipWhileBlock:(BOOL (^)(id x))predicate;
每当stream产生一个值都会询问predicate,当predicate返回NO当前值以及后续值才会被订阅者接收到。
例
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendNext:@"3"];
[subscriber sendNext:@"4"];
[subscriber sendNext:@"5"];
[subscriber sendCompleted];
return nil;
}] skipWhileBlock:^BOOL(id x) {
return [x isEqualToString:@"1"];
}];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
2018-04-12 17:13:25.548610+0800 ReactiveCocoaTest[40829:978359] 2
2018-04-12 17:13:25.548842+0800 ReactiveCocoaTest[40829:978359] 3
2018-04-12 17:13:25.549123+0800 ReactiveCocoaTest[40829:978359] 4
2018-04-12 17:13:25.549322+0800 ReactiveCocoaTest[40829:978359] 5
- (instancetype)distinctUntilChanged;
每当stream产生一个新的值都会与上一次产生的值进行 lastValue == x || [x isEqual:lastValue] 判断,如果返回YES则订阅者不会接收到值。
例
RACSignal * signal = [[RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@"1"];
[subscriber sendNext:@"1"];
[subscriber sendNext:@"2"];
[subscriber sendNext:@"1"];
[subscriber sendNext:@"1"];
[subscriber sendCompleted];
return nil;
}] distinctUntilChanged];
[signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
2018-04-12 17:16:13.919028+0800 ReactiveCocoaTest[40996:981741] 1
2018-04-12 17:16:13.919383+0800 ReactiveCocoaTest[40996:981741] 2
2018-04-12 17:16:13.919676+0800 ReactiveCocoaTest[40996:981741] 1
网友评论