目录
- 简介
- 操作符列表
- 如何选择操作符?
简介
操作符(Operator
)可以帮助大家创建新的序列,或者变化组合原有的序列,从而生成一个新的序列。
使用操作符是非常容易的。你可以直接调用实例方法,或者静态方法。
操作符列表
- amb
当你传入多个 Observables
到 amb
操作符时,它将取其中一个 Observable
:第一个产生事件的那个 Observable
,可以是一个 next
,error
或者 completed
事件。 amb
将忽略掉其他的 Observables
,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let a = Observable<String>.just("A")
let b = Observable<String>.just("B")
let c = Observable<String>.just("C")
Observable<String>.amb([a, b, c]).subscribe { (e) in
switch e {
case .next(let element):
print(element)
case .error(let error):
print(error)
case .completed:
print("completed")
}
}.disposed(by: disposeBag)
}
打印结果:
A
completed
- buffer
buffer
方法作用是缓冲组合,第一个参数是缓冲时间,第二个参数是缓冲个数,第三个参数是线程。
public func buffer(timeSpan: RxSwift.RxTimeInterval, count: Int, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<[Self.Element]>
该方法简单来说就是缓存 Observable
中发出的新元素,当元素达到某个数量,或者经过了特定的时间,它就会将这个元素集合发送出来,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let publicSubject = PublishSubject<String>()
publicSubject.buffer(timeSpan: DispatchTimeInterval.seconds(1), count: 3, scheduler: MainScheduler.instance).subscribe { (e) in
switch e {
case .next(let element):
print(element)
case .error(let error):
print(error)
case .completed:
print("completed")
}
}.disposed(by: disposeBag)
publicSubject.onNext("A")
publicSubject.onNext("B")
publicSubject.onNext("C")
publicSubject.onNext("D")
publicSubject.onNext("E")
publicSubject.onNext("F")
publicSubject.onNext("G")
}
打印结果:
["A", "B", "C"]
["D", "E", "F"]
- catchError
catchError
会在要发生 onError
事件时将其拦截,并使用备用的序列替代它,再次执行操作,下面给出两段示例代码,如下:
eg1
:
enum MyError: Error {
case errorA
case errorB
var errorType: String {
switch self {
case .errorA:
return "I am error A"
case .errorB:
return "I am error B"
}
}
}
func test() {
let disposeBag = DisposeBag()
let errorObservable = Observable<Int>.error(MyError.errorA)
let intObservable = Observable<Int>.just(666)
errorObservable.catchError({ error in
print("catch:", error)
return intObservable
}).subscribe { event in
switch event {
case .next(let element):
print("element:", element)
case .error(let error):
print("error:", error)
case .completed:
print("completed")
}}.disposed(by: disposeBag)
}
打印结果:
catch: errorA
element: 666
completed
eg2
:
enum MyError: Error {
case errorA
case errorB
var errorType: String {
switch self {
case .errorA:
return "I am error A"
case .errorB:
return "I am error B"
}
}
}
func test() {
let disposeBag = DisposeBag()
let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()
sequenceThatFails.catchError { (error) -> Observable<String> in
print(error)
return recoverySequence
}.subscribe { (e) in
print(e.element ?? "")
}.disposed(by: disposeBag)
sequenceThatFails.onNext("A")
sequenceThatFails.onNext("B")
sequenceThatFails.onNext("C")
sequenceThatFails.onError(MyError.errorA)
recoverySequence.onNext("D")
}
打印结果:
A
B
C
errorA
D
- combineLatest
combineLatest
操作符将多个 Observables
中最新的元素通过一个函数组合起来,然后将这个组合的结果发出来。这些源 Observables
中任何一个发出一个元素,他都会发出一个元素(前提是,这些 Observables
曾经发出过元素),示例代码如下:
func test() {
let disposeBag = DisposeBag()
let pb1 = PublishSubject<String>()
let pb2 = PublishSubject<String>()
Observable<String>.combineLatest(pb1, pb2) { (element1, element2) -> String in
return "\(element1) --- \(element2)"
}.subscribe { (e) in
switch e {
case .next(let element):
print("结果:", element)
case .error(let error):
print("error:", error)
case .completed:
print("completed")
}
}.disposed(by: disposeBag)
pb1.onNext("1")
pb2.onNext("A")
pb1.onNext("2")
pb1.onNext("3")
pb2.onNext("B")
}
打印结果:
结果: 1 --- A
结果: 2 --- A
结果: 3 --- A
结果: 3 --- B
注:如果有一个源 Observable
没有发出,就会继承上次的值,具体见demo
打印结果。
- concat
concat
操作符将多个 Observables
按顺序串联起来,当前一个 Observable
元素发送完毕后,后一个 Observable
才可以开始发出元素。
concat
将等待前一个 Observable
产生完成事件后,才对后一个 Observable
进行订阅。如果后一个是“热” Observable
,在它前一个 Observable
产生完成事件前,所产生的元素将不会被发送出来,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let pb1 = PublishSubject<String>()
let pb2 = PublishSubject<String>()
pb1.concat(pb2).subscribe { (e) in
switch e {
case .next(let element):
print(element)
case .error(let error):
print("error:", error)
case .completed:
print("completed")
}
}.disposed(by: disposeBag)
pb1.onNext("A")
pb1.onNext("B")
pb2.onNext("C")
pb1.onCompleted()
pb2.onNext("D")
}
打印结果:
A
B
D
注:pb1
发出了A
和 B
后,pb2
发出了C
,但是这个时候 pb1
调用了 completed
事件,所以 pb2
发出的 C
就会被忽略了,所以只有在 pb1
发出 completed
事件后,pb2
的事件才不会被忽略。
- concatMap
concatMap
操作符将源 Observable
的每一个元素应用一个转换方法,将他们转换成 Observables
。然后让这些 Observables
按顺序的发出元素,当前一个 Observable
元素发送完毕后,后一个 Observable
才可以开始发出元素。等待前一个 Observable
产生完成事件后,才对后一个 Observable
进行订阅。这个操作符平时很少用到,这里就用官网的例子吧,示例代码如下:
let disposeBag = DisposeBag()
let subject1 = BehaviorSubject(value: "🍎")
let subject2 = BehaviorSubject(value: "🐶")
let variable = Variable(subject1)
variable.asObservable()
.concatMap { $0 }
.subscribe { print($0) }
.disposed(by: disposeBag)
subject1.onNext("🍐")
subject1.onNext("🍊")
variable.value = subject2
subject2.onNext("I would be ignored")
subject2.onNext("🐱")
subject1.onCompleted()
subject2.onNext("🐭")
打印结果:
next(🍎)
next(🍐)
next(🍊)
next(🐱)
next(🐭)
- connect 和 publish
publish
要和 connect
一起使用才能有效果。当一个被观察者被 publish
后,它被订阅也是不能收到消息的,只有它被 connect
操作符连接后,才能收到数据,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let observable = Observable<String>.just("A").publish()
observable.subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
// 等2s后开始连接
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
print("开始连接")
observable.connect().disposed(by: disposeBag)
}
}
打印结果:
开始连接
A
- create
create
操作符将创建一个 Observable
,你需要提供一个构建函数,在构建函数里面描述事件(next
,error
,completed
)的产生过程。
通常情况下一个有限的序列,只会调用一次观察者的 onCompleted
或者 onError
方法。并且在调用它们后,不会再去调用观察者的其他方法,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let observable = Observable<String>.create { (observer) -> Disposable in
observer.onNext("A")
observer.onNext("B")
observer.onNext("C")
observer.onCompleted()
return Disposables.create()
}
observable.subscribe(onNext: { (e) in
print(e)
}, onCompleted: {
print("onCompleted")
}).disposed(by: disposeBag)
}
打印结果:
A
B
C
onCompleted
- debounce
在指定的时间内,只接受最新的数据,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let publishSubject = PublishSubject<String>()
publishSubject.debounce(DispatchTimeInterval.seconds(2), scheduler: MainScheduler.instance).subscribe { (e) in
print(e)
}.disposed(by: disposeBag)
publishSubject.onNext("A")
publishSubject.onNext("B")
publishSubject.onNext("C")
publishSubject.onNext("D")
}
预期打印结果:
next(D)
注:这个 disposeBag
必须是全局的,要不 debounce
没有效果。
- debug
打印所有的订阅,事件以及销毁信息,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let observable = Observable<String>.create { observer in
observer.onNext("A")
observer.onNext("B")
observer.onCompleted()
return Disposables.create()
}
observable.debug("打印字符").subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
2019-10-10 18:01:54.294: 打印字符 -> subscribed
2019-10-10 18:01:54.296: 打印字符 -> Event next(A)
A
2019-10-10 18:01:54.297: 打印字符 -> Event next(B)
B
2019-10-10 18:01:54.297: 打印字符 -> Event completed
2019-10-10 18:01:54.297: 打印字符 -> isDisposed
- deferred
直到订阅发生的时候,才会创建序列,起到一个延迟创建的作用,看似订阅的都是同一个 Observable
,其实每次都是订阅了一个新的 Observable
,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let observable = Observable<Int>.deferred { () -> Observable<Int> in
let ob = Observable<Int>.create { (observer) -> Disposable in
observer.onNext(1)
observer.onNext(2)
observer.onCompleted()
return Disposables.create()
}
return ob
}
observable.subscribe(onNext: { (e) in
print(e)
}, onCompleted: {
print("onCompleted")
}).disposed(by: disposeBag)
}
打印结果:
1
2
onCompleted
- delay
delay
操作符将修改一个 Observable
,它会将 Observable
的所有元素都拖延一段设定好的时间, 然后才将它们发送出来,示例代码如下:
let disposeBag = DisposeBag()
func test() {
// 元素延迟2秒才发出
Observable.of(1, 5, 9).delay(RxTimeInterval.seconds(2), scheduler: MainScheduler.instance).subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
1
5
9
注:这个 disposeBag
必须是全局的,要不 delay
没有效果。
- delaySubscription
delaySubscription
操作符将在经过所设定的时间后,才对 Observable
进行订阅操作,示例代码如下:
let disposeBag = DisposeBag()
func test() {
Observable<String>.of("A", "B", "C").delaySubscription(RxTimeInterval.seconds(3), scheduler: MainScheduler.instance).subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
A
B
C
注:这个 disposeBag
必须是全局的,要不 delaySubscription
没有效果。
- dematerialize 和 materialize
materialize
:是将序列产生的事件,转换成元素。通常,一个有限的 Observable
将产生零个或者多个 onNext
事件,然后产生一个 onCompleted
或者 onError
事件。materialize
操作符将 Observable
产生的这些事件全部转换成元素,然后发送出来。
dematerialize
: dematerialize
操作符将 materialize
转换后的元素还原。
materialize
使用相关示例代码如下:
func test() {
let disposeBag = DisposeBag()
Observable<String>.of("A", "B", "C").materialize().subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
next(A)
next(B)
next(C)
completed
dematerialize
使用相关示例代码如下:
func test() {
let disposeBag = DisposeBag()
Observable<String>.of("A", "B", "C").materialize().dematerialize().subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
A
B
C
- distinctUntilChanged
distinctUntilChanged
操作符将阻止 Observable
发出相同的元素。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。如果后一个元素和前一个元素不相同,那么这个元素才会被发出来。示例代码如下:
func test() {
let disposeBag = DisposeBag()
Observable.of("A", "B", "C", "C", "C", "D", "C").distinctUntilChanged().subscribe(onNext: { (e) in
print(e)}
).disposed(by: disposeBag)
}
打印结果:
A
B
C
D
C
- do
当 Observable
的某些事件产生时,你可以使用 do
操作符来注册一些回调操作。这些回调会被单独调用,它们会和 Observable
原本的回调分离,示例代码如下:
func test() {
Observable<[Int]>.of([1, 2, 3]).do(onNext: { element in
print("do onNext:" ,element)
}, onError: { error in
print("do error:", error)
}, onCompleted: {
print("do completed")
}, onSubscribe: {
print("do subscribe")
}, onSubscribed: {
print("do subscribed")
}, onDispose: {
print("do dispose")
})
.subscribe(onNext: { (element) in
print("onNext:", element)
}, onError: { (error) in
print("error:", error)
}, onCompleted: {
print("completed")
}) {
print("dispose")
}.disposed(by: disposeBag)
}
打印结果:
do subscribe
do subscribed
do onNext: [1, 2, 3]
onNext: [1, 2, 3]
do completed
completed
dispose
do dispose
- elementAt
elementAt
操作符将拉取 Observable
序列中指定索引数的元素,然后将它作为唯一的元素发出,示例代码如下:
func test() {
let disposeBag = DisposeBag()
Observable.of("A", "B", "C", "D", "E", "F").elementAt(3).subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
D
- empty
empty
操作符将创建一个 Observable
,这个 Observable
只有一个完成事件,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let emptyObservable = Observable<Int>.empty()
emptyObservable.subscribe(onNext: { (element) in
print("onNext: \(element)")
}, onError: { (error) in
print("onError: \(error)")
}, onCompleted: {
print("onCompleted")
}).disposed(by: disposeBag)
}
打印结果:
onCompleted
- error
error
操作符将创建一个 Observable
,这个 Observable
只会产生一个 error
事件,示例代码如下:
enum MyError: Error {
case errorA
case errorB
var errorType: String {
switch self {
case .errorA:
return "I am error A"
case .errorB:
return "I am error B"
}
}
}
func test() {
let disposeBag = DisposeBag()
let emptyObservable = Observable<Int>.error(MyError.errorA)
emptyObservable.subscribe(onNext: { (element) in
print("onNext: \(element)")
}, onError: { (error) in
print("onError: \(error)")
}, onCompleted: {
print("onCompleted")
}).disposed(by: disposeBag)
}
打印结果:
onError: errorA
- filter
filter
操作符将通过你提供的判定方法过滤一个 Observable
,示例代码如下:
func test() {
let disposeBag = DisposeBag()
Observable.of(2, 30, 22, 5, 60, 1).filter({ (e) -> Bool in
return (e > 10)
}).subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
30
22
60
- flatMap
flatMap
操作符将源 Observable
的每一个元素应用一个转换方法,将他们转换成 Observables
。 然后将这些 Observables
的元素合并之后再发送出来,示例代码如下:
func test() {
let disposeBag = DisposeBag()
Observable<Int>.of(1, 2, 3, 4).flatMap { (e) -> Observable<String> in
return Observable<String>.just(">> \(e)")
}.subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
>> 1
>> 2
>> 3
>> 4
- flatMapLatest 和 flatMapFirst
flatMapLatest
操作符将源 Observable
的每一个元素应用一个转换方法,将他们转换成 Observables
。一旦转换出一个新的 Observable
,就只发出它的元素,旧的 Observables
的元素将被忽略掉,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "A")
let second = BehaviorSubject(value: "1")
let behaviorRelay = BehaviorRelay<BehaviorSubject>(value: first)
behaviorRelay.asObservable().flatMapLatest { (subject) -> BehaviorSubject<String> in
return subject
}.subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
first.onNext("B")
behaviorRelay.accept(second)
second.onNext("2")
first.onNext("C")
}
打印结果:
A
B
1
2
注:flatMapFirst
只会接收最初的 value
事件,感觉没什么用,不过也给出一段示例代码吧。
func test() {
let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "A")
let second = BehaviorSubject(value: "1")
let behaviorRelay = BehaviorRelay<BehaviorSubject>(value: first)
behaviorRelay.asObservable().flatMapFirst { (subject) -> BehaviorSubject<String> in
return subject
}.subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
first.onNext("B")
behaviorRelay.accept(second)
second.onNext("2")
first.onNext("C")
}
打印结果:
A
B
C
- from
from
可以将其他类型或者数据结构转换为 Observable
,示例代码如下:
1、将一个数组转换为 Observable
:
func test() {
let disposeBag = DisposeBag()
let numbers = Observable.from([0, 1, 2])
numbers.subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
0
1
2
它相当于:
func test() {
let disposeBag = DisposeBag()
let numbers = Observable<Int>.create { observer in
observer.onNext(0)
observer.onNext(1)
observer.onNext(2)
observer.onCompleted()
return Disposables.create()
}
numbers.subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
2、将一个可选值转换为 Observable
:
func test() {
let disposeBag = DisposeBag()
let optional: Int? = 1
let value = Observable.from(optional: optional)
value.subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
1
它相当于:
func test() {
let disposeBag = DisposeBag()
let optional: Int? = 1
let value = Observable<Int>.create { observer in
if let element = optional {
observer.onNext(element)
}
observer.onCompleted()
return Disposables.create()
}
value.subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
- groupBy
groupBy
操作符将源 Observable
分解为多个子 Observable
,然后将这些子 Observable
发送出来。
它会将元素通过某个键进行分组,然后将分组后的元素序列以 Observable
的形态发送出来,示例代码如下:
func test() {
let disposeBag = DisposeBag()
// 将奇数偶数分成两组
Observable<Int>.of(0, 1, 2, 3, 4, 5).groupBy(keySelector: { (element) -> String in
return (element % 2 == 0 ? "偶数" : "基数")
}).subscribe { (event) in
switch event {
case .next(let group):
group.asObservable().subscribe({ (event) in
print("key:\(group.key) event:\(event)")
})
.disposed(by: disposeBag)
default:
print("")
}
}.disposed(by: disposeBag)
}
打印结果:
key:偶数 event:next(0)
key:基数 event:next(1)
key:偶数 event:next(2)
key:基数 event:next(3)
key:偶数 event:next(4)
key:基数 event:next(5)
key:偶数 event:completed
key:基数 event:completed
- ignoreElements
ignoreElements
忽略掉所有的元素,只发出 error
或 completed
事件,示例代码如下:
func test() {
let disposeBag = DisposeBag()
Observable<Int>.of(1, 2, 3, 4, 5, 6).ignoreElements().subscribe { (e) in
print(e)
}.disposed(by: disposeBag)
}
打印结果:
completed
- interval
interval
操作符将创建一个 Observable
,它每隔一段设定的时间,发出一个索引数的元素。它将发出无数个元素,示例代码如下:
let disposeBag = DisposeBag()
func test() {
let intervalObservable = Observable<Int>.interval(DispatchTimeInterval.seconds(2), scheduler: MainScheduler.instance)
intervalObservable.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
}
打印结果:
next(0)
next(1)
next(2)
next(3)
next(4)
next(5)
next(6)
....
注:这个 disposeBag
必须是全局的,要不 interval
没有效果。
- just
创建一个 Observable
,只包含一个元素,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let justObservable = Observable.just("100")
justObservable.subscribe({ (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
next(100)
completed
- map
map
操作符将源 Observable
的每个元素应用你提供的转换方法,然后返回含有转换结果的 Observable
,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let observable = Observable<Int>.of(1, 32, 66, 88, 100)
observable.map { (e) -> String in
return "== \(e) =="
}.subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
== 1 ==
== 32 ==
== 66 ==
== 88 ==
== 100 ==
- merge
通过使用 merge
操作符可以将多个 Observables
合并成一个,当某一个 Observable
发出一个元素时,他就将这个元素发出,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let subject1 = PublishSubject<Int>()
let subject2 = PublishSubject<Int>()
Observable.of(subject1, subject2).merge().subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
subject1.onNext(20)
subject1.onNext(40)
subject2.onNext(1)
subject1.onNext(80)
subject1.onNext(100)
}
打印结果:
20
40
1
80
100
- never
never
操作符将创建一个 Observable
,这个 Observable
不会产生任何事件。
创建一个不会产生任何事件的 Observable
:
let id = Observable<Int>.never()
它相当于:
let id = Observable<Int>.create { observer in
return Disposables.create()
}
- observeOn 和 subscribeOn
ReactiveX
使用Scheduler
来让Observable
支持多线程。你可以使用observeOn
操作符,来指示Observable
在哪个Scheduler
发出通知。
注意⚠️:一旦产生了 onError
事件, observeOn
操作符将立即转发。他不会等待 onError
之前的事件全部被收到。这意味着 onError
事件可能会跳过一些元素提前发送出去。
subscribeOn
操作符非常相似。它指示 Observable
在哪个 Scheduler
发出执行。
默认情况下,Observable
创建,应用操作符以及发出通知都会在 Subscribe
方法调用的 Scheduler
执行。subscribeOn
操作符将改变这种行为,它会指定一个不同的 Scheduler
来让 Observable
执行,observeOn
操作符将指定一个不同的 Scheduler
来让 Observable
通知观察者。
以下是 subscribeOn
和 observeOn
简单例子,示例代码如下:
eg1
:
func test() {
Observable<String>.create { (observer) -> Disposable in
observer.onNext("A")
sleep(1)
observer.onNext("B")
return Disposables.create()
}.subscribeOn(ConcurrentDispatchQueueScheduler(qos: DispatchQoS.background)).subscribe(onNext: { (e) in
print("\(e) -- 是否在主线程:\(Thread.isMainThread)")
}).disposed(by: disposeBag)
}
打印结果:
A -- 是否在主线程:false
B -- 是否在主线程:false
eg2
: 结合上面示例代码和以下的示例代码,自己去体会 subscribeOn
和 observeOn
用法。
func test() {
Observable<String>.create { (observer) -> Disposable in
observer.onNext("A")
sleep(1)
observer.onNext("B")
return Disposables.create()
}.subscribeOn(ConcurrentDispatchQueueScheduler(qos: DispatchQoS.background)).observeOn(MainScheduler.instance).subscribe(onNext: { (e) in
print("\(e) -- 是否在主线程:\(Thread.isMainThread)")
}).disposed(by: disposeBag)
}
打印结果:
A -- 是否在主线程:true
B -- 是否在主线程:true
- reduce
reduce
操作符将对第一个元素应用一个函数。然后,将结果作为参数填入到第二个元素的应用函数中。以此类推,直到遍历完全部的元素后发出最终结果,以下是方法及注释:
/**
- parameter seed: 累加器的初始值
- parameter accumulator: 累加器闭包
- returns: 结合累加器的初始值,返回所有元素的累加值
*/
public func reduce<A>(_ seed: A, accumulator: @escaping (A, Self.Element) throws -> A) -> RxSwift.Observable<A>
示例代码如下:
func test() {
Observable<Int>.from([1, 2, 3]).reduce(0) { (seed, e) -> Int in
return (e + seed)
}.subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
6
- refCount
refCount
操作符将自动连接和断开可被连接的 Observable
。它将可被连接的 Observable
转换为普通 Observable
。当第一个观察者对它订阅时,那么底层的 Observable
将被连接。当最后一个观察者离开时,那么底层的 Observable
将被断开连接,示例代码如下:
func test() {
let disposeBag = DisposeBag()
let observable = Observable<String>.just("A").publish()
observable.refCount().subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
A
注:请结合操作符 connect
和 publish
来理解示例代码。
- repeatElement
repeatElement
操作符将创建一个 Observable
,这个 Observable
将无止尽的发出同一个元素,示例代码如下:
let id = Observable.repeatElement(0)
它相当于:
let id = Observable<Int>.create { observer in
observer.onNext(0)
observer.onNext(0)
observer.onNext(0)
observer.onNext(0)
... // 无数次
return Disposables.create()
}
- replay
replay
操作符将 Observable
转换为可被连接的 Observable
,并且这个可被连接的 Observable
将缓存最新的 n
个元素。当有新的观察者对它进行订阅时,它就把这些被缓存的元素发送给观察者。
func test() {
let intSequence = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance)
.replay(5)
print("====== subscribe 1 ======")
_ = intSequence.subscribe(onNext: { (e) in
print("Subscription 1:, Event: \(e)")
})
DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
print("====== connect ======")
_ = intSequence.connect()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
print("====== subscribe 2 ======")
_ = intSequence.subscribe(onNext: { (e) in
print("Subscription 2:, Event: \(e)")
})
}
DispatchQueue.main.asyncAfter(deadline: .now() + 10) {
print("====== subscribe 3 ======")
_ = intSequence.subscribe(onNext: { (e) in
print("Subscription 3:, Event: \(e)")
})
}
}
打印结果:
====== subscribe 1 ======
====== connect ======
Subscription 1:, Event: 0
Subscription 1:, Event: 1
====== subscribe 2 ======
Subscription 2:, Event: 0
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
====== subscribe 3 ======
Subscription 3:, Event: 0
Subscription 3:, Event: 1
Subscription 3:, Event: 2
Subscription 3:, Event: 3
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
Subscription 1:, Event: 6
Subscription 2:, Event: 6
Subscription 3:, Event: 6
Subscription 1:, Event: 7
Subscription 2:, Event: 7
Subscription 3:, Event: 7
....
- retry
retry
操作符将不会将 error
事件,传递给观察者,然而,它会从新订阅源 Observable
,给这个 Observable
一个重试的机会,让它有机会不产生 error
事件。retry 总是对观察者发出 next
事件,即便源序列产生了一个 error
事件,所以这样可能会产生重复的元素,示例代码如下:
enum MyError: Error {
case errorA
case errorB
var errorType: String {
switch self {
case .errorA:
return "I am error A"
case .errorB:
return "I am error B"
}
}
}
func test() {
let disposeBag = DisposeBag()
var count = 1
let sequenceThatErrors = Observable<String>.create { (observer) in
observer.onNext("1")
observer.onNext("2")
observer.onNext("3")
if count == 1 {
observer.onError(MyError.errorA)
print("Error encountered")
count += 1
}
observer.onNext("A")
observer.onNext("B")
observer.onNext("C")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors.retry().subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
1
2
3
Error encountered
1
2
3
A
B
C
- sample
sample
操作符将不定期的对源 Observable
进行取样操作。通过第二个 Observable
来控制取样时机。一旦第二个 Observable
发出一个元素,就从源 Observable
中取出最后产生的元素。这个操作符很少用到,这里就不举例了。
- scan
scan
操作符将对第一个元素应用一个函数,将结果作为第一个元素发出。然后,将结果作为参数填入到第二个元素的应用函数中,创建第二个元素。以此类推,直到遍历完全部的元素。
这种操作符在其他地方有时候被称作是 accumulator
,示例代码如下:
func test() {
let disposeBag = DisposeBag()
Observable.of(10, 100, 1000).scan(1) { (seed, e) -> Int in
return seed + e
}.subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
11
111
1111
- shareReplay
shareReplay
操作符将使得观察者共享源 Observable
,并且缓存最新的 n
个元素,将这些元素直接发送给新的观察者。
注:从框架源码中可以看出,shareReplay
是 replay
和 refCount
的组合,具体自己去 replay
和 refCount
。
- single
single
操作符将限制 Observable
只产生一个元素。如果 Observable
只有一个元素,它将镜像这个 Observable
。如果 Observable
没有元素或者元素数量大于一,它将产生一个 error
事件,示例代码如下:
func test() {
let disposeBag = DisposeBag()
Observable<Int>.of(1, 2, 3).single().subscribe(onNext: { (e) in
print("A == ", e)
}, onError: { (error) in
print("error:", error)
}).disposed(by: disposeBag)
Observable<Int>.of(1, 2, 3).single { (e) -> Bool in
return (e == 3)
}.subscribe(onNext: { (e) in
print("B == ", e)
}).disposed(by: disposeBag)
}
打印结果:
A == 1
error: Sequence contains more than one element.
B == 3
- skip、skipUntil 和 skipWhile
skip
操作符可以让你跳过 Observable
中头 n
个元素,只关注后面的元素,示例代码如下:
skipUntil
操作符可以让你忽略源 Observable
中头几个元素,直到另一个 Observable
发出一个元素后,它才镜像源 Observable
。
skipWhile
操作符可以让你忽略源 Observable
中头几个元素,直到元素的判定为否后,它才镜像源 Observable
。
注:三个操作符用法类似,这里就举一个 skip
的例子供参考,示例代码如下:
func test() {
let disposeBag = DisposeBag()
Observable.of("A", "B", "C", "D", "E", "F").skip(2).subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
C
D
E
F
- startWith
startWith
操作符会在 Observable
头部插入一些元素,示例代码如下:
func test() {
let disposeBag = DisposeBag()
Observable.of("A", "B", "C", "D").startWith("1").startWith("2").startWith("3", "a", "b").subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
3
a
b
2
1
A
B
C
D
- take、takeLast、takeUntil 和 takeWhile
take
:通过 take
操作符你可以只发出头 n
个元素。并且忽略掉后面的元素,直接结束序列。
takeLast
:通过 takeLast
操作符你可以只发出尾部 n
个元素。并且忽略掉前面的元素。
takeUntil
操作符将镜像源 Observable
,它同时观测第二个 Observable
。一旦第二个 Observable
发出一个元素或者产生一个终止事件,那个镜像的 Observable
将立即终止。
takeWhile
操作符将镜像源 Observable
直到某个元素的判定为 false
。此时,这个镜像的 Observable
将立即终止。
注:四个操作符用法类似,这里就举一个 take
的例子供参考,示例代码如下:
func test() {
let disposeBag = DisposeBag()
Observable.of("A", "B", "C", "D", "E", "F").take(3).subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
}
打印结果:
A
B
C
- timeout
如果 Observable
在一段时间内没有产生元素,timeout
操作符将使它发出一个 error
事件。
let disposeBag = DisposeBag()
func test() {
Observable<String>.create { (observer) -> Disposable in
return Disposables.create()
}.timeout(RxTimeInterval.seconds(3), scheduler: MainScheduler.instance).subscribe(onNext: { (e) in
print("onNext:", e)
}, onError: { (error) in
print("onError:", error)
}).disposed(by: disposeBag)
}
打印结果:
onError: Sequence timeout.
注:等了 3s
之后,发出一个 error
事件,这时你就看到了打印结果。
- timer
timer
操作符将创建一个Observable
,它在经过设定的一段时间后,产生唯一的一个元素,示例代码如下:
let disposeBag = DisposeBag()
func test() {
Observable<Int>.timer(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe(onNext: { (e) in
print(e)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("onCompleted")
}).disposed(by: disposeBag)
}
打印结果:
0
onCompleted
注:这里存在其他版本的 timer
操作符,可以实现:创建一个 Observable
在一段延时后,每隔一段时间产生一个元素。方法如下:
/**
- parameter dueTime: 初始延时
- parameter period: 时间间隔
- parameter scheduler: 运行在哪个线程
*/
public static func timer(_ dueTime: RxSwift.RxTimeInterval, period: RxSwift.RxTimeInterval? = nil, scheduler: RxSwift.SchedulerType) -> RxSwift.Observable<Self.Element>
示例代码如下:
let disposeBag = DisposeBag()
func test() {
Observable<Int>.timer(RxTimeInterval.seconds(2), period: RxTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe(onNext: { (e) in
print(e)
}, onError: { (error) in
print(error)
}, onCompleted: {
print("onCompleted")
}).disposed(by: disposeBag)
}
打印结果:
0
1
2
3
4
5
6
....
- using
通过使用 using
操作符创建 Observable
时,同时创建一个可被清除的资源,一旦 Observable
终止了,那么这个资源就会被清除掉了。
func test() {
// 一个无限序列(每隔1秒创建一个序列数 )
let infiniteInterval = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).do(onNext: { (e) in
print("infinite : \(e)")
}, onSubscribe: {
print("开始订阅 infinite")
}, onDispose: {
print("销毁 infinite")
})
// 一个有限序列(每隔2秒创建一个序列数,共创建三个 )
let limited = Observable<Int>
.interval(DispatchTimeInterval.seconds(2), scheduler: MainScheduler.instance).take(2).do(onNext: { (e) in
print("limited: \(e)")
}, onSubscribe: {
print("开始订阅 limited")
}, onDispose: {
print("销毁 limited")
})
//使用using操作符创建序列
let o: Observable<Int> = Observable.using({ () -> AnyDisposable in
return AnyDisposable(infiniteInterval.subscribe())
}) { (disposable) -> Observable<Int> in
return limited
}
o.subscribe()
}
class AnyDisposable: Disposable {
let _dispose: () -> Void
init(_ disposable: Disposable) {
_dispose = disposable.dispose
}
func dispose() {
_dispose()
}
}
打印结果如下:
开始订阅 infinite
开始订阅 limited
infinite : 0
infinite : 1
limited: 0
infinite : 2
infinite : 3
limited: 1
销毁 limited
销毁 infinite
- window
window
操作符和 buffer
十分相似,buffer
周期性的将缓存的元素集合发送出来,而 window
周期性的将元素集合以 Observable
的形态发送出来。
buffer
要等到元素搜集完毕后,才会发出元素序列。而 window
可以实时发出元素序列。
func test() {
let disposeBag = DisposeBag()
let publicSubject = PublishSubject<String>()
publicSubject.window(timeSpan: RxTimeInterval.seconds(1), count: 3, scheduler: MainScheduler.instance).subscribe(onNext: { (observable) in
_ = observable.subscribe(onNext: { (e) in
print("onNext: ", e)
})
}, onError: { (error) in
print("onError:", error)
}, onCompleted: {
print("onCompleted")
}).disposed(by: disposeBag)
publicSubject.onNext("A")
publicSubject.onNext("B")
publicSubject.onNext("C")
publicSubject.onNext("D")
publicSubject.onNext("E")
publicSubject.onNext("F")
publicSubject.onNext("G")
}
打印结果如下:
onNext: A
onNext: B
onNext: C
onNext: D
onNext: E
onNext: F
onNext: G
- withLatestFrom
eg1
:
func test() {
let disposeBag = DisposeBag()
let firstSubject = PublishSubject<String>()
let secondSubject = PublishSubject<String>()
firstSubject.withLatestFrom(secondSubject).subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
firstSubject.onNext("A")
firstSubject.onNext("B")
secondSubject.onNext("1")
secondSubject.onNext("2")
firstSubject.onNext("C")
}
打印结果如下:
2
eg2
:
func test() {
let disposeBag = DisposeBag()
let firstSubject = PublishSubject<String>()
let secondSubject = PublishSubject<String>()
firstSubject.withLatestFrom(secondSubject, resultSelector: { (first, second) -> String in
return "\(first) -- \(second)"
}).subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
firstSubject.onNext("A")
firstSubject.onNext("B")
secondSubject.onNext("1")
secondSubject.onNext("2")
firstSubject.onNext("C")
}
打印结果如下:
C -- 2
- zip
zip
操作符将多个(最多不超过 8
个) Observables
的元素通过一个函数组合起来,然后将这个组合的结果发出来。它会严格的按照序列的索引数进行组合。例如,返回的 Observable
的第一个元素,是由每一个源 Observables
的第一个元素组合出来的。它的第二个元素 ,是由每一个源 Observables
的第二个元素组合出来的。它的第三个元素 ,是由每一个源 Observables
的第三个元素组合出来的,以此类推。它的元素数量等于源 Observables
中元素数量最少的那个。
func test() {
let disposeBag = DisposeBag()
let first = PublishSubject<String>()
let second = PublishSubject<String>()
Observable.zip(first, second) { (e1, e2) -> String in
return "\(e1) -- \(e2)"
}.subscribe(onNext: { (e) in
print(e)
}).disposed(by: disposeBag)
first.onNext("1")
second.onNext("A")
first.onNext("2")
second.onNext("B")
second.onNext("C")
second.onNext("D")
first.onNext("3")
first.onNext("4")
}
打印结果如下:
1 -- A
2 -- B
3 -- C
4 -- D
如何选择操作符?
我想要创建一个
Observable
- 产生特定的一个元素:
just
◦ 经过一段延时:timer
- 从一个序列拉取元素:
from
- 重复的产生某一个元素:
repeatElement
- 存在自定义逻辑:
create
- 每次订阅时产生:
deferred
- 每隔一段时间,发出一个元素:
interval
◦ 在一段延时后:timer
- 一个空序列,只有一个完成事件:
empty
- 一个任何事件都没有产生的序列:
never
我想要创建一个
Observable
通过组合其他的Observables
- 任意一个
Observable
产生了元素,就发出这个元素:merge
- 让这些
Observables
一个接一个的发出元素,当上一个Observable
元素发送完毕后,下一个Observable
才能开始发出元素:concat
- 组合多个
Observables
的元素
◦ 当每一个Observable
都发出一个新的元素:zip
◦ 当任意一个Observable
发出一个新的元素:combineLatest
我想要转换
Observable
的元素后,再将它们发出来
- 对每个元素直接转换:
map
- 转换到另一个
Observable
:flatMap
◦ 只接收最新的元素转换的Observable
所产生的元素:flatMapLatest
◦ 每一个元素转换的Observable
按顺序产生元素:concatMap
- 基于所有遍历过的元素:
scan
我想要将产生的每一个元素,拖延一段时间后再发出:
delay
我想要将产生的事件封装成元素发送出来
- 将他们封装成
Event<Element>:materialize
◦ 然后解封出来:dematerialize
我想要忽略掉所有的
next
事件,只接收completed
和error
事件:ignoreElements
我想创建一个新的
Observable
在原有的序列前面加入一些元素:startWith
我想从
Observable
中收集元素,缓存这些元素之后在发出:buffer
我想将
Observable
拆分成多个Observables
:window
- 基于元素的共同特征:
groupBy
我想只接收
Observable
中特定的元素
- 发出唯一的元素:
single
我想重新从
Observable
中发出某些元素
- 通过判定条件过滤出一些元素:
filter
- 仅仅发出头几个元素:
take
- 仅仅发出尾部的几个元素:
takeLast
- 仅仅发出第
n
个元素:elementAt
- 跳过头几个元素
◦ 跳过头 n 个元素:skip
◦ 跳过头几个满足判定的元素:skipWhile
,skipWhileWithIndex
◦ 跳过某段时间内产生的头几个元素:skip
◦ 跳过头几个元素直到另一个Observable
发出一个元素:skipUntil
- 只取头几个元素
◦ 只取头几个满足判定的元素:takeWhile
,takeWhileWithIndex
◦ 只取某段时间内产生的头几个元素:take
◦ 只取头几个元素直到另一个Observable
发出一个元素:takeUntil
- 周期性的对
Observable
抽样:sample
- 发出那些元素,这些元素产生后的特定的时间内,没有新的元素产生:
debounce
- 直到元素的值发生变化,才发出新的元素:
distinctUntilChanged
◦ 并提供元素是否相等的判定函数:distinctUntilChanged
- 在开始发出元素时,延时后进行订阅:
delaySubscription
我想要从一些
Observables
中,只取第一个产生元素的Observable
:amb
我想评估
Observable
的全部元素
- 并且对每个元素应用聚合方法,待所有元素都应用聚合方法后,发出结果:
reduce
- 并且对每个元素应用聚合方法,每次应用聚合方法后,发出结果:
scan
我想把
Observable
转换为其他的数据结构:as...
我想在某个
Scheduler
应用操作符:subscribeOn
- 在某个
Scheduler
监听:observeOn
我想要
Observable
发生某个事件时, 采取某个行动:do
我想要
Observable
发出一个error
事件:error
- 如果规定时间内没有产生元素:
timeout
我想要
Observable
发生错误时,优雅的恢复
- 如果规定时间内没有产生元素,就切换到备选
Observable
:timeout
- 如果产生错误,将错误替换成某个元素 :
catchErrorJustReturn
- 如果产生错误,就切换到备选
Observable
:catchError
- 如果产生错误,就重试 :
retry
我创建一个
Disposable
资源,使它与Observable
具有相同的寿命:using
我创建一个
Observable
,直到我通知它可以产生元素后,才能产生元素:publish
- 并且,就算是在产生元素后订阅,也要发出全部元素:
replay
- 并且,一旦所有观察者取消观察,他就被释放掉:
refCount
- 通知它可以产生元素了:
connect
Author
如果你有什么建议,可以关注我,直接留言,留言必回。
网友评论