// Build a sequence of 1...5, route it through `observeOn` with a
// `ConcurrentDispatchQueueScheduler` created at `.background` QoS, and print
// each received event together with the thread it arrived on.
let ob = Observable.of(1, 2, 3, 4, 5)
ob.observeOn(ConcurrentDispatchQueueScheduler(qos: .background))
    .subscribe { event in
        print("observeOn", event, Thread.current)
    }
    .disposed(by: disposeBag)
步骤1、代码改写
1.1 ob -> ObservableSequence<Array<Int>>
1.2 obOn -> ObserveOn<Int>
1.3 obOnSubscribe -> SinkDisposer
// The same pipeline split into named intermediate values so that the concrete
// type produced by each step can be inspected.
let ob = Observable.of(1, 2, 3, 4, 5)
let obOn = ob.observeOn(ConcurrentDispatchQueueScheduler(qos: .background))
let obOnSubscribe = obOn.subscribe {
    print("observeOn", $0, Thread.current)
}
// `disposed(by:)` returns Void, so its result is not bound to a constant
// (binding it would only produce an unused `()` value and a compiler warning).
obOnSubscribe.disposed(by: disposeBag)
步骤2、点击observeOn
2.1 ObservableSequence 继承 Producer,Producer 继承 Observable,而 Observable 遵循 ObservableType 协议,所以点击 observeOn 会来到 ObservableType 协议扩展中的 observeOn 方法,如下
2.2 scheduler -> ConcurrentDispatchQueueScheduler
2.3 self.asObservable() -> ObservableSequence<Array<Int>>
extension ObservableType {
    /// Creates the observe-on operator for this sequence, choosing the
    /// implementation from the runtime type of `scheduler`.
    ///
    /// - Parameter scheduler: Scheduler handed to the created operator.
    /// - Returns: An `ObserveOnSerialDispatchQueue` when `scheduler` is a
    ///   `SerialDispatchQueueScheduler`, otherwise an `ObserveOn`.
    public func observeOn(_ scheduler: ImmediateSchedulerType)
        -> Observable<Element> {
        guard let serialScheduler = scheduler as? SerialDispatchQueueScheduler else {
            // Any other scheduler goes through the general-purpose operator.
            return ObserveOn(source: self.asObservable(), scheduler: scheduler)
        }
        return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: serialScheduler)
    }
}
步骤3、点击ObserveOn(source: self.asObservable(), scheduler: scheduler)
3.1 ObserveOn 继承 Producer
3.2 初始化 ObserveOn
3.3 self.scheduler = scheduler 保存传进来的 ConcurrentDispatchQueueScheduler 2.2
3.4 self.source = source 保存传进来的 self.asObservable() 即 ObservableSequence<Array<Int>> 2.3
/// Producer that, when run, subscribes an `ObserveOnSink` to `source` and
/// hands that sink the stored scheduler.
private final class ObserveOn<Element>: Producer<Element> {
    let scheduler: ImmediateSchedulerType
    let source: Observable<Element>

    /// - Parameters:
    ///   - source: Upstream sequence the sink is subscribed to in `run`.
    ///   - scheduler: Scheduler passed on to every sink created by `run`.
    init(source: Observable<Element>, scheduler: ImmediateSchedulerType) {
        self.source = source
        self.scheduler = scheduler

#if TRACE_RESOURCES
        _ = Resources.incrementTotal()
#endif
    }

    /// Creates the sink wrapping `observer` and subscribes it to `source`.
    ///
    /// - Returns: The sink together with the disposable of the upstream subscription.
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let observeOnSink = ObserveOnSink(scheduler: scheduler, observer: observer, cancel: cancel)
        let upstream = source.subscribe(observeOnSink)
        return (sink: observeOnSink, subscription: upstream)
    }

#if TRACE_RESOURCES
    deinit {
        _ = Resources.decrementTotal()
    }
