美文网首页
RACSubject

RACSubject

作者: YY_Lee | 来源:发表于2019-11-29 10:40 被阅读0次
RACSubject *subject = [RACSubject subject];
[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"before:%@",x);
}];
[subject sendNext:@"test"];

[subject subscribeNext:^(id  _Nullable x) {
    NSLog(@"after:%@",x);
}];

打印结果:
ReactiveCocoa[5111:179158] before:test

上面的例子中,发送消息前的订阅触发了,发送消息后的订阅没有触发,这是为什么呢?我们来看看具体的源码实现:

+ (instancetype)subject {
    return [[self alloc] init];
}
- (instancetype)init {
    self = [super init];
    if (self == nil) return nil;

    _disposable = [RACCompoundDisposable compoundDisposable];
    _subscribers = [[NSMutableArray alloc] initWithCapacity:1];
    
    return self;
}

创建的源码实现比较简单,但看到RACSubject内部有个成员变量_subscribers,是可变数组。接下来看看订阅的实现:

// 订阅时创建订阅者对象RACSubscriber,并调用自己的subscribe方法
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
    NSCParameterAssert(nextBlock != NULL);
    
    RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
    return [self subscribe:o];
}

//RACSubject有个subscribers属性,用来存储所有的订阅者
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);

    RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];

    NSMutableArray *subscribers = self.subscribers;
    @synchronized (subscribers) {
        [subscribers addObject:subscriber];
    }
    
    [disposable addDisposable:[RACDisposable disposableWithBlock:^{
        @synchronized (subscribers) {  
            // Since newer subscribers are generally shorter-lived, search
            // starting from the end of the list.
            NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id<RACSubscriber> obj, NSUInteger index, BOOL *stop) {
                return obj == subscriber;
            }];
            //一次所有订阅则
            if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
        }
    }]];

    return disposable;
}

订阅的主要实现就是创建订阅者并加入到_subscribers中。RACSubject可以多次订阅,每次订阅都会创建一个订阅者并添加到_subscribers数组中。

下面是发送消息的实现:

- (void)sendNext:(id)value {
    [self enumerateSubscribersUsingBlock:^(id<RACSubscriber> subscriber) {
        //遍历所有订阅者发送消息
        [subscriber sendNext:value];
    }];
}

- (void)enumerateSubscribersUsingBlock:(void (^)(id<RACSubscriber> subscriber))block {
    NSArray *subscribers;
    @synchronized (self.subscribers) {
        subscribers = [self.subscribers copy];
    }

    for (id<RACSubscriber> subscriber in subscribers) {
        block(subscriber);
    }
}

发送消息时,遍历subscribers取出每一个subscriber发送消息。所以,必须先订阅才能接收到消息。这就是为什么开头的例子中,只打印了 before:test。

RACSubject既能订阅信号又能发送信号,所以RACSubject是一个热信号。上面有说到,RACSubject必须在信号发送前订阅才能触发,RAC还有另外一个类RACReplaySubject可以解决这个问题。

RACReplaySubject *subject = [RACReplaySubject subject];
    [subject subscribeNext:^(id  _Nullable x) {
        NSLog(@"before:%@",x);
    }];
    [subject sendNext:@"test"];

    [subject subscribeNext:^(id  _Nullable x) {
        NSLog(@"after:%@",x);
    }];
打印结果:
ReactiveCocoa[5247:209617] before:test
ReactiveCocoa[5247:209617] after:test

还是同样的例子,只是这次换成了RACReplaySubject,发送消息前后的订阅都触发了,这是怎么做到的呢?

RACReplaySubject是继承自RACSubject。首先看RACReplaySubject初始化方法的实现:

- (instancetype)init {
    return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity];
}

- (instancetype)initWithCapacity:(NSUInteger)capacity {
    self = [super init];
    
    _capacity = capacity;
    _valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
    
    return self;
}

RACReplaySubject也有个成员变量_valuesReceived是个可变数组。

RACReplaySubject的订阅实现如下:

- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

    RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
        @synchronized (self) {
            for (id value in self.valuesReceived) {
                if (compoundDisposable.disposed) return;
                // 消息订阅时发送消息
                [subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
            }

            if (compoundDisposable.disposed) return;

            if (self.hasCompleted) {
                [subscriber sendCompleted];
            } else if (self.hasError) {
                [subscriber sendError:self.error];
            } else {
                RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
                [compoundDisposable addDisposable:subscriptionDisposable];
            }
        }
    }];

    [compoundDisposable addDisposable:schedulingDisposable];

    return compoundDisposable;
}

从上面源码能看到消息订阅时,如果检测到self.valuesReceived中有值,那么当前的订阅者就会将值发送出去。

- (void)sendNext:(id)value {
    @synchronized (self) {
        //消息发送前先将value存储到valuesReceived中,留给后面订阅时发送
        [self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
        [super sendNext:value];
        
        if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
            [self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
        }
    }
}

能看到RACReplaySubject的sendNext实现,是先将消息发送的值value存储到valuesReceived中,然后调用父类RACSubject的sendNext。RACReplaySubjec在每次订阅时,会遍历_valuesReceived中存储的值给订阅者发送一遍。

相关文章

网友评论

      本文标题:RACSubject

      本文链接:https://www.haomeiwen.com/subject/nabvwctx.html