美文网首页
Observable创建

Observable创建

作者: silasjs | 来源:发表于2019-08-09 10:12 被阅读0次

Observable创建

empty

demo

首先来一个空的序列 - 本来序列事件是Int类型的,这里调用empty函数,序列中没有任何元素,只会发出completed

// Subscribe to an empty Int sequence with all four callbacks attached.
let emptySequence = Observable<Int>.empty()
_ = emptySequence.subscribe(
    onNext: { value in
        print("订阅:",value)
    },
    onError: { failure in
        print("error:",failure)
    },
    onCompleted: {
        print("完成回调")
    },
    onDisposed: {
        print("释放回调")
    }
)

解析

内部实现很简单,就是返回一个EmptyProducer空序列,订阅时直接发出.completed

extension ObservableType {
    /// Creates an observable sequence backed by `EmptyProducer`.
    ///
    /// - Returns: A new `EmptyProducer<E>` instance, typed as `Observable<E>`.
    public static func empty() -> Observable<E> {
        return EmptyProducer<E>()
    }
}

/// Producer whose `subscribe` sends `.completed` to the observer right away, without any `.next`.
final private class EmptyProducer<Element>: Producer<Element> {
    /// Sends `.completed` to `observer` and returns a disposable from `Disposables.create()`.
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        observer.on(.completed)
        return Disposables.create()
    }
}

just

不使用调度者

单个信号序列创建

demo

// Wrap the whole array as ONE element of an `Observable<[String]>` and observe it.
// The labelled `subscribe(onNext:onError:onCompleted:)` form is required here: the
// single-closure `subscribe { event in ... }` overload cannot be followed by
// `onError:` / `onCompleted:` arguments.
let array = ["ooci","ody"]
Observable<[String]>.just(array)
    .subscribe(onNext: { (element) in
        // `element` is the complete array passed to `just`.
        print(element)
    }, onError: { (error) in
        print("error:",error)
    }, onCompleted: {
        print("完成回调")
    }) {
        // Trailing closure is the disposal callback.
        print("释放回调")
    }.disposed(by: disposeBag)

解析

创建一个Just序列,保存_element。订阅后直接依次发出.next和.completed

extension ObservableType {
    /// Creates an observable sequence backed by `Just`, which stores `element`.
    ///
    /// - Parameter element: The value handed to the `Just` producer.
    /// - Returns: A `Just` instance wrapping `element`, typed as `Observable<E>`.
    public static func just(_ element: E) -> Observable<E> {
        return Just(element: element)
    }
}

/// Producer that hands one stored element to each subscriber and then completes.
final private class Just<Element>: Producer<Element> {
    private let _element: Element

    init(element: Element) {
        _element = element
    }

    /// Sends `.next` carrying the stored element, then `.completed`, and returns
    /// a disposable from `Disposables.create()`.
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        let stored = _element
        observer.on(.next(stored))
        observer.on(.completed)
        return Disposables.create()
    }
}

使用调度者

同样是单个信号序列创建,这个可以指定所执行的线程。

demo

// Emit one array through a serial dispatch-queue scheduler and print the thread
// on which the element is received.
_ = Observable.just(["slg", "wxk"], scheduler: SerialDispatchQueueScheduler(internalSerialQueueName: "slg_queue"))
    .subscribe(
        onNext: { names in
            print(names, Thread.current)
        },
        onError: { failure in
            print(failure)
        },
        onCompleted: {
            print("over")
        },
        onDisposed: {
            print("death")
        }
    )

解析

创建了JustScheduled序列,保存调度者和元素。订阅时在指定的线程中发出.next,然后马上又在指定的那个线程中发出.completed,最后销毁。

extension ObservableType {
    /// Creates an observable sequence backed by `JustScheduled`, which stores both arguments.
    ///
    /// - Parameters:
    ///   - element: The value handed to the `JustScheduled` producer.
    ///   - scheduler: The scheduler handed to the `JustScheduled` producer.
    /// - Returns: A `JustScheduled` instance, typed as `Observable<Element>`.
    public static func just(_ element: Element, scheduler: ImmediateSchedulerType) -> Observable<Element> {
        return JustScheduled(element: element, scheduler: scheduler)
    }
}

