最简单的RACSignal用法
//1.创建信号
RACSignal *signal = [RACSignal createSignal:^RACDisposable * _Nullable(id<RACSubscriber> _Nonnull subscriber) {
//3.发送信号
[subscriber sendNext:@"发出信号"];
return [RACDisposable disposableWithBlock:^{
NSLog(@"销毁了");
}];
}];
//2.订阅信号
[signal subscribeNext:^(id _Nullablex) {
NSLog(@"%@",x);
}];
先跟踪下调用流程:createSignal
#[RACSignal createSignal:]的调用栈
// 创建了一个RACDynamicSignal的signal,并返回
//把创建时传进来的didSubscribe block保存在signal的_didSubscribe里面,这里copy是深拷贝
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
return [RACDynamicSignal createSignal:didSubscribe];
}
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
signal->_didSubscribe = [didSubscribe copy];
return [signal setNameWithFormat:@"+createSignal:"];
}
#[signal subscribeNext:]调用栈
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock {
NSCParameterAssert(nextBlock != NULL);
//创建一个订阅者 ,内部者内部保存了next block,
//订阅者init方法创建了RACDisposable对象selfDisposable
//还创建了一个复合销毁者RACCompoundDisposable对象_disposable
//_disposable对象内部_inlineDisposables数组的第一个元素保存了这个selfDisposable
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
return [self subscribe:o];
}
#RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:NULL completed:NULL];
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init];
subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}
RACSubscriber的init方法里面
- (instancetype)init {
self = [super init];
@unsafeify(self);
//内部的销毁对象
RACDisposable *selfDisposable = [RACDisposable disposableWithBlock:^{
@strongify(self);
@synchronized (self) {
self.next = nil;
self.error = nil;
self.completed = nil;
}
}];
//创建一个复合销毁对象 compoundDisposable,里面是工厂方法初始化的,这里面其他操作不执行
_disposable = [RACCompoundDisposable compoundDisposable];
//将selfDisposable存在 _disposable的_inlineDisposables数组中,数组中第一个就是存的它
[_disposable addDisposable:selfDisposable];
return self;
}
@interface RACCompoundDisposable () {
pthread_mutex_t _mutex;
#define RACCompoundDisposableInlineCount 2
#if RACCompoundDisposableInlineCount
//c语言数组
RACDisposable *_inlineDisposables[RACCompoundDisposableInlineCount];
CFMutableArrayRef _disposables;
BOOL _disposed;
}
#[_disposable addDisposable:selfDisposable];
- (void)addDisposable:(RACDisposable *)disposable {
NSCParameterAssert(disposable != self);
if (disposable == nil || disposable.disposed) return;
BOOL shouldDispose = NO;
//互斥锁防止资源抢夺
pthread_mutex_lock(&_mutex);
{
if (_disposed) {
shouldDispose = YES;
} else {
for (unsigned i = 0; i < RACCompoundDisposableInlineCount; i++) {
if (_inlineDisposables[i] == nil) {
_inlineDisposables[i] = disposable;
goto foundSlot;
}
}
#endif
if (_disposables == NULL) _disposables = RACCreateDisposablesArray();
CFArrayAppendValue(_disposables, (__bridge void *)disposable);
if (RACCOMPOUNDDISPOSABLE_ADDED_ENABLED()) { RACCOMPOUNDDISPOSABLE_ADDED(self.description.UTF8String, disposable.description.UTF8String, CFArrayGetCount(_disposables) + RACCompoundDisposableInlineCount);
}
#if RACCompoundDisposableInlineCount
foundSlot:;
#endif
}
}
pthread_mutex_unlock(&_mutex);
// Performed outside of the lock in case the compound disposable is used
// recursively.
if (shouldDispose) [disposable dispose];
}
#[self subscribe:o] 是调用RACDynamicSignal内部的
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
//仅仅是创建了一个复合销毁对象而已
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
//创建一个RACPassthroughSubscriber类的对象subscriber
//这个Passthroughd类subscriber内的_innerSubscriber保存这个subscriber
//subscriber内部_signal保存这个signal
//subscriber内部_disposable保存这个复合销毁者disposable
//Passthroughd类subscriber内将上面创建的复合销毁对象disposable加入到_innerSubscriber数组的第二个位置
//Passthroughd类subscribe内部将上面创建的复合销毁对象disposable的_innerSubscriber数组中加入一个新的销毁者对象,传入了一个block,block中将Passthroughd类subscribe中的_innerSubscriber中复合销毁者对象_inlineDisposables数组中移除上面创建的这个复合销毁对象disposable
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
//单例创建了一个RACSubscriptionScheduler对象
RACDisposable *schedulingDisposable =
[RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
#RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
+ (instancetype)compoundDisposable {
return [[self alloc] initWithDisposables:nil];
}
#define RACCompoundDisposableInlineCount 2
- (instancetype)initWithDisposables:(NSArray *)otherDisposables {
self = [self init];
#if RACCompoundDisposableInlineCount
[otherDisposables enumerateObjectsUsingBlock:^(RACDisposable *disposable, NSUInteger index, BOOL *stop) {
self->_inlineDisposables[index] = disposable;
// Stop after this iteration if we've reached the end of the inlined
// array.
if (index == RACCompoundDisposableInlineCount - 1) *stop = YES;
}];
#endif
if (otherDisposables.count > RACCompoundDisposableInlineCount) {
_disposables = RACCreateDisposablesArray();
CFRange range = CFRangeMake(RACCompoundDisposableInlineCount, (CFIndex)otherDisposables.count - RACCompoundDisposableInlineCount);
CFArrayAppendArray(_disposables, (__bridge CFArrayRef)otherDisposables, range);
}
return self;
}
#subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
NSCParameterAssert(subscriber != nil);
self = [super init];
_innerSubscriber = subscriber;
_signal = signal;
_disposable = disposable;
[self.innerSubscriber didSubscribeWithDisposable:self.disposable];
return self;
}
#[self.innerSubscriber didSubscribeWithDisposable:self.disposable];
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)otherDisposable {
if (otherDisposable.disposed) return;
RACCompoundDisposable *selfDisposable = self.disposable;
[selfDisposable addDisposable:otherDisposable];
@unsafeify(otherDisposable);
// If this subscription terminates, purge its disposable to avoid unbounded
// memory growth.
[otherDisposable addDisposable:[RACDisposable disposableWithBlock:^{
@strongify(otherDisposable);
[selfDisposable removeDisposable:otherDisposable];
}]];
}
#RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
# RACDisposable *innerDisposable = self.didSubscribe(subscriber);
//复合销毁者disposable的_inlineDisposables数组第二个位置存储 创建signal返回的那个dispose
# [disposable addDisposable:innerDisposable];
# }];
//主线程的schedulingDisposable==nil里面直接return了
# [disposable addDisposable:schedulingDisposable];
# }
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
//currentScheduler是mainThreadScheduler,会执行下面的block()
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
//RACDisposable *innerDisposable = self.didSubscribe(subscriber);
//会执行dysignal保存的didSubscribe,执行这段代码[subscriber sendNext:@"发送呀"];
block();
return nil;
}
#RACScheduler.currentScheduler
+ (RACScheduler *)currentScheduler {
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
//在主线程中 返回主线程的mainThreadScheduler
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;
return nil;
}
#RACScheduler.mainThreadScheduler
+ (RACScheduler *)mainThreadScheduler {
static dispatch_once_t onceToken;
static RACScheduler *mainThreadScheduler;
dispatch_once(&onceToken, ^{
mainThreadScheduler = [[RACTargetQueueScheduler alloc] initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler" targetQueue:dispatch_get_main_queue()];
});
return mainThreadScheduler;
}
# [[RACTargetQueueScheduler alloc] initWithName:@"org.reactivecocoa.ReactiveObjC.RACScheduler.mainThreadScheduler" targetQueue:dispatch_get_main_queue()];
- (instancetype)initWithName:(NSString *)name targetQueue:(dispatch_queue_t)targetQueue {
NSCParameterAssert(targetQueue != NULL);
if (name == nil) {
name = [NSString stringWithFormat:@"org.reactivecocoa.ReactiveObjC.RACTargetQueueScheduler(%s)", dispatch_queue_get_label(targetQueue)];
}
dispatch_queue_t queue = dispatch_queue_create(name.UTF8String, DISPATCH_QUEUE_SERIAL);
if (queue == NULL) return nil;
//设置queue跟targetQueue执行优先级一样
dispatch_set_target_queue(queue, targetQueue);
return [super initWithName:name queue:queue];
}
#[subscriber sendNext:@"发送呀"];
//RACPassthroughSubscriber内部
- (void)sendNext:(id)value {
if (self.disposable.disposed) return;
if (RACSIGNAL_NEXT_ENABLED()) {
RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
}
[self.innerSubscriber sendNext:value];
}
#[self.innerSubscriber sendNext:value];
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
//这里会执行RACSubscriber保存的的next block,把value传进去
nextBlock(value);
}
}
整个调用的流程走完了,销毁的流程等待后续,上面的理解可能有偏差,看到的可给指正
网友评论