RACSubject *subject = [RACSubject subject];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"before:%@",x);
}];
[subject sendNext:@"test"];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"after:%@",x);
}];
打印结果:
ReactiveCocoa[5111:179158] before:test
上面的例子中,发送消息前的订阅触发了,发送消息后的订阅没有触发,这是为什么呢?我们来看看具体的源码实现:
+ (instancetype)subject {
return [[self alloc] init];
}
- (instancetype)init {
self = [super init];
if (self == nil) return nil;
_disposable = [RACCompoundDisposable compoundDisposable];
_subscribers = [[NSMutableArray alloc] initWithCapacity:1];
return self;
}
创建的源码实现比较简单,但看到RACSubject内部有个成员变量_subscribers,是可变数组。接下来看看订阅的实现:
// 订阅时创建订阅者对象RACSubscriber,并调用自己的subscribe方法
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
//RACSubject有个subscribers属性,用来存储所有的订阅者
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
[disposable addDisposable:[RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
// Since newer subscribers are generally shorter-lived, search
// starting from the end of the list.
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];
//一次所有订阅则
if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}]];
return disposable;
}
订阅的主要实现就是创建订阅者并加入到_subscribers中。RACSubject可以多次订阅,每次订阅都会创建一个订阅者并添加到_subscribers数组中。
下面是发送消息的实现:
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
//遍历所有订阅者发送消息
[subscriber sendNext:value];
}];
}
- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}
for (id<RACSubscriber> subscriber in subscribers) {
block(subscriber);
}
}
发送消息时,遍历subscribers取出每一个subscriber发送消息。所以,必须先订阅才能接收到消息。这就是为什么开头的例子中,只打印了 before:test。
RACSubject既能订阅信号又能发送信号,所以RACSubject是一个热信号。上面有说到,RACSubject必须在信号发送前订阅才能触发,RAC还有另外一个类RACReplaySubject可以解决这个问题。
RACReplaySubject *subject = [RACReplaySubject subject];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"before:%@",x);
}];
[subject sendNext:@"test"];
[subject subscribeNext:^(id _Nullable x) {
NSLog(@"after:%@",x);
}];
打印结果:
ReactiveCocoa[5247:209617] before:test
ReactiveCocoa[5247:209617] after:test
还是同样的例子,只是这次换成了RACReplaySubject,发送消息前后的订阅都触发了,这是怎么做到的呢?
RACReplaySubject是继承自RACSubject。首先看RACReplaySubject初始化方法的实现:
- (instancetype)init {
return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity];
}
- (instancetype)initWithCapacity:(NSUInteger)capacity {
self = [super init];
_capacity = capacity;
_valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
return self;
}
RACReplaySubject也有个成员变量_valuesReceived是个可变数组。
RACReplaySubject的订阅实现如下:
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;
// 消息订阅时发送消息
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}
if (compoundDisposable.disposed) return;
if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];
[compoundDisposable addDisposable:schedulingDisposable];
return compoundDisposable;
}
从上面源码能看到消息订阅时,如果检测到self.valuesReceived中有值,那么当前的订阅者就会将值发送出去。
- (void)sendNext:(id)value {
@synchronized (self) {
//消息发送前先将value存储到valuesReceived中,留给后面订阅时发送
[self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
[super sendNext:value];
if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
[self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
}
}
}
能看到RACReplaySubject的sendNext实现,是先将消息发送的值value存储到valuesReceived中,然后调用父类RACSubject的sendNext。RACReplaySubjec在每次订阅时,会遍历_valuesReceived中存储的值给订阅者发送一遍。
网友评论