/// Producer that keeps one element together with the scheduler its sink uses to deliver it.
final private class JustScheduled<Element>: Producer<Element> {
    // `fileprivate` so that `JustScheduledSink` can read both values through its `_parent` reference.
    fileprivate let _scheduler: ImmediateSchedulerType
    fileprivate let _element: Element

    init(element: Element, scheduler: ImmediateSchedulerType) {
        self._scheduler = scheduler
        self._element = element
    }

    /// Builds a `JustScheduledSink` for `observer`, starts it with `run()`, and returns
    /// the sink together with the disposable that `run()` produced.
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = JustScheduledSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

/// Sink that forwards the parent's element and then `.completed`, each inside an action
/// submitted to the parent's scheduler.
final private class JustScheduledSink<Observer: ObserverType>: Sink<Observer> {
    typealias Parent = JustScheduled<Observer.Element>

    private let _parent: Parent

    init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    /// Schedules the emission work and returns the disposable of the outer scheduled action.
    func run() -> Disposable {
        let scheduler = self._parent._scheduler
        // First scheduled action: forward the stored element as `.next`.
        return scheduler.schedule(self._parent._element) { element in
            self.forwardOn(.next(element))
            // Second action, scheduled from inside the first: forward `.completed`,
            // then dispose this sink.
            return scheduler.schedule(()) { _ in
                self.forwardOn(.completed)
                self.dispose()
                return Disposables.create()
            }
        }
    }
}

of 和 from

of 和 from 其实差不多,内部都是用的ObservableSequence序列。

of

多个元素 - 针对序列处理

demo

// Two separate string elements.
Observable<String>.of("LG_Cooci","LG_Kody")
    .subscribe { event in print(event) }
    .disposed(by: disposeBag)

// Dictionary: the whole dictionary is passed as a single argument.
Observable<[String: Any]>.of(["name":"LG_Cooci","age":18])
    .subscribe { event in print(event) }
    .disposed(by: disposeBag)

// Array: the whole array is passed as a single argument.
Observable<[String]>.of(["LG_Cooci","LG_Kody"])
    .subscribe { event in print(event) }
    .disposed(by: disposeBag)

解析

内部返回一个ObservableSequence序列。

extension ObservableType {
    /// Creates an observable sequence backed by `ObservableSequence` from a variadic list of elements.
    ///
    /// - Parameters:
    ///   - elements: The values handed to `ObservableSequence` (received here as an array).
    ///   - scheduler: The scheduler handed to `ObservableSequence`; defaults to `CurrentThreadScheduler.instance`.
    /// - Returns: An `ObservableSequence` instance, typed as `Observable<E>`.
    public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
        return ObservableSequence(elements: elements, scheduler: scheduler)
    }
}

of 和 from 的核心源码解析

/// Sink that walks the parent's elements with an iterator, forwarding each one as `.next`
/// and finishing with `.completed`.
final private class ObservableSequenceSink<S: Sequence, O: ObserverType>: Sink<O> where S.Iterator.Element == O.E {
    typealias Parent = ObservableSequence<S>

    private let _parent: Parent

