在前面的篇章中,我们多次遇见Scheduler(调度者),但都没有展开讲,因为想留到这一篇章里详细说明。
-
Scheduler本质上是对GCD(以及OperationQueue)的一层封装,RxSwift的一些函数源码里已经在使用它,我们也可以主动调用这个Scheduler。主要的有:MainScheduler、SerialDispatchQueueScheduler、ConcurrentDispatchQueueScheduler、OperationQueueScheduler。
我们看看下面的例子:
// Deliver the elements on the main queue.
Observable.of(1, 2, 3)
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

// Deliver the elements on a serial queue.
Observable.of(1, 2, 3)
    .observeOn(SerialDispatchQueueScheduler(internalSerialQueueName: "obOnSerial"))
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)

// Deliver the elements on a concurrent queue.
Observable.of(1, 2, 3)
    .observeOn(ConcurrentDispatchQueueScheduler(qos: .background))
    .subscribe(onNext: { print($0) })
    .disposed(by: disposeBag)
- 我们从调度者的初始化入手。前3个都遵循SchedulerType协议(MainScheduler是通过继承SerialDispatchQueueScheduler间接遵循的),而SchedulerType又继承自ImmediateSchedulerType;OperationQueueScheduler则直接遵循ImmediateSchedulerType:
(1)MainScheduler
-
MainScheduler
的本质其实就是SerialDispatchQueueScheduler
,可以看到初始化时,默认保存了DispatchQueue.main
队列:
/// Excerpt of `MainScheduler`, a `SerialDispatchQueueScheduler` subclass bound to the main queue.
/// Lines reading `...` mark code elided from this excerpt.
public final class MainScheduler : SerialDispatchQueueScheduler {
...
/// Stores `DispatchQueue.main` in `_mainQueue` and hands that same queue to the
/// superclass as its serial queue.
public init() {
self._mainQueue = DispatchQueue.main
super.init(serialQueue: self._mainQueue)
}
/// Shared instance, created through `init()`.
public static let instance = MainScheduler()
...
}
(2)SerialDispatchQueueScheduler
/// Excerpt of `SerialDispatchQueueScheduler`; lines reading `...` mark elided code.
public class SerialDispatchQueueScheduler : SchedulerType {
...
/// Convenience initializer that delegates to `init(queue:internalSerialQueueName:leeway:)`
/// using a global dispatch queue.
/// - Parameters:
///   - qos: Quality of service; its `qosClass` selects the global queue.
///   - internalSerialQueueName: Forwarded unchanged; defaults to "rx.global_dispatch_queue.serial".
///   - leeway: Forwarded unchanged; defaults to zero nanoseconds.
public convenience init(qos: DispatchQoS, internalSerialQueueName: String = "rx.global_dispatch_queue.serial", leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.init(queue: DispatchQueue.global(qos: qos.qosClass),
internalSerialQueueName: internalSerialQueueName,
leeway: leeway)
}
...
}
(3)ConcurrentDispatchQueueScheduler
ConcurrentDispatchQueueScheduler
和SerialDispatchQueueScheduler
的思路其实是一样的:
/// Excerpt of `ConcurrentDispatchQueueScheduler`; lines reading `...` mark elided code.
public class ConcurrentDispatchQueueScheduler: SchedulerType {
...
/// Convenience initializer that creates a new concurrent `DispatchQueue` and
/// delegates to `init(queue:leeway:)`.
/// - Parameters:
///   - qos: Quality of service for the new queue; also interpolated into the queue label.
///   - leeway: Forwarded unchanged; defaults to zero nanoseconds.
public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.init(queue: DispatchQueue(
label: "rxswift.queue.\(qos)",
qos: qos,
attributes: [DispatchQueue.Attributes.concurrent],
target: nil),
leeway: leeway
)
}
...
}
(4)OperationQueueScheduler
OperationQueueScheduler封装的是OperationQueue(即Objective-C中的NSOperationQueue):
/// Excerpt of `OperationQueueScheduler`; lines reading `...` mark elided code.
/// Unlike the dispatch-queue schedulers quoted in this article, it conforms
/// directly to `ImmediateSchedulerType`.
public class OperationQueueScheduler: ImmediateSchedulerType {
...
/// Stores the operation queue and the queue priority for later use.
/// - Parameters:
///   - operationQueue: Saved in `self.operationQueue`.
///   - queuePriority: Saved in `self.queuePriority`; defaults to `.normal`.
public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
self.operationQueue = operationQueue
self.queuePriority = queuePriority
}
...
}
- 我们走
MainScheduler
的流程,然后我们来看看下一步的函数observeOn
,这里会判断是否为串行队列:
extension ObservableType {
...
/// Wraps this sequence in an observable bound to `scheduler`.
/// - Parameter scheduler: The scheduler handed to the returned wrapper.
/// - Returns: `ObserveOnSerialDispatchQueue` when `scheduler` is (a subclass of)
///   `SerialDispatchQueueScheduler`, otherwise the general `ObserveOn`.
public func observeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
// The downcast also succeeds for subclasses such as MainScheduler.
if let scheduler = scheduler as? SerialDispatchQueueScheduler {
return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
}
else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
}
}
- 由于MainScheduler是SerialDispatchQueueScheduler的子类,这里的as?转换成功,于是返回了ObserveOnSerialDispatchQueue序列(ObserveOnSerialDispatchQueue继承了Producer)。ObserveOnSerialDispatchQueue保存了scheduler调度者和source源序列:
/// Excerpt of the `Producer` returned by `observeOn` for serial dispatch-queue
/// schedulers; lines reading `...` mark elided code.
final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
// Scheduler passed on to the sink created in `run`.
let scheduler: SerialDispatchQueueScheduler
// Upstream sequence that the sink is subscribed to in `run`.
let source: Observable<Element>
/// Stores the source sequence and the scheduler.
init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
self.scheduler = scheduler
self.source = source
...
}
/// Creates an `ObserveOnSerialDispatchQueueSink` around `observer`, subscribes it
/// to `source`, and returns both the sink and that subscription.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
...
}
/// Excerpt of the sink created by `ObserveOnSerialDispatchQueue.run`; lines
/// reading `...` mark elided code (including the body of the closure below).
final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
...
/// Stores the scheduler, the downstream observer and the cancelable, then
/// assigns the `cachedScheduleLambda` closure.
init(scheduler: SerialDispatchQueueScheduler, observer: Observer, cancel: Cancelable) {
self.scheduler = scheduler
self.observer = observer
self.cancel = cancel
super.init()
// Set up cachedScheduleLambda as part of initialization.
self.cachedScheduleLambda = { pair in
...
}
}
...
}
根据RxSwift核心逻辑,来到ObserveOnSerialDispatchQueue
的run
函数,run
函数创建了ObserveOnSerialDispatchQueueSink
(业务下沉),ObserveOnSerialDispatchQueueSink
创建时默认初始化了self.cachedScheduleLambda
。然后由保存的self.source
源序列进行subscribe
。
- 继续根据核心逻辑就会来到
ObserveOnSerialDispatchQueueSink
的onCore
函数:
/// Excerpt of `ObserveOnSerialDispatchQueueSink` showing only `onCore`; lines
/// reading `...` mark elided code.
final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
...
/// Hands each incoming event to the scheduler: the state is the tuple
/// `(self, event)` and the action is the stored `cachedScheduleLambda`.
/// The disposable returned by `schedule` is discarded.
override func onCore(_ event: Event<Element>) {
_ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}
...
}
- 在这里保存的
self.scheduler
调度者开始执行操作。
- 接着一步步走到
scheduleInternal
函数:
/// Excerpt of `SerialDispatchQueueScheduler` showing the scheduling entry points;
/// lines reading `...` mark elided code.
public class SerialDispatchQueueScheduler : SchedulerType {
...
/// Public entry point; forwards `state` and `action` to `scheduleInternal`.
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.scheduleInternal(state, action: action)
}
// Not this one: for a MainScheduler, its own override of scheduleInternal is what runs.
/// Base implementation; forwards `state` and `action` to `self.configuration.schedule`.
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.schedule(state, action: action)
}
...
}
- 在这里要注意,调用的是
MainScheduler
的scheduleInternal
函数,千万不要被骗了:
/// Excerpt of `MainScheduler` showing its `scheduleInternal` override; lines
/// reading `...` mark elided code.
public final class MainScheduler : SerialDispatchQueueScheduler {
...
/// Runs `action(state)` immediately when already on the main queue with nothing
/// enqueued before it; otherwise dispatches it asynchronously to `_mainQueue`.
/// - Returns: The action's own disposable on the immediate path, or a
///   `SingleAssignmentDisposable` that can suppress the pending call on the async path.
override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
// NOTE(review): `increment`/`decrement` are defined outside this excerpt; the local's
// name suggests `increment` returns the value before incrementing — confirm.
let previousNumberEnqueued = increment(self.numberEnqueued)
// Immediate path: already on main and the counter was zero beforehand.
if DispatchQueue.isMain && previousNumberEnqueued == 0 {
let disposable = action(state)
decrement(self.numberEnqueued)
return disposable
}
let cancel = SingleAssignmentDisposable()
self._mainQueue.async {
// Skip the action if the caller disposed `cancel` before this block ran.
if !cancel.isDisposed {
_ = action(state)
}
// The counter is decremented whether or not the action ran.
decrement(self.numberEnqueued)
}
return cancel
}
}
- 但是不管是哪个调度者,最后都会在相应的队列上调用传进来的action,而这个action便是ObserveOnSerialDispatchQueueSink初始化时保存的self.cachedScheduleLambda闭包:
/// Excerpt of `ObserveOnSerialDispatchQueueSink` showing the full body of the
/// closure stored in `cachedScheduleLambda`; lines reading `...` mark elided code.
final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
...
/// Stores the scheduler, the downstream observer and the cancelable, then builds
/// the closure that the scheduler later invokes for every event.
init(scheduler: SerialDispatchQueueScheduler, observer: Observer, cancel: Cancelable) {
self.scheduler = scheduler
self.observer = observer
self.cancel = cancel
super.init()
// `pair` carries the sink and the event (accessed as `pair.sink` / `pair.event`).
self.cachedScheduleLambda = { pair in
// Drop the event if the captured `cancel` has already been disposed.
guard !cancel.isDisposed else { return Disposables.create() }
// Forward the event to the downstream observer held by the sink.
pair.sink.observer.on(pair.event)
// A stop event also disposes the sink.
if pair.event.isStopEvent {
pair.sink.dispose()
}
return Disposables.create()
}
}
...
}
- 最终调用了pair.sink.observer.on(pair.event),这语法让人看得有点懵,我们慢慢一一对应:pair就是onCore中传给schedule的元组(self, event),pair.sink即这个Sink自身,pair.sink.observer就是它保存的self.observer,所以pair.sink.observer.on => self.observer.on => AnonymousObserver.on。
- 最后的最后根据RxSwift核心逻辑,便会调用外面的响应闭包:
.subscribe(onNext: { (num) in
print(num)
})
其实调度者对GCD的封装就是这么简单,它能很好地帮我们控制任务在相应的队列上执行。
网友评论