#endif
}
步骤4、外界.subscribe(on) 即ObserveOn.subscribe(on)
4.1 保存AnonymousObserver
extension ObservableType {
    /// Subscribes an event handler to this sequence.
    ///
    /// - Parameter on: Closure invoked with every event the sequence delivers.
    /// - Returns: The disposable produced by subscribing the wrapping observer.
    public func subscribe(_ on: @escaping (Event<Element>) -> Void)
        -> Disposable {
        // Wrap the closure in an observer object so it can be passed to `subscribe(_:)`.
        let eventObserver = AnonymousObserver<Element> { on($0) }
        return asObservable().subscribe(eventObserver)
    }
}
步骤5、 来到Producer 的 subscribe方法
5.1 首次订阅时 CurrentThreadScheduler.isScheduleRequired 为 true,进入 else 分支,执行 CurrentThreadScheduler.instance.schedule,来到步骤6
5.2 schedule 执行闭包后,去到步骤3中 ObserveOn 的 run 方法;在 run 方法中 ObservableSequence 调用 subscribe,把 ObserveOnSink 作为观察者传入进去,来到5.3
5.3 此时 isScheduleRequired 已被置为 false,进入 if !CurrentThreadScheduler.isScheduleRequired 分支,此时的 self 是 ObservableSequence
/// Base class whose `subscribe` creates a `SinkDisposer`, asks the subclass to
/// `run`, and returns the disposer; subclasses must override `run`.
class Producer<Element>: Observable<Element> {
    override init() {
        super.init()
    }

    /// Connects `observer` either directly or through `CurrentThreadScheduler`.
    ///
    /// - Parameter observer: Observer passed on to `run`.
    /// - Returns: The `SinkDisposer` when connecting directly, otherwise
    ///   whatever `CurrentThreadScheduler.instance.schedule` returns.
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        // Shared connect sequence used by both paths below.
        let connect: () -> Disposable = {
            // The returned disposable needs to release all references once it was disposed.
            let sinkDisposer = SinkDisposer()
            let pair = self.run(observer, cancel: sinkDisposer)
            sinkDisposer.setSinkAndSubscription(sink: pair.sink, subscription: pair.subscription)
            return sinkDisposer
        }