    init(parent: Parent, observer: O, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    /// Starts the recursive scheduled work on the parent's scheduler and returns its disposable.
    func run() -> Disposable {
        // The iterator is the recursion state: every step receives the iterator left by the previous step.
        return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
            // The closure parameter is immutable, so copy it before calling the mutating `next()`.
            var mutableIterator = iterator
            if let next = mutableIterator.next() {
                self.forwardOn(.next(next))
                // Continue with the advanced iterator.
                recurse(mutableIterator)
            }
            else {
                // Iterator exhausted: complete and dispose this sink.
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
}

/// Producer that stores a sequence of elements and the scheduler its sink uses to emit them.
final private class ObservableSequence<S: Sequence>: Producer<S.Iterator.Element> {
    // `fileprivate` so that `ObservableSequenceSink` can read both values through its `_parent` reference.
    fileprivate let _elements: S
    fileprivate let _scheduler: ImmediateSchedulerType

    init(elements: S, scheduler: ImmediateSchedulerType) {
        self._elements = elements
        self._scheduler = scheduler
    }

    /// Builds an `ObservableSequenceSink` for `observer`, starts it with `run()`, and returns
    /// the sink together with the disposable that `run()` produced.
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

ObservableSequence序列保存了元素和调度者。订阅时,在指定的线程中执行,回调中用迭代器循环发出.next,最后发出.completed

from

demo

从集合中获取序列:数组,集合,set 获取序列 - 有可选项处理 - 更安全

// Build an `Observable<[String]>` from an optional value and print every event received.
Observable<[String]>.from(optional: ["LG_Cooci","LG_Kody"])
    .subscribe { event in print(event) }
    .disposed(by: disposeBag)

解析

extension ObservableType {
    /// Creates an observable sequence backed by `ObservableSequence` from an array.
    ///
    /// - Parameters:
    ///   - array: The elements handed to `ObservableSequence`.
    ///   - scheduler: The scheduler handed to `ObservableSequence`; defaults to `CurrentThreadScheduler.instance`.
    public static func from(_ array: [E], scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
        return ObservableSequence(elements: array, scheduler: scheduler)
    }
    /// Creates an observable sequence backed by `ObservableSequence` from any `Sequence`
    /// whose element type is `E`.
    ///
    /// - Parameters:
    ///   - sequence: The elements handed to `ObservableSequence`.
    ///   - scheduler: The scheduler handed to `ObservableSequence`; defaults to `CurrentThreadScheduler.instance`.
    public static func from<S: Sequence>(_ sequence: S, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> where S.Iterator.Element == E {
        return ObservableSequence(elements: sequence, scheduler: scheduler)
    }
}

defer

demo

这里有一个需求:动态序列 - 根据外界的标识 - 动态输出

使用deferred()方法延迟Observable序列的初始化,通过传入的block来实现Observable序列的初始化并且返回。

// Flag consulted by the factory closure below; the factory flips it every time it runs.
var isOdd = true
_ = Observable<Int>.deferred { () -> Observable<Int> in
        // Decide here which sequence to hand back, based on the flipped flag.
        isOdd.toggle()
        return isOdd ? Observable.of(1,3,5,7,9) : Observable.of(0,2,4,6,8)
    }
    .subscribe { event in
        print(event)
    }

解析

内部创建了Deferred序列,保存了初始化中的闭包。在订阅时,先回调初始化里的闭包,获取到闭包的返回值(真正要使用的序列)后,用这个返回值序列去订阅,并把自己当做这个序列的观察者。等这个序列发出信号的时候就会传到deferred序列的on函数中,然后给外界做出响应。

extension ObservableType {
    /// Creates an observable sequence backed by `Deferred`, which stores the factory closure.
    ///
    /// - Parameter observableFactory: Throwing closure that returns the `Observable<E>` to use;
    ///   it is stored (`@escaping`), not called here.
    /// - Returns: A `Deferred` instance, typed as `Observable<E>`.
    public static func deferred(_ observableFactory: @escaping () throws -> Observable<E>)
        -> Observable<E> {
        return Deferred(observableFactory: observableFactory)
    }
}

/// Sink that calls the factory to obtain the real source sequence, subscribes itself to it,
/// and relays every event it receives.
final private class DeferredSink<S: ObservableType, O: ObserverType>: Sink<O>, ObserverType where S.E == O.E {
    typealias E = O.E

    private let _observableFactory: () throws -> S

    init(observableFactory: @escaping () throws -> S, observer: O, cancel: Cancelable) {
        _observableFactory = observableFactory
        super.init(observer: observer, cancel: cancel)
    }

    /// Invokes the factory and subscribes this sink to the sequence it returns.
    /// If the factory throws, the error is forwarded and the sink is disposed.
    func run() -> Disposable {
        let source: S
        do {
            source = try _observableFactory()
        } catch {
            forwardOn(.error(error))
            dispose()
            return Disposables.create()
        }
        return source.subscribe(self)
    }

    /// Relays `event` downstream; terminal events additionally dispose the sink.
    func on(_ event: Event<E>) {
        forwardOn(event)

        switch event {
        case .next:
            break
        case .error, .completed:
            dispose()
        }
    }
}

/// Producer that stores a factory closure and passes it to a `DeferredSink` in `run`.
final private class Deferred<S: ObservableType>: Producer<S.E> {
    typealias Factory = () throws -> S
    
    private let _observableFactory : Factory
    
    init(observableFactory: @escaping Factory) {
        self._observableFactory = observableFactory
    }
    
    /// Builds a `DeferredSink` with the stored factory, starts it with `run()`, and returns
    /// the sink together with the disposable that `run()` produced.
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == S.E {
        let sink = DeferredSink(observableFactory: self._observableFactory, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

range

demo

生成指定范围内的可观察整数序列。

// Observe a range that starts at 2 and has a count of 5; print every event.
Observable.range(start: 2, count: 5)
    .subscribe { event in print(event) }
    .disposed(by: disposeBag)

解析

内部创建RangeProducer,保存外界传入的start、count,还有调度者。订阅时会在指定线程中递归调用,回调时根据count来判定发出next信号还是completed信号。

extension ObservableType where E : RxAbstractInteger {
    /// Creates an observable sequence backed by `RangeProducer`.
    ///
    /// - Parameters:
    ///   - start: First value, handed to `RangeProducer`.
    ///   - count: Number of values, handed to `RangeProducer`.
    ///   - scheduler: The scheduler handed to `RangeProducer`; defaults to `CurrentThreadScheduler.instance`.
    /// - Returns: A `RangeProducer<E>` instance, typed as `Observable<E>`.
    public static func range(start: E, count: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
        return RangeProducer<E>(start: start, count: count, scheduler: scheduler)
    }
}

/// Producer that stores a start value, a count, and the scheduler its sink uses to emit the range.
final private class RangeProducer<E: RxAbstractInteger>: Producer<E> {
    // `fileprivate` so that `RangeSink` can read these values through its `_parent` reference.
    fileprivate let _start: E
    fileprivate let _count: E
    fileprivate let _scheduler: ImmediateSchedulerType

    /// Validates the arguments before storing them; invalid input ends in `rxFatalError`.
    init(start: E, count: E, scheduler: ImmediateSchedulerType) {
        // Reject a negative count.
        guard count >= 0 else {
            rxFatalError("count can't be negative")
        }

        // `&+` is the wrapping addition operator: if `start + (count - 1)` wrapped around,
        // the result would compare below `start`. A count of 0 is accepted regardless.
        guard start &+ (count - 1) >= start || count == 0 else {
            rxFatalError("overflow of count")
        }

        self._start = start
        self._count = count
        self._scheduler = scheduler
    }
    
    /// Builds a `RangeSink` for `observer`, starts it with `run()`, and returns
    /// the sink together with the disposable that `run()` produced.
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

/// Sink that emits `start + offset` for every offset from 0 up to (but excluding) the
/// parent's count, then completes.
final private class RangeSink<O: ObserverType>: Sink<O> where O.E: RxAbstractInteger {
    typealias Parent = RangeProducer<O.E>

    private let _parent: Parent

    init(parent: Parent, observer: O, cancel: Cancelable) {
        _parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    /// Starts the recursive scheduled work on the parent's scheduler and returns its disposable.
    func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(0 as O.E) { offset, recurse in
            // Once the offset reaches the count, finish the sequence and dispose the sink.
            guard offset < self._parent._count else {
                self.forwardOn(.completed)
                self.dispose()
                return
            }
            self.forwardOn(.next(self._parent._start + offset))
            recurse(offset + 1)
        }
    }
}

generate

demo

  • 该方法创建一个只有当提供的所有的判断条件都为 true 的时候,才会给出动作的 Observable 序列。
  • 初始值给定,然后判断条件1(condition),满足则发出元素,再执行条件2(iterate,迭代出下一个状态),会一直递归下去,直到条件1不满足
  • 类似 数组遍历循环
// Walk the array by index: the generated state is the index, and each emitted index is
// used to look up the element to print.
let arr = ["LG_Cooci_1","LG_Cooci_2","LG_Cooci_3"]
Observable.generate(initialState: 0,// initial state (the starting index)
    condition: { $0 < arr.count}, // condition 1: keep going while the index is within bounds
    iterate: { $0 + 1 })  // "condition 2": actually the iterate step, which advances the state by 1
    .subscribe(onNext: {
        print("遍历arr:",arr[$0])
    })
    .disposed(by: disposeBag)

解析

与range类似,也是保存了必需的参数,内部在指定线程中递归调用。这里只贴出源码:

extension ObservableType {
    /// Creates an observable sequence backed by `Generate`.
    ///
    /// - Parameters:
    ///   - initialState: Starting state, handed to `Generate`.
    ///   - condition: Throwing predicate on the state, handed to `Generate`.
    ///   - scheduler: The scheduler handed to `Generate`; defaults to `CurrentThreadScheduler.instance`.
    ///   - iterate: Throwing closure that maps a state to the next state, handed to `Generate`.
    /// - Returns: A `Generate` instance, typed as `Observable<E>`.
    public static func generate(initialState: E, condition: @escaping (E) throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: @escaping (E) throws -> E) -> Observable<E> {
        // The result selector is the identity closure, so each state is emitted unchanged.
        return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler)
    }
}

/// Sink that repeatedly advances a state with the parent's `_iterate`, tests it with
/// `_condition`, and emits `_resultSelector(state)` until the condition fails or a closure throws.
final private class GenerateSink<S, O: ObserverType>: Sink<O> {
    typealias Parent = Generate<S, O.E>
    
    private let _parent: Parent
    
    // Current state; starts as the parent's initial state and is replaced on every later step.
    private var _state: S
    
    init(parent: Parent, observer: O, cancel: Cancelable) {
        self._parent = parent
        self._state = parent._initialState
        super.init(observer: observer, cancel: cancel)
    }
    
    /// Starts the recursive scheduled work on the parent's scheduler and returns its disposable.
    func run() -> Disposable {
        // The Bool recursion state is `true` only for the first step.
        return self._parent._scheduler.scheduleRecursive(true) { isFirst, recurse -> Void in
            do {
                // Skip the iterate call on the first step so the initial state itself is tested.
                if !isFirst {
                    self._state = try self._parent._iterate(self._state)
                }
                
                if try self._parent._condition(self._state) {
                    let result = try self._parent._resultSelector(self._state)
                    self.forwardOn(.next(result))
                    
                    // Every following step is marked as not-first.
                    recurse(false)
                }
                else {
                    // Condition failed: complete and dispose this sink.
                    self.forwardOn(.completed)
                    self.dispose()
                }
            }
            catch let error {
                // Any of the three closures threw: forward the error and dispose this sink.
                self.forwardOn(.error(error))
                self.dispose()
            }
        }
    }
}

/// Producer that stores the initial state, the three closures, and the scheduler used by `GenerateSink`.
final private class Generate<S, E>: Producer<E> {
    // `fileprivate` so that `GenerateSink` can read these values through its `_parent` reference.
    fileprivate let _initialState: S
    fileprivate let _condition: (S) throws -> Bool
    fileprivate let _iterate: (S) throws -> S
    fileprivate let _resultSelector: (S) throws -> E
    fileprivate let _scheduler: ImmediateSchedulerType
    
    init(initialState: S, condition: @escaping (S) throws -> Bool, iterate: @escaping (S) throws -> S, resultSelector: @escaping (S) throws -> E, scheduler: ImmediateSchedulerType) {
        self._initialState = initialState
        self._condition = condition
        self._iterate = iterate
        self._resultSelector = resultSelector
        self._scheduler = scheduler
        super.init()
    }
    
    /// Builds a `GenerateSink` for `observer`, starts it with `run()`, and returns
    /// the sink together with the disposable that `run()` produced.
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        let sink = GenerateSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}

timer 和 interval 有其他的专篇

repeatElement

该方法创建一个可以无限发出给定元素的 Event的 Observable 序列(永不终止)

demo

// Subscribe to a sequence that repeats the element 5.
Observable<Int>.repeatElement(5)
    .subscribe { (event) in
        // NOTE(review): the print below is disabled, presumably because the sequence
        // never stops emitting and would flood the console — confirm.
        // print("订阅:",event)
    }
    .disposed(by: disposeBag)

解析

内部创建了RepeatElement序列,最后一个函数中可以看出,会一直不停的发出这个元素的next信号。

extension ObservableType {
    /// Creates an observable sequence backed by `RepeatElement`.
    ///
    /// - Parameters:
    ///   - element: The value handed to `RepeatElement`.
    ///   - scheduler: The scheduler handed to `RepeatElement`; defaults to `CurrentThreadScheduler.instance`.
    /// - Returns: A `RepeatElement` instance, typed as `Observable<E>`.
    public static func repeatElement(_ element: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
        return RepeatElement(element: element, scheduler: scheduler)
    }
}

/// Producer that stores one element and the scheduler its sink uses to emit it repeatedly.
final private class RepeatElement<Element>: Producer<Element> {
    // `fileprivate` so that `RepeatElementSink` can read both values through its `_parent` reference.
    fileprivate let _element: Element
    fileprivate let _scheduler: ImmediateSchedulerType
    
    init(element: Element, scheduler: ImmediateSchedulerType) {
        self._element = element
        self._scheduler = scheduler
    }
    
    /// Builds a `RepeatElementSink` for `observer`, starts it with `run()`, and returns
    /// the sink together with the disposable that `run()` produced.
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = RepeatElementSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()

        return (sink: sink, subscription: subscription)
    }
}

/// Sink that forwards the parent's element as `.next` on every recursive step.
final private class RepeatElementSink<O: ObserverType>: Sink<O> {
    typealias Parent = RepeatElement<O.E>
    
    private let _parent: Parent
    
    init(parent: Parent, observer: O, cancel: Cancelable) {
        self._parent = parent
        super.init(observer: observer, cancel: cancel)
    }
    
    /// Starts the recursive scheduled work on the parent's scheduler and returns its disposable.
    func run() -> Disposable {
        return self._parent._scheduler.scheduleRecursive(self._parent._element) { e, recurse in
            self.forwardOn(.next(e))
            // Recurse unconditionally: this closure has no branch that sends `.completed`.
            recurse(e)
        }
    }
}

error

demo

对消费者发出一个错误信号

// Observe a sequence created from an NSError and print every event received.
Observable<String>.error(NSError(domain: "lgerror", code: 10086, userInfo: ["reason":"unknow"]))
    .subscribe { event in
        print("订阅:",event)
    }
    .disposed(by: disposeBag)

解析

没啥好看的,直接发出error信号。

extension ObservableType {
    /// Creates an observable sequence backed by `ErrorProducer`, which stores `error`.
    ///
    /// - Parameter error: The error handed to `ErrorProducer`.
    /// - Returns: An `ErrorProducer` instance, typed as `Observable<E>`.
    public static func error(_ error: Swift.Error) -> Observable<E> {
        return ErrorProducer(error: error)
    }
}

/// Producer that answers every subscription with the error captured at creation.
final private class ErrorProducer<Element>: Producer<Element> {
    private let _error: Swift.Error

    init(error: Swift.Error) {
        _error = error
    }

    /// Sends the stored error as an `.error` event and returns a disposable
    /// from `Disposables.create()`.
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        let failure = _error
        observer.on(.error(failure))
        return Disposables.create()
    }
}

never

demo

  • 该方法创建一个永远不会发出 Event(也不会终止)的 Observable 序列。
  • 这种类型的响应源 在测试或者在组合操作符中禁用确切的源非常有用
// Subscribe to a `never()` sequence; the handler prints any event it is given.
Observable<String>.never()
    .subscribe { event in
        print("走你",event)
    }
    .disposed(by: disposeBag)

解析

一看就看明白了,这货什么都不做。

extension ObservableType {
    /// Creates an observable sequence backed by `NeverProducer`.
    ///
    /// - Returns: A new `NeverProducer` instance, typed as `Observable<E>`.
    public static func never() -> Observable<E> {
        return NeverProducer()
    }
}

/// Producer whose `subscribe` sends no event to the observer.
final private class NeverProducer<Element>: Producer<Element> {
    /// Returns a disposable from `Disposables.create()` without calling `observer.on` at all.
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        return Disposables.create()
    }
}

相关文章

网友评论

      本文标题:Observable创建

      本文链接:https://www.haomeiwen.com/subject/avrcjctx.html