PFEventuallyQueue
事件队列
PFPinningEventuallyQueue
只是 同步线程,进行数据保护
PFMemoryEventuallyQueue
内存线程,一样的
// 串行队列,设置 special key, value, 进行队列验证。
- (instancetype)initWithDataSource:(id<PFCommandRunnerProvider>)dataSource
maxAttemptsCount:(NSUInteger)attemptsCount
retryInterval:(NSTimeInterval)retryInterval {
...
// Set up all the queues
NSString *queueBaseLabel = [NSString stringWithFormat:@"com.parse.%@", NSStringFromClass([self class])];
_synchronizationQueue = dispatch_queue_create([NSString stringWithFormat:@"%@.synchronization", queueBaseLabel].UTF8String,
DISPATCH_QUEUE_SERIAL);
PFMarkDispatchQueue(_synchronizationQueue);
_synchronizationExecutor = [BFExecutor executorWithDispatchQueue:_synchronizationQueue];
_processingQueue = dispatch_queue_create([NSString stringWithFormat:@"%@.processing", queueBaseLabel].UTF8String,
DISPATCH_QUEUE_SERIAL);
PFMarkDispatchQueue(_processingQueue);
_processingQueueSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, _processingQueue);
_commandEnqueueTaskQueue = [[PFTaskQueue alloc] init];
...
return self;
}
下面是对 dispatch_source_type_add 的处理,
// 处理 source 的队列
_processingQueue = dispatch_queue_create([NSString stringWithFormat:@"%@.processing", queueBaseLabel].UTF8String,
DISPATCH_QUEUE_SERIAL);
// source type add
_processingQueueSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, _processingQueue);
// 触发调用 source event block
dispatch_source_merge_data(_processingQueueSource, 1);
// set event block
- (void)start {
dispatch_source_set_event_handler(_processingQueueSource, ^{
[self _runCommands];
});
[self resume];
}
// 触发一次
- (void)resume {
if (self.running) {
return;
}
self.running = YES;
dispatch_resume(_processingQueueSource); // 启动
dispatch_source_merge_data(_processingQueueSource, 1);
}
// 暂停
- (void)pause {
if (!self.running) {
return;
}
self.running = NO;
dispatch_suspend(_processingQueueSource);
}
// 关闭
- (void)terminate {
[self _stopMonitoringNetworkReachability];
dispatch_source_cancel(_processingQueueSource);
}
通过
dispatch_source_merge_data(_processingQueueSource, 1);
进行异步调用,执行 event handler,在此处的作用,只是异步到 _processingQueue 进行处理数据,并没有数据上的计数,可以合并触发,不用那么及时的触发当了解这个规则后,可以更简洁的调用相同的异步操作
在子线程中,同步等待 retryInterval * NSEC_PER_SEC
, dispatch_semaphore
的作用相当于阻塞当前线程等待一定时间
__block dispatch_semaphore_t semaphore = NULL;
dispatch_sync(_synchronizationQueue, ^{
self->_retryingSemaphore = dispatch_semaphore_create(0);
semaphore = self->_retryingSemaphore;
});
dispatch_time_t timeoutTime = dispatch_time(DISPATCH_TIME_NOW,
(int64_t)(self.retryInterval * NSEC_PER_SEC));
long waitResult = dispatch_semaphore_wait(semaphore, timeoutTime);
dispatch_sync(_synchronizationQueue, ^{
self->_retryingSemaphore = NULL;
});
// 网络重新连接,肯定是不,重新开始
if (connected) {
dispatch_async(_synchronizationQueue, ^{
if (self->_retryingSemaphore) {
dispatch_semaphore_signal(self->_retryingSemaphore);
}
});
}
总结:使用 dispatch_semaphore 进行队列阻塞。
网友评论