// 1:创建序列
// AnonymousObservable -> producer.subscriber -> run
// 保存闭包 - 函数式 保存 _subscribeHandler
//
Observable<Any>.create(<#T##subscribe: (AnyObserver<Any>) -> Disposable##(AnyObserver<Any>) -> Disposable#>)
let ob = Observable<Any>.create { (obserber) -> Disposable in
// 3:发送信号
obserber.onNext("框架班级")
obserber.onCompleted()
// obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil))
return Disposables.create()
}

// 2:订阅信号
// AnonymousObserver - event .next -> onNext()
// _eventHandler
// AnonymousObservable._subscribeHandler(observer)
// 销毁
let _ = ob.subscribe(onNext: { (text) in
print("订阅到:\(text)")
}, onError: { (error) in
print("error: \(error)")
}, onCompleted: {
print("完成")
}) {
print("销毁")
}
/** Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
通过AnonymousObservable
创建一个可观察队列,将闭包保存起来
public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
Event
枚举三种情况
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
subscribe
调用的时候根据三种情况进行回调。
最后又在return
的时候调用subscribe(observer)
public func onNext(_ element: E) {
self.on(.next(element))
}
func on(_ event: Event<Element>) {
self._sink(self, event)
if event.isStopEvent {
self._cancel.dispose()
}
}
在调用onNext
时候又会调用闭包

网友评论