RACSignal

作者: 昵称是乱起的 | 来源:发表于2019-01-18 01:10 被阅读3次
最简单的RACSignal用法
    // 1. Create the signal. The block passed here is stored by the signal and
    //    is invoked with a subscriber when the signal is subscribed to.
    RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
        // 3. Send a value to the subscriber.
        [subscriber sendNext:@"发出信号"];
        return [RACDisposable disposableWithBlock:^{
            NSLog(@"销毁了");
        }];
    }];
    // 2. Subscribe to the signal; the next-block receives every value sent.
    //    (Fixed: the parameter was written as `_Nullablex`, leaving `x` undeclared.)
    [signal subscribeNext:^(id _Nullable x) {
        NSLog(@"%@", x);
    }];

先跟踪下调用流程:createSignal

#[RACSignal createSignal:]的调用栈
// Creates a RACDynamicSignal and returns it.
// RACDynamicSignal keeps the didSubscribe block passed in here in its
// _didSubscribe ivar. The `copy` it performs is a block copy (it moves a
// stack block to the heap), not a deep copy of captured objects.
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
    RACSignal *dynamicSignal = [RACDynamicSignal createSignal:didSubscribe];
    return dynamicSignal;
}
// RACDynamicSignal's implementation: allocates the signal, stores a copy of
// the did-subscribe block in its _didSubscribe ivar, and returns the result
// of naming it "+createSignal:".
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
    RACDynamicSignal *dynamicSignal = [[self alloc] init];
    dynamicSignal->_didSubscribe = [didSubscribe copy];
    RACSignal *namedSignal = [dynamicSignal setNameWithFormat:@"+createSignal:"];
    return namedSignal;
}
#[signal subscribeNext:]调用栈
// Subscribes with only a next-block and returns whatever -subscribe: returns.
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
    NSCParameterAssert(nextBlock != NULL);
    // Wrap the next-block in a RACSubscriber (no error/completed handlers).
    // RACSubscriber's -init creates a RACDisposable (selfDisposable) and a
    // RACCompoundDisposable (_disposable), then adds selfDisposable to the
    // compound one, where it lands in the first _inlineDisposables slot.
    RACSubscriber *nextOnlySubscriber = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
    RACDisposable *subscription = [self subscribe:nextOnlySubscriber];
    return subscription;
}
#RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
// Factory: builds a subscriber via -init and stores a copy of each of the
// three handler blocks in the matching ivar.
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
    RACSubscriber *created = [[self alloc] init];
    created->_completed = [completed copy];
    created->_error = [error copy];
    created->_next = [next copy];
    return created;
}
RACSubscriber的init方法里面
// RACSubscriber's initializer: sets up the subscriber's compound disposable
// and registers an internal disposable that clears the stored handlers.
- (instancetype)init {
    self = [super init];
    // Compound disposable created through the factory method, i.e. with no
    // initial disposables.
    _disposable = [RACCompoundDisposable compoundDisposable];
    @unsafeify(self);
    // Internal disposable: its block nils out next/error/completed while
    // holding @synchronized(self).
    RACDisposable *handlerCleanup = [RACDisposable disposableWithBlock:^{
        @strongify(self);
        @synchronized (self) {
            self.next = nil;
            self.error = nil;
            self.completed = nil;
        }
    }];
    // _disposable is still empty, so this takes the first _inlineDisposables slot.
    [_disposable addDisposable:handlerCleanup];
    return self;
}