        guard CurrentThreadScheduler.isScheduleRequired else {
            // A schedule is already in progress: connect right away.
            return connect()
        }
        return CurrentThreadScheduler.instance.schedule(()) { _ in connect() }
    }

    /// Abstract hook: builds the sink and the upstream subscription for `observer`.
    func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        rxAbstractMethod()
    }
}
步骤6 点击 schedule
6.1 schedule 方法
6.1.1 将 CurrentThreadScheduler.isScheduleRequired 置为 false
6.1.2 let disposable = action(state) 执行闭包,回到步骤5 else 分支中传给 schedule 的闭包(闭包内调用 run 方法)
/// Scheduler that runs the first scheduled action synchronously and queues any
/// actions scheduled while that action is still running, draining them afterwards.
public class CurrentThreadScheduler : ImmediateSchedulerType {
typealias ScheduleQueue = RxMutableBox<Queue<ScheduledItemType>>
/// The singleton instance of the current thread scheduler.
public static let instance = CurrentThreadScheduler()
/// pthread key under which the "schedule in progress" marker is stored.
/// Failing to create the key is treated as fatal.
private static var isScheduleRequiredKey: pthread_key_t = { () -> pthread_key_t in
let key = UnsafeMutablePointer<pthread_key_t>.allocate(capacity: 1)
// The temporary storage is released when the closure exits; the key value itself is returned by copy.
defer { key.deallocate() }
guard pthread_key_create(key, nil) == 0 else {
rxFatalError("isScheduleRequired key creation failed")
}
return key.pointee
}()
/// Non-nil pointer stored under `isScheduleRequiredKey` to mean "a schedule is in progress".
/// The allocation is not freed anywhere in the code shown here.
private static var scheduleInProgressSentinel: UnsafeRawPointer = { () -> UnsafeRawPointer in
return UnsafeRawPointer(UnsafeMutablePointer<Int>.allocate(capacity: 1))
}()
/// Queue of pending scheduled items, read from and written to thread-local storage
/// under `CurrentThreadSchedulerQueueKey.instance`.
static var queue : ScheduleQueue? {
get {
return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
}
set {
Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
}
}
/// Gets a value that indicates whether the caller must call a `schedule` method.
public static fileprivate(set) var isScheduleRequired: Bool {
get {
// `true` while nothing is stored under the key.
return pthread_getspecific(CurrentThreadScheduler.isScheduleRequiredKey) == nil
}
set(isScheduleRequired) {
// Store nil for `true` and the sentinel for `false`; a failing store is fatal.
if pthread_setspecific(CurrentThreadScheduler.isScheduleRequiredKey, isScheduleRequired ? nil : scheduleInProgressSentinel) != 0 {
rxFatalError("pthread_setspecific failed")
}
}
}
/// Runs `action` immediately when no schedule is in progress; otherwise enqueues it.
///
/// - Parameters:
///   - state: Value passed to `action`.
///   - action: Work to perform.
/// - Returns: The disposable returned by `action` when it is run immediately,
///   otherwise the enqueued `ScheduledItem`.
public func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
if CurrentThreadScheduler.isScheduleRequired {
// Mark a schedule as in progress so nested `schedule` calls take the enqueue path below.
CurrentThreadScheduler.isScheduleRequired = false
let disposable = action(state)
// Registered after `action` has returned: on leaving this scope, restore the flag and drop the queue.
defer {
CurrentThreadScheduler.isScheduleRequired = true
CurrentThreadScheduler.queue = nil
}
// No queue was created while `action` ran, so there is nothing to drain.
guard let queue = CurrentThreadScheduler.queue else {
return disposable
}
// Drain the items that were enqueued meanwhile, skipping any that are already disposed.
while let latest = queue.value.dequeue() {
if latest.isDisposed {
continue
}
latest.invoke()
}
return disposable
}
// A schedule is already in progress: append the work to the queue, creating the queue on first use.
let existingQueue = CurrentThreadScheduler.queue
let queue: RxMutableBox<Queue<ScheduledItemType>>
if let existingQueue = existingQueue {
queue = existingQueue
}
else {
queue = RxMutableBox(Queue<ScheduledItemType>(capacity: 1))
CurrentThreadScheduler.queue = queue
}
let scheduledItem = ScheduledItem(action: action, state: state)
queue.value.enqueue(scheduledItem)
return scheduledItem
}
}
步骤6(续) 来到 ObservableSequence 的 run 方法
6.2 run 方法中的 observer -> ObserveOnSink<AnonymousObserver<Int>>
/// Producer that stores a Swift sequence and a scheduler; when run it creates
/// an `ObservableSequenceSink` and starts it.
private final class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
    fileprivate let _elements: Sequence
    fileprivate let _scheduler: ImmediateSchedulerType

    /// - Parameters:
    ///   - elements: Sequence read by the sink.
    ///   - scheduler: Scheduler the sink uses to iterate `elements`.
    init(elements: Sequence, scheduler: ImmediateSchedulerType) {
        self._elements = elements
        self._scheduler = scheduler
    }

    /// Creates the sink for `observer` and starts it.
    ///
    /// - Returns: The sink together with the disposable returned by the sink's `run()`.
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sequenceSink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let running = sequenceSink.run()
        return (sink: sequenceSink, subscription: running)
    }
}
步骤7 来到ObservableSequenceSink的 run方法
7.1 self._parent -> ObservableSequence
7.2 self._parent._scheduler -> CurrentThreadScheduler
7.3 self._parent._elements -> [1, 2, 3, 4, 5]
7.4 CurrentThreadScheduler 类遵循 ImmediateSchedulerType 协议
7.5 self._parent._scheduler.scheduleRecursive -> CurrentThreadScheduler.scheduleRecursive 来到 步骤8
/// Sink that walks the parent's sequence with the parent's scheduler,
/// forwarding one `.next` per element and `.completed` at the end.
private final class ObservableSequenceSink<Sequence: Swift.Sequence, Observer: ObserverType>: Sink<Observer> where Sequence.Element == Observer.Element {
    typealias Parent = ObservableSequence<Sequence>

