Observable创建
empty
demo
首先来一个空的序列 - 本来序列事件是Int类型的,这里调用emty函数 没有序列,只能complete
let emtyOb = Observable<Int>.empty()
_ = emtyOb.subscribe(onNext: { (number) in
print("订阅:",number)
}, onError: { (error) in
print("error:",error)
}, onCompleted: {
print("完成回调")
}) {
print("释放回调")
}
解析
内部实现很简单,就是返回一个EmptyProducer
空序列,订阅的直接.completed
。
extension ObservableType {
public static func empty() -> Observable<E> {
return EmptyProducer<E>()
}
}
final private class EmptyProducer<Element>: Producer<Element> {
override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
observer.on(.completed)
return Disposables.create()
}
}
just
不使用调度者
单个信号序列创建
demo
let array = ["ooci","ody"]
Observable<[String]>.just(array)
.subscribe { (event) in
print(event)
}, onError: { (error) in
print("error:",error)
}, onCompleted: {
print("完成回调")
}) {
print("释放回调")
}.disposed(by: disposeBag)
解析
创建一个Just
序列,保存_element
。订阅后直接发出.next
和.completed
。
extension ObservableType {
public static func just(_ element: E) -> Observable<E> {
return Just(element: element)
}
}
final private class Just<Element>: Producer<Element> {
private let _element: Element
init(element: Element) {
self._element = element
}
override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
observer.on(.next(self._element))
observer.on(.completed)
return Disposables.create()
}
}
使用调度者
同样是单个信号序列创建,这个可以指定所执行的线程。
demo
let _ = Observable.just(["slg", "wxk"], scheduler: SerialDispatchQueueScheduler.init(internalSerialQueueName: "slg_queue"))
.subscribe(onNext: { (str) in
print(str, Thread.current)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("over")
}) {
print("death")
}
解析
创建了JustScheduled
序列,保存调度者和元素。订阅时在指定的线程中发出.next
,然后马上又在指定的那个线程中发出.completed
,最后销毁。
extension ObservableType {
public static func just(_ element: Element, scheduler: ImmediateSchedulerType) -> Observable<Element> {
return JustScheduled(element: element, scheduler: scheduler)
}
}
final private class JustScheduled<Element>: Producer<Element> {
fileprivate let _scheduler: ImmediateSchedulerType
fileprivate let _element: Element
init(element: Element, scheduler: ImmediateSchedulerType) {
self._scheduler = scheduler
self._element = element
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = JustScheduledSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
final private class JustScheduledSink<Observer: ObserverType>: Sink<Observer> {
typealias Parent = JustScheduled<Observer.Element>
private let _parent: Parent
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self._parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
let scheduler = self._parent._scheduler
return scheduler.schedule(self._parent._element) { element in
self.forwardOn(.next(element))
return scheduler.schedule(()) { _ in
self.forwardOn(.completed)
self.dispose()
return Disposables.create()
}
}
}
}
of 和 from
of
和from
其实差不多,内部都是用的ObservableSequence
序列。
of
多个元素 - 针对序列处理
demo
Observable<String>.of("LG_Cooci","LG_Kody")
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
// 字典
Observable<[String: Any]>.of(["name":"LG_Cooci","age":18])
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
// 数组
Observable<[String]>.of(["LG_Cooci","LG_Kody"])
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
解析
内部返回一个ObservableSequence
序列。
extension ObservableType {
public static func of(_ elements: E ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return ObservableSequence(elements: elements, scheduler: scheduler)
}
}
of 和 from 的核心源码解析
final private class ObservableSequenceSink<S: Sequence, O: ObserverType>: Sink<O> where S.Iterator.Element == O.E {
typealias Parent = ObservableSequence<S>
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Cancelable) {
self._parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator
if let next = mutableIterator.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
}
final private class ObservableSequence<S: Sequence>: Producer<S.Iterator.Element> {
fileprivate let _elements: S
fileprivate let _scheduler: ImmediateSchedulerType
init(elements: S, scheduler: ImmediateSchedulerType) {
self._elements = elements
self._scheduler = scheduler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
ObservableSequence
序列保存了元素和调度者。订阅时,在指定的线程中执行,回调中用迭代器循环发出.next
,最后发出.completed
。
from
demo
从集合中获取序列:数组,集合,set 获取序列 - 有可选项处理 - 更安全
Observable<[String]>.from(optional: ["LG_Cooci","LG_Kody"])
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
解析
extension ObservableType {
public static func from(_ array: [E], scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return ObservableSequence(elements: array, scheduler: scheduler)
}
public static func from<S: Sequence>(_ sequence: S, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> where S.Iterator.Element == E {
return ObservableSequence(elements: sequence, scheduler: scheduler)
}
}
defer
demo
这里有一个需求:动态序列 - 根据外界的标识 - 动态输出
使用deferred()方法延迟Observable序列的初始化,通过传入的block来实现Observable序列的初始化并且返回。
var isOdd = true
_ = Observable<Int>.deferred { () -> Observable<Int> in
// 这里设计我们的序列
isOdd = !isOdd
if isOdd {
return Observable.of(1,3,5,7,9)
}
return Observable.of(0,2,4,6,8)
}
.subscribe { (event) in
print(event)
}
解析
内部创建了Deferred
序列,保存了初始化中的闭包。在订阅时,先回调初始化里的闭包,获取到闭包的返回值(真正要使用的序列)后。用这个返回值序列去订阅,并把自己当做这个序列的观察者,等这个序列发出信号的时候就会传到defferred
序列的on
函数中,然后给外界做出响应。
extension ObservableType {
public static func deferred(_ observableFactory: @escaping () throws -> Observable<E>)
-> Observable<E> {
return Deferred(observableFactory: observableFactory)
}
}
final private class DeferredSink<S: ObservableType, O: ObserverType>: Sink<O>, ObserverType where S.E == O.E {
typealias E = O.E
private let _observableFactory: () throws -> S
init(observableFactory: @escaping () throws -> S, observer: O, cancel: Cancelable) {
self._observableFactory = observableFactory
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
do {
let result = try self._observableFactory()
return result.subscribe(self)
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
return Disposables.create()
}
}
func on(_ event: Event<E>) {
self.forwardOn(event)
switch event {
case .next:
break
case .error:
self.dispose()
case .completed:
self.dispose()
}
}
}
final private class Deferred<S: ObservableType>: Producer<S.E> {
typealias Factory = () throws -> S
private let _observableFactory : Factory
init(observableFactory: @escaping Factory) {
self._observableFactory = observableFactory
}
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == S.E {
let sink = DeferredSink(observableFactory: self._observableFactory, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
range
demo
生成指定范围内的可观察整数序列。
Observable.range(start: 2, count: 5)
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
解析
内部创建RangeProducer
,保存外界传入的start
、count
,还有调度者。订阅时会在指定线程中递归调用,回调时根据count
来判定发出next
信号还是completed
信号。
extension ObservableType where E : RxAbstractInteger {
public static func range(start: E, count: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return RangeProducer<E>(start: start, count: count, scheduler: scheduler)
}
}
final private class RangeProducer<E: RxAbstractInteger>: Producer<E> {
fileprivate let _start: E
fileprivate let _count: E
fileprivate let _scheduler: ImmediateSchedulerType
init(start: E, count: E, scheduler: ImmediateSchedulerType) {
guard count >= 0 else {
rxFatalError("count can't be negative")
}
guard start &+ (count - 1) >= start || count == 0 else {
rxFatalError("overflow of count")
}
self._start = start
self._count = count
self._scheduler = scheduler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
final private class RangeSink<O: ObserverType>: Sink<O> where O.E: RxAbstractInteger {
typealias Parent = RangeProducer<O.E>
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Cancelable) {
self._parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(0 as O.E) { i, recurse in
if i < self._parent._count {
self.forwardOn(.next(self._parent._start + i))
recurse(i + 1)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
}
generate
demo
- 该方法创建一个只有当提供的所有的判断条件都为 true 的时候,才会给出动作的 Observable 序列。
- 初始值给定 然后判断条件1 再判断条件2 会一直递归下去,直到条件1或者条件2不满足
- 类似 数组遍历循环
let arr = ["LG_Cooci_1","LG_Cooci_2","LG_Cooci_3"]
Observable.generate(initialState: 0,// 初始值
condition: { $0 < arr.count}, // 条件1
iterate: { $0 + 1 }) // 条件2 +2
.subscribe(onNext: {
print("遍历arr:",arr[$0])
})
.disposed(by: disposeBag)
解析
和range
类似,也是保存了必需的参数,内部在指定线程中递归调用。这里只贴出源码:
extension ObservableType {
public static func generate(initialState: E, condition: @escaping (E) throws -> Bool, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance, iterate: @escaping (E) throws -> E) -> Observable<E> {
return Generate(initialState: initialState, condition: condition, iterate: iterate, resultSelector: { $0 }, scheduler: scheduler)
}
}
final private class GenerateSink<S, O: ObserverType>: Sink<O> {
typealias Parent = Generate<S, O.E>
private let _parent: Parent
private var _state: S
init(parent: Parent, observer: O, cancel: Cancelable) {
self._parent = parent
self._state = parent._initialState
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(true) { isFirst, recurse -> Void in
do {
if !isFirst {
self._state = try self._parent._iterate(self._state)
}
if try self._parent._condition(self._state) {
let result = try self._parent._resultSelector(self._state)
self.forwardOn(.next(result))
recurse(false)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
catch let error {
self.forwardOn(.error(error))
self.dispose()
}
}
}
}
final private class Generate<S, E>: Producer<E> {
fileprivate let _initialState: S
fileprivate let _condition: (S) throws -> Bool
fileprivate let _iterate: (S) throws -> S
fileprivate let _resultSelector: (S) throws -> E
fileprivate let _scheduler: ImmediateSchedulerType
init(initialState: S, condition: @escaping (S) throws -> Bool, iterate: @escaping (S) throws -> S, resultSelector: @escaping (S) throws -> E, scheduler: ImmediateSchedulerType) {
self._initialState = initialState
self._condition = condition
self._iterate = iterate
self._resultSelector = resultSelector
self._scheduler = scheduler
super.init()
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
let sink = GenerateSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
timer 和 interval 有其他的专篇
repeatElement
该方法创建一个可以无限发出给定元素的 Event的 Observable 序列(永不终止)
demo
Observable<Int>.repeatElement(5)
.subscribe { (event) in
// print("订阅:",event)
}
.disposed(by: disposeBag)
解析
内部创建了RepeatElement
序列,最后一个函数中可以看出,会一直不停的发出这个元素的next
信号。
extension ObservableType {
public static func repeatElement(_ element: E, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<E> {
return RepeatElement(element: element, scheduler: scheduler)
}
}
final private class RepeatElement<Element>: Producer<Element> {
fileprivate let _element: Element
fileprivate let _scheduler: ImmediateSchedulerType
init(element: Element, scheduler: ImmediateSchedulerType) {
self._element = element
self._scheduler = scheduler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = RepeatElementSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
final private class RepeatElementSink<O: ObserverType>: Sink<O> {
typealias Parent = RepeatElement<O.E>
private let _parent: Parent
init(parent: Parent, observer: O, cancel: Cancelable) {
self._parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(self._parent._element) { e, recurse in
self.forwardOn(.next(e))
recurse(e)
}
}
}
error
demo
对消费者发出一个错误信号
Observable<String>.error(NSError.init(domain: "lgerror", code: 10086, userInfo: ["reason":"unknow"]))
.subscribe { (event) in
print("订阅:",event)
}
.disposed(by: disposeBag)
解析
没啥好看的,直接发出error
信号。
extension ObservableType {
public static func error(_ error: Swift.Error) -> Observable<E> {
return ErrorProducer(error: error)
}
}
final private class ErrorProducer<Element>: Producer<Element> {
private let _error: Swift.Error
init(error: Swift.Error) {
self._error = error
}
override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
observer.on(.error(self._error))
return Disposables.create()
}
}
never
demo
- 该方法创建一个永远不会发出 Event(也不会终止)的 Observable 序列。
- 这种类型的响应源 在测试或者在组合操作符中禁用确切的源非常有
Observable<String>.never()
.subscribe { (event) in
print("走你",event)
}
.disposed(by: disposeBag)
解析
一看就看明白了,这货什么都不做。
extension ObservableType {
public static func never() -> Observable<E> {
return NeverProducer()
}
}
final private class NeverProducer<Element>: Producer<Element> {
override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
return Disposables.create()
}
}
网友评论