// Class extension declaring RACCompoundDisposable's private storage.
// Fixed: the ivar block was missing the `#endif` that closes the inline-array
// conditional and the closing `@end`.
@interface RACCompoundDisposable () {
    // Locked around the state changes made in -addDisposable:.
    pthread_mutex_t _mutex;
    #define RACCompoundDisposableInlineCount 2
    #if RACCompoundDisposableInlineCount
    // Plain C array holding the first RACCompoundDisposableInlineCount disposables.
    RACDisposable *_inlineDisposables[RACCompoundDisposableInlineCount];
    #endif
    // Overflow storage; created lazily once every inline slot is occupied.
    CFMutableArrayRef _disposables;
    // When YES, -addDisposable: disposes the incoming disposable instead of storing it.
    BOOL _disposed;
}
@end
#[_disposable addDisposable:selfDisposable];
// Adds `disposable` to the receiver: into the first empty inline slot if there
// is one, otherwise into the lazily created _disposables array. If the
// receiver is already disposed, `disposable` is disposed instead of stored.
// Nil or already-disposed arguments are ignored.
// Fixed: restored the `#if RACCompoundDisposableInlineCount` that guards the
// inline-slot loop, so it pairs with the `#endif` that follows the loop.
- (void)addDisposable:(RACDisposable *)disposable {
    NSCParameterAssert(disposable != self);
    if (disposable == nil || disposable.disposed) return;
    BOOL shouldDispose = NO;
    // The mutex serializes access to _disposed, the inline slots and _disposables.
    pthread_mutex_lock(&_mutex);
    {
        if (_disposed) {
            shouldDispose = YES;
        } else {
            #if RACCompoundDisposableInlineCount
            // Prefer an empty inline slot; jump past the array path once one is found.
            for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
                if (_inlineDisposables[i] == nil) {
                    _inlineDisposables[i] = disposable;
                    goto foundSlot;
                }
            }
            #endif
            // No inline slot free: append to the overflow array, creating it on first use.
            if (_disposables == NULL) _disposables = RACCreateDisposablesArray();
            CFArrayAppendValue(_disposables, (__bridge void *)disposable);
            if (RACCOMPOUNDDISPOSABLE_ADDED_ENABLED()) {
                RACCOMPOUNDDISPOSABLE_ADDED(self.description.UTF8String, disposable.description.UTF8String, CFArrayGetCount(_disposables) + RACCompoundDisposableInlineCount);
            }
        #if RACCompoundDisposableInlineCount
        foundSlot:;
        #endif
        }
    }
    pthread_mutex_unlock(&_mutex);
    // Performed outside of the lock in case the compound disposable is used
    // recursively.
    if (shouldDispose) [disposable dispose];
}
#[self subscribe:o] 是调用RACDynamicSignal内部的
// RACDynamicSignal's -subscribe:. Wraps the subscriber, runs the stored
// didSubscribe block through the subscription scheduler, and returns the
// compound disposable that represents this subscription.
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
    NSCParameterAssert(subscriber != nil);
    // A new, empty compound disposable; nothing else happens at creation.
    RACCompoundDisposable *subscriptionDisposable = [RACCompoundDisposable compoundDisposable];
    // Wrap the subscriber in a RACPassthroughSubscriber. The wrapper stores the
    // original subscriber in _innerSubscriber, this signal in _signal and
    // subscriptionDisposable in _disposable. Its initializer then calls
    // -didSubscribeWithDisposable: on the inner subscriber, which
    //   1. adds subscriptionDisposable to the inner subscriber's own compound
    //      disposable, and
    //   2. adds to subscriptionDisposable a new RACDisposable whose block
    //      removes subscriptionDisposable from the inner subscriber's
    //      compound disposable again.
    subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:subscriptionDisposable];
    if (self.didSubscribe == NULL) return subscriptionDisposable;
    // NOTE(review): subscriptionScheduler is presumably a shared
    // RACSubscriptionScheduler instance — its accessor is not shown here.
    RACDisposable *scheduled = [RACScheduler.subscriptionScheduler schedule:^{
        // Run the block given to +createSignal: and keep the disposable it returns.
        RACDisposable *creationDisposable = self.didSubscribe(subscriber);
        [subscriptionDisposable addDisposable:creationDisposable];
    }];
    [subscriptionDisposable addDisposable:scheduled];
    return subscriptionDisposable;
}
#RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// Factory for an empty compound disposable: initializes with no disposables.
+ (instancetype)compoundDisposable {
    RACCompoundDisposable *emptyCompound = [[self alloc] initWithDisposables:nil];
    return emptyCompound;
}
#define RACCompoundDisposableInlineCount 2
// Initializes the receiver with `otherDisposables`: the first
// RACCompoundDisposableInlineCount entries go into the inline C array and any
// remaining entries are appended to _disposables. A nil or empty array leaves
// both untouched.
- (instancetype)initWithDisposables:(NSArray *)otherDisposables {
    self = [self init];
    NSUInteger total = otherDisposables.count;
    #if RACCompoundDisposableInlineCount
    // Fill the inline slots, stopping at the end of either array.
    NSUInteger inlineTotal = MIN(total, (NSUInteger)RACCompoundDisposableInlineCount);
    for (NSUInteger slot = 0; slot < inlineTotal; slot++) {
        self->_inlineDisposables[slot] = otherDisposables[slot];
    }
    #endif
    if (total > RACCompoundDisposableInlineCount) {
        // Everything past the inline slots is stored in the overflow array.
        _disposables = RACCreateDisposablesArray();
        CFIndex overflowCount = (CFIndex)total - RACCompoundDisposableInlineCount;
        CFRange overflowRange = CFRangeMake(RACCompoundDisposableInlineCount, overflowCount);
        CFArrayAppendArray(_disposables, (__bridge CFArrayRef)otherDisposables, overflowRange);
    }
    return self;
}
#subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
// RACPassthroughSubscriber's initializer: stores the wrapped subscriber, the
// signal and the subscription's compound disposable, then tells the wrapped
// subscriber about that disposable.
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
    NSCParameterAssert(subscriber != nil);
    self = [super init];
    _disposable = disposable;
    _signal = signal;
    _innerSubscriber = subscriber;
    // Hand the subscription's disposable to the wrapped subscriber.
    id<RACSubscriber> wrapped = self.innerSubscriber;
    [wrapped didSubscribeWithDisposable:self.disposable];
    return self;
}
#[self.innerSubscriber didSubscribeWithDisposable:self.disposable];
// Links a subscription's disposable to this subscriber's own compound
// disposable. Does nothing if `otherDisposable` is already disposed.
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)otherDisposable {
    if (otherDisposable.disposed) return;
    RACCompoundDisposable *ownDisposable = self.disposable;
    [ownDisposable addDisposable:otherDisposable];
    @unsafeify(otherDisposable);
    // If this subscription terminates, purge its disposable to avoid unbounded
    // memory growth.
    RACDisposable *purgeDisposable = [RACDisposable disposableWithBlock:^{
        @strongify(otherDisposable);
        [ownDisposable removeDisposable:otherDisposable];
    }];
    [otherDisposable addDisposable:purgeDisposable];
}
#RACDisposable *schedulingDisposable =  [RACScheduler.subscriptionScheduler schedule:^{
#           RACDisposable *innerDisposable = self.didSubscribe(subscriber);
//复合销毁者disposable的_inlineDisposables数组第二个位置存储 创建signal返回的那个dispose
#           [disposable addDisposable:innerDisposable];
#       }];
    //在主线程上schedule:会直接执行block并返回nil,所以schedulingDisposable为nil,addDisposable:内部判断到nil后直接return
#       [disposable addDisposable:schedulingDisposable];
#   }
// Runs `block` immediately and returns nil when there is a current scheduler;
// otherwise forwards the block to the background scheduler and returns its
// disposable.
- (RACDisposable *)schedule:(void (^)(void))block {
    NSCParameterAssert(block != NULL);
    if (RACScheduler.currentScheduler != nil) {
        // On the main thread currentScheduler is mainThreadScheduler, so this
        // branch is taken. The block passed in by -subscribe: calls the
        // signal's stored didSubscribe, which runs the user's
        // [subscriber sendNext:...] code.
        block();
        return nil;
    }
    return [self.backgroundScheduler schedule:block];
}
#RACScheduler.currentScheduler
// Returns the scheduler stored in the current thread's dictionary; failing
// that, the main-thread scheduler when on the main thread, and nil otherwise.
+ (RACScheduler *)currentScheduler {
    RACScheduler *threadScheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
    if (threadScheduler == nil) {
        // No scheduler recorded for this thread: fall back to the main-thread one.
        return [self.class isOnMainThread] ? RACScheduler.mainThreadScheduler : nil;
    }
    return threadScheduler;
}
#RACScheduler.mainThreadScheduler
// Returns the shared main-thread scheduler: a RACTargetQueueScheduler that
// targets the main dispatch queue, created once via dispatch_once.
+ (RACScheduler *)mainThreadScheduler {
    static RACScheduler *sharedMainScheduler;
    static dispatch_once_t createOnce;
    dispatch_once(&createOnce, ^{
        dispatch_queue_t mainQueue = dispatch_get_main_queue();
        sharedMainScheduler = [[RACTargetQueueScheduler alloc] initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler" targetQueue:mainQueue];
    });
    return sharedMainScheduler;
}
# [[RACTargetQueueScheduler alloc] initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler" targetQueue:dispatch_get_main_queue()];
// Creates a serial queue that targets `targetQueue` and initializes the
// scheduler with it. A nil `name` is replaced by a default built from the
// target queue's label. Returns nil if the queue cannot be created.
- (instancetype)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue {
    NSCParameterAssert(targetQueue != NULL);
    NSString *schedulerName = name;
    if (schedulerName == nil) {
        schedulerName = [NSString stringWithFormat:@"org.reactivecocoa.ReactiveObjC.RACTargetQueueScheduler(%s)", dispatch_queue_get_label(targetQueue)];
    }
    dispatch_queue_t serialQueue = dispatch_queue_create(schedulerName.UTF8String, DISPATCH_QUEUE_SERIAL);
    if (serialQueue == NULL) return nil;
    // Point the new queue at targetQueue. Per the GCD documentation the queue
    // then inherits the target's priority and its blocks are ultimately
    // executed by the target queue.
    dispatch_set_target_queue(serialQueue, targetQueue);
    return [super initWithName:schedulerName queue:serialQueue];
}
#[subscriber sendNext:@"发出信号"];
// Inside RACPassthroughSubscriber: forwards the value to the wrapped
// subscriber unless this subscription's disposable is already disposed.
- (void)sendNext:(id)value {
    if (!self.disposable.disposed) {
        // Emit the tracing probe only when it is enabled.
        if (RACSIGNAL_NEXT_ENABLED()) {
            RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
        }
        [self.innerSubscriber sendNext:value];
    }
}
#[self.innerSubscriber sendNext:value];
// Inside RACSubscriber: invokes the stored next-block with `value`, holding
// @synchronized(self) for the duration. Does nothing if no next-block is set.
- (void)sendNext:(id)value {
    @synchronized (self) {
        void (^handler)(id) = [self.next copy];
        // This is where the block passed to -subscribeNext: finally runs.
        if (handler != nil) handler(value);
    }
}

整个调用的流程走完了,销毁的流程留待后续再分析。上面的理解可能有偏差,欢迎看到的朋友指正。

相关文章

网友评论

      本文标题:RACSignal

      本文链接:https://www.haomeiwen.com/subject/qqcwdqtx.html