    private let _parent: Parent

    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    /// Starts the recursive iteration over the parent's elements.
    ///
    /// - Returns: The disposable returned by `scheduleRecursive`.
    func run() -> Disposable {
        return _parent._scheduler.scheduleRecursive(_parent._elements.makeIterator()) { iterator, recurse in
            // The closure parameter is immutable, so advance a local copy.
            var cursor = iterator
            guard let element = cursor.next() else {
                // Sequence exhausted: complete and tear the sink down.
                self.forwardOn(.completed)
                self.dispose()
                return
            }
            self.forwardOn(.next(element))
            // Carry the advanced iterator into the next step.
            recurse(cursor)
        }
    }
}
步骤8 点击scheduleRecursive
8.1 self -> CurrentThreadScheduler
8.2 recursiveScheduler -> RecursiveImmediateScheduler
8.3 初始化 RecursiveImmediateScheduler
8.4 执行 schedule 来到 步骤9
extension ImmediateSchedulerType {
    /// Schedules `action` through a `RecursiveImmediateScheduler` built on this scheduler.
    ///
    /// - Parameters:
    ///   - state: Initial state passed to the first invocation of `action`.
    ///   - action: Work to run; it receives the state and a `recurse` callback.
    /// - Returns: A disposable that calls the recursive scheduler's `dispose()`.
    public func scheduleRecursive<State>(_ state: State, action: @escaping (_ state: State, _ recurse: (State) -> Void) -> Void) -> Disposable {
        let recursive = RecursiveImmediateScheduler(action: action, scheduler: self)

        recursive.schedule(state)

        return Disposables.create(with: { recursive.dispose() })
    }
}
步骤9 点击RecursiveImmediateScheduler
9.1 let d = self._scheduler.schedule(state),其中 self._scheduler 是 CurrentThreadScheduler,即由 CurrentThreadScheduler 执行 schedule,来到步骤6.1;此时 isScheduleRequired 为 false,action 会被包装成 ScheduledItem 入队,随后由外层 schedule 的 while 循环取出执行
/// Helper that lets an action re-schedule itself on an `ImmediateSchedulerType`,
/// collecting the disposables of outstanding scheduled work in a `CompositeDisposable`.
final class RecursiveImmediateScheduler<State> {
typealias Action = (_ state: State, _ recurse: (State) -> Void) -> Void
// Guards `_action` and the per-call `scheduleState` below.
private var _lock = SpinLock()
// Disposables of work that has been scheduled through `schedule(_:)`.
private let _group = CompositeDisposable()
// Cleared in `dispose()`, after which scheduled work no longer invokes it.
private var _action: Action?
private let _scheduler: ImmediateSchedulerType
init(action: @escaping Action, scheduler: ImmediateSchedulerType) {
self._action = action
self._scheduler = scheduler
}
/// Schedules one invocation of the action with `state` on the underlying scheduler.
/// The action receives `self.schedule` as its `recurse` callback.
///
/// - Parameter state: Value passed to the action.
func schedule(_ state: State) {
// Tracks whether `d` is still to be registered (.initial), is registered in `_group`
// (.added), or needs no registration because the work ran or insertion failed (.done).
var scheduleState: ScheduleState = .initial
let d = self._scheduler.schedule(state) { state -> Disposable in
// best effort
if self._group.isDisposed {
return Disposables.create()
}
// Under the lock: unregister this work from the group if it was registered,
// mark it done, and take a snapshot of the current action.
let action = self._lock.calculateLocked { () -> Action? in
switch scheduleState {
case let .added(removeKey):
self._group.remove(for: removeKey)
case .initial:
break
case .done:
break
}
scheduleState = .done
return self._action
}
// `action` is nil once `dispose()` has cleared it.
if let action = action {
action(state, self.schedule)
}
return Disposables.create()
}
// Register `d` in the group unless the scheduled closure has already run.
self._lock.performLocked {
switch scheduleState {
case .added:
// `.added` is only ever assigned inside this block, so seeing it here is a logic error.
rxFatalError("Invalid state")
case .initial:
if let removeKey = self._group.insert(d) {
scheduleState = .added(removeKey)
}
else {
// Insertion returned no key, so there is nothing to remove later.
scheduleState = .done
}
case .done:
break
}
}
}
/// Clears the stored action under the lock, then disposes the group of outstanding work.
func dispose() {
self._lock.performLocked {
self._action = nil
}
self._group.dispose()
}
}
步骤10 self.forwardOn(.next(next)) 此时的观察者是ObserveOnSink
10.1 来到 ObserveOnSink 的 onCore
10.2 self._scheduler -> ConcurrentDispatchQueueScheduler,ConcurrentDispatchQueueScheduler 类遵循 SchedulerType 协议,SchedulerType 协议继承 ImmediateSchedulerType 协议
/// Observer that buffers incoming events in a queue and hands them to the wrapped
/// observer from work scheduled on `_scheduler`.
final private class ObserveOnSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
typealias Element = Observer.Element
let _scheduler: ImmediateSchedulerType
// Guards `_state` and `_queue`.
var _lock = SpinLock()
let _observer: Observer
// state
// `.running` while a drain has been started and has not yet found the queue empty.
var _state = ObserveOnState.stopped
var _queue = Queue<Event<Element>>(capacity: 10)
// Holds the disposable of the most recently started drain.
let _scheduleDisposable = SerialDisposable()
let _cancel: Cancelable
init(scheduler: ImmediateSchedulerType, observer: Observer, cancel: Cancelable) {
self._scheduler = scheduler
self._observer = observer
self._cancel = cancel
}
/// Enqueues `event` and, if no drain is currently running, starts one on the scheduler.
override func onCore(_ event: Event<Element>) {
// Under the lock: buffer the event and decide whether this call must start the drain.
let shouldStart = self._lock.calculateLocked { () -> Bool in
self._queue.enqueue(event)
switch self._state {
case .stopped:
self._state = .running
return true
case .running:
return false
}
}
if shouldStart {
self._scheduleDisposable.disposable = self._scheduler.scheduleRecursive((), action: self.run)
}
}
/// One drain step: delivers at most one buffered event, then asks to recurse while
/// the queue is non-empty.
///
/// - Parameters:
///   - state: Unused unit value required by `scheduleRecursive`.
///   - recurse: Callback that schedules the next drain step.
func run(_ state: (), _ recurse: (()) -> Void) {
// Under the lock: take the next event, or mark the drain as stopped when the queue is empty.
let (nextEvent, observer) = self._lock.calculateLocked { () -> (Event<Element>?, Observer) in
if !self._queue.isEmpty {
return (self._queue.dequeue(), self._observer)
}
else {
self._state = .stopped
return (nil, self._observer)
}
}
// Deliver only when there is an event and the subscription has not been cancelled.
if let nextEvent = nextEvent, !self._cancel.isDisposed {
observer.on(nextEvent)
// A stop event ends the sequence, so the sink is disposed after delivering it.
if nextEvent.isStopEvent {
self.dispose()
}
}
else {
return
}
let shouldContinue = self._shouldContinue_synchronized()
if shouldContinue {
recurse(())
}
}
/// Under the lock, reports whether more events are buffered; marks the drain as
/// stopped when the queue is empty.
func _shouldContinue_synchronized() -> Bool {
self._lock.lock(); defer { self._lock.unlock() } // {
if !self._queue.isEmpty {
return true
}
else {
self._state = .stopped
return false
}
// }
}
/// Disposes the base observer, the cancel handle, and the scheduled drain, in that order.
override func dispose() {
super.dispose()
self._cancel.dispose()
self._scheduleDisposable.dispose()
}
}
网友评论