1 Introduction
- Observable被观察
- Observer观察者
- 官网https://github.com/ReactiveX/RxSwift
- 参考网址http://reactivex.io/documentation/operators.html
2 Creating and Subscribing to Observablesnever
never
let disposeBag = DisposeBag()
let neverq = Observable<String>.never()
let neverqSub = neverq.subscribe{ _ in
print("never show")
}
//不会结束也不会发出事件
empty
let disposeBag = DisposeBag()
Observable<Int>.empty().subscribe({ (event) in
print(event)
}).disposed(by: disposeBag)
- 空可被观察, 只发出个完成事件
- 打印 completed
just
let disposeBag = DisposeBag()
Observable.just("1").subscribe{ event in
print(event)
}.disposed(by: disposeBag)
- 创建被观察队列,仅有一个元素
- 打印next()和complete
of
let disposeBag = DisposeBag()
Observable.of("1","2","3","4").subscribe(onNext: { (element) in
print(element)
}).disposed(by: disposeBag)
- 从固定数组中创建被观察队列
- 使用了 .subscribe(onNext:)
这个例子同时还介绍了.subscribe(onNext:)这个通用方法,这个方法只产生Next 事件元素忽略error和complete,像这么的还有 onError, onComplete
form
let disposeBag = DisposeBag()
Observable.from(["1","3","5","7"]).subscribe(onNext: { (element) in
print(element)
}).disposed(by: disposeBag)
- 从队列中创建被观察者,例如array,dictionary,set
和of类似,不过of是直接写入,在括号内,form是可以传入序列
create
let disposeBag = DisposeBag()
let myJust = { (element:String) -> Observable<String> in
return Observable.create{ observer in
observer.on(.next(element))
observer.on(.completed)
return Disposables.create()
}
}
myJust("hello").subscribe{print($0)}.disposed(by: disposeBag)
- 自定义被观察者
range
let disposeBag = DisposeBag()
Observable.range(start: 1, count: 10).subscribe{
print($0)
}.disposed(by: disposeBag)
- 创建被观察者,并发送一串数字,最后发送完成
repeat
let disposeBag = DisposeBag()
Observable.repeatElement("101")
.take(3)
.subscribe{print($0)}
.disposed(by: disposeBag)
- 从给定元素中创建观察者,重复发出事件
同时也介绍了take,返回重复几次
generate
let disposeBag = DisposeBag()
Observable.generate(
initialState: 0,
condition: {$0 < 10},
iterate: {$0 + 2})
.subscribe(onNext:{print($0)})
.disposed(by: disposeBag)
*创造一个observable序列,只要值为true,那就生成值
deferred
let disposeBag = DisposeBag()
var count = 1
let defferedSeq = Observable<String>.deferred{ () -> Observable<String> in
print("Creating \(count)")
count += 1
return Observable.create{ observer in
print("Emitting")
observer.onNext("喵喵")
observer.onNext("狗子")
observer.onNext("猴子")
return Disposables.create()
}
}
defferedSeq.subscribe{ event in
print(event)
}.disposed(by: disposeBag)
defferedSeq.subscribe{ event in
print(event)
}.disposed(by: disposeBag)
- 为每一个订阅者创建一个observable
- 延后执行
error
let disposeBag = DisposeBag()
Observable<Int>.error(TestError.test)
.subscribe{print($0)}
.disposed(by: disposeBag)
- 创建一个observable,发送一个error
doOn
let disposeBag = DisposeBag()
Observable.of("1","2","3","4")
.do(onNext: {print("do",$0)}, onError: {print("error",$0)}, onCompleted: {print("completed")})
.subscribe(onNext:{print($0)})
.disposed(by: disposeBag)
- 拦截方法
- 调用一个附加动作为每个发送的动作,并且返回最初的事件
3 Working with Subjects
简单的比喻下,Observable像是一个水管,会源源不断的有水冒出来。Subject就像一个水龙头,它可以套在水管上,接受Observable上面的事件。但是作为水龙头,它下面还可以被别的observer给subscribe了。subject在rx中类似是桥或者协议在observable和observer中,可以作为observable,也可以作为observer
参考网址:http://www.jianshu.com/p/209cae2a54a1
PublishSubjectlet disposeBag = DisposeBag() let subject = PublishSubject<String>() subject.addObserver("1").disposed(by: disposeBag) subject.onNext("🐶") subject.onNext("🐱") subject.addObserver("2").disposed(by: disposeBag) subject.onNext("🅰️") subject.onNext("🅱️")对所有观察者广播一条新的事件,接受只能在订阅的时间后
ReplaySubjectlet disposeBag = DisposeBag() let subject = ReplaySubject<String>.create(bufferSize: 1) subject.addObserver("1").disposed(by: disposeBag) subject.onNext("🐶") subject.onNext("🐱") subject.addObserver("2").disposed(by: disposeBag) subject.onNext("🅰️") subject.onNext("🅱️")对所有观察者广播一条新的事件,但是通过缓冲可以数字保留需要缓存的数量
BehaviorSubjectlet disposeBag = DisposeBag() let subject = BehaviorSubject(value: "🔴") subject.addObserver("1").disposed(by: disposeBag) subject.onNext("🐶") subject.onNext("🐱") subject.addObserver("2").disposed(by: disposeBag) subject.onNext("🅰️") subject.onNext("🅱️") subject.addObserver("3").disposed(by: disposeBag) subject.onNext("🍐") subject.onNext("🍊")广播新的事件到所有的订阅者们上,并且首先发送最近的一个(或者默认的)值到订阅者们上,然后再发送订阅之后的事件。
Variablelet disposeBag = DisposeBag() let variable = Variable("🔴") variable.asObservable().addObserver("1").disposed(by: disposeBag) variable.value = "🐶" variable.value = "🐱" variable.asObservable().addObserver("2").disposed(by: disposeBag) variable.value = "🅰️" variable.value = "🅱️"封装了一层的BehaviorSubject, 还会发送个完成指令
4 Combination Operators将多个不同源的Observable合并成单一的Observable
Startwithlet disposeBag = DisposeBag() Observable.of("🐶", "🐱", "🐭", "🐹") .startWith("1️⃣") .startWith("2️⃣") .startWith("3️⃣", "🅰️", "🅱️") .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)在源Observable序列开始发散元素之前发出元素的特殊序列。
相当于提前发送别的元素在该序列发散前
在这个例子。startwith就是个后进先出的栈,而且后面的startwith会比前面的startwith先出来
mergelet disposeBag = DisposeBag() let subject1 = PublishSubject<String>() let subject2 = PublishSubject<String>() Observable.of(subject1, subject2) .merge() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) subject1.onNext("🅰️") subject1.onNext("🅱️") subject2.onNext("①") subject2.onNext("②") subject1.onNext("🆎") subject2.onNext("③")真合并,然后会发送每个元素
ziplet disposeBag = DisposeBag() let stringSubject = PublishSubject<String>() let intSubject = PublishSubject<Int>() Observable.zip(stringSubject, intSubject) { stringElement, intElement in "(stringElement) (intElement)" } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) stringSubject.onNext("🅰️") stringSubject.onNext("🅱️") intSubject.onNext(1) intSubject.onNext(2) stringSubject.onNext("🆎") intSubject.onNext(3)将8个源Observable组合成一个新的Observable序列,并且将会根据每个源Observable序列的索引位置发散组合的Observable序列元素🅰️ 1🅱️ 2🆎 3
combineLatestlet disposeBag = DisposeBag() let stringSubject = PublishSubject<String>() let intSubject = PublishSubject<Int>() Observable.combineLatest(stringSubject, intSubject) { stringElement, intElement in "(stringElement) (intElement)" } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) stringSubject.onNext("🅰️") stringSubject.onNext("🅱️") intSubject.onNext(1) intSubject.onNext(2) stringSubject.onNext("🆎")将8个源Observable组合成一个新的Observable序列,并且一旦所有源序列发出至少一个元素以及当任何源Observable序列发出一个新的元素的时候,将开始发散组合的Observable序列的最新的元素。
还可以数组合并let disposeBag = DisposeBag() let stringObservable = Observable.just("❤️") let fruitObservable = Observable.from(["🍎", "🍐", "🍊"]) let animalObservable = Observable.of("🐶", "🐱", "🐭", "🐹") Observable.combineLatest([stringObservable, fruitObservable, animalObservable]) { "($0[0]) ($0[1]) ($0[2])" } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)
switchLatestlet disposeBag = DisposeBag() let subject1 = BehaviorSubject(value: "⚽️") let subject2 = BehaviorSubject(value: "🍎") let variable = Variable(subject1) variable.asObservable() .switchLatest() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) subject1.onNext("🏈") subject1.onNext("🏀") variable.value = subject2 subject1.onNext("⚾️") subject2.onNext("🍐")通过一个在Observable序列队里的Observable序列发出转换元素,并且发出最近的Observable序列里的元素
关键是切换内在
5 Transforming Operators在Observable序列转换下一次事件发生的操作
maplet disposeBag = DisposeBag() Observable.of(1, 2, 3) .map { $0 * $0 } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)在序列中应用闭包来转换,并且返回一个新的转换后的observable序列
faltmap和flatMapLatest将通过一个Observable序列发散出的元素转换成Observable序列队,并且合并这些发散元素作为一个单独的Observable序列。有时候这是非常有用的,比如,当你有一个Observable序列本身能自己发散Observable序列,而且你希望能够响应新的发散和Observable序列。flatMap和flatMapLatest不同的是,flatMapLatest只会发出最新的内部Observable序列的元素。也就是扁平化二维化
faltmap可能不同效果
scan let disposeBag = DisposeBag()Observable.of(10, 100, 1000).scan(1) { aggregateValue, newValue inaggregateValue + newValue}.subscribe(onNext: { print($0) }).addDisposableTo(disposeBag)从一开始的种子值,使用累加器闭包来致使observable序列中每个元素,最后返回一个并返回每个中间结果作为一个含有单个元素的Observable序列
也就是说是累加器,而且会保存之前的值
6 Filtering and Conditional Operators有选择性的从observable序列中发生元素的操作
filterlet disposeBag = DisposeBag() Observable.of( "🐱", "🐰", "🐶", "🐸", "🐱", "🐰", "🐹", "🐸", "🐱") .filter { $0 == "🐱" } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)特殊条件才返回的选择性序列
distinctUntilChangedlet disposeBag = DisposeBag() Observable.of("🐱", "🐷", "🐱", "🐱", "🐱", "🐵", "🐱") .distinctUntilChanged() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)封锁序列中连续的重复元素
elementAtlet disposeBag = DisposeBag() Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵") .elementAt(3) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)返回某个位置上的元素
signlelet disposeBag = DisposeBag() Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵") .single() .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)只发生单个元素(或者首个符合条件),如果找不到符合要求或者多个符合要求的然后就抛出个error
有条件的let disposeBag = DisposeBag() Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵") .single { $0 == "🐸" } .subscribe { print($0) } .disposed(by: disposeBag) Observable.of("🐱", "🐰", "🐶", "🐱", "🐰", "🐶") .single { $0 == "🐰" } .subscribe { print($0) } .disposed(by: disposeBag) Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵") .single { $0 == "🔵" } .subscribe { print($0) } .disposed(by: disposeBag)
takelet disposeBag = DisposeBag() Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵") .take(3) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)从开头到take的number中发出元素
takeLastlet disposeBag = DisposeBag() Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵") .takeLast(3) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)从结束到take的number中发出元素
takeWhilelet disposeBag = DisposeBag() Observable.of(1, 2, 3, 4, 5, 6) .takeWhile { $0 < 4 } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)发出一个源Observable序列中的只要指定条件的值为true的元素。
takeUnitllet disposeBag = DisposeBag() let sourceSequence = PublishSubject<String>() let referenceSequence = PublishSubject<String>() sourceSequence .takeUntil(referenceSequence) .subscribe { print($0) } .disposed(by: disposeBag) sourceSequence.onNext("🐱") sourceSequence.onNext("🐰") sourceSequence.onNext("🐶") referenceSequence.onNext("🔴") sourceSequence.onNext("🐸") sourceSequence.onNext("🐷") sourceSequence.onNext("🐵")从一个observable发出,直到另一个序列发出信号
skiplet disposeBag = DisposeBag() Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵") .skip(2) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)跳过几个
skipWhilelet disposeBag = DisposeBag() Observable.of(1, 2, 3, 4, 5, 6) .skipWhile { $0 < 4 } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)通过判断跳过几个
skipWhileWithIndexlet disposeBag = DisposeBag() Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵") .skipWhileWithIndex { element, index in index < 3 } .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)通过index判断跳过几个
skipUntillet disposeBag = DisposeBag() let sourceSequence = PublishSubject<String>() let referenceSequence = PublishSubject<String>() sourceSequence .skipUntil(referenceSequence) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag) sourceSequence.onNext("🐱") sourceSequence.onNext("🐰") sourceSequence.onNext("🐶") referenceSequence.onNext("🔴") sourceSequence.onNext("🐸") sourceSequence.onNext("🐷") sourceSequence.onNext("🐵")从一个observable序列发出信号,直到另一份序列发出信号
7 Mathematical and Aggregate Operators数学运算符和集合运算符
toArraylet disposeBag = DisposeBag() Observable.range(start: 1, count: 10) .toArray() .subscribe { print($0) } .disposed(by: disposeBag)把一个observable序列转换成array
reducelet disposeBag = DisposeBag() Observable.of(10, 100, 1000) .reduce(1, accumulator: +) .subscribe(onNext: { print($0) }) .disposed(by: disposeBag)累加,从一个给点开始值累加,给好累加符号(+ - * /),返回一个集合运算符
concatlet disposeBag = DisposeBag() let subject1 = BehaviorSubject(value: "🍎") let subject2 = BehaviorSubject(value: "🐶") let variable = Variable(subject1) variable.asObservable() .concat() .subscribe { print($0) } .disposed(by: disposeBag) subject1.onNext("🍐") subject1.onNext("🍊") variable.value = subject2 subject2.onNext("I would be ignored") subject2.onNext("🐱") subject1.onCompleted() subject2.onNext("🐭")合并
按照顺序把多个 Observable 序列中的元素加入到一个Observable 序列中,在下一个序列发散元素之前等待每个序列成功终止。
8 Connectable Operators可连接的Observable序列类似于普通的Observable序列,除了当它们被订阅后一开始不发散元素,只有当它们的connect方法被调用才会发散元素。这样,你可以等待所有的订阅者在可连接的Observable序列开始发散元素之前订阅它。
先看一个不可连接操作printExampleHeader(#function) let interval = Observable<Int>.interval(1, scheduler: MainScheduler.instance) _ = interval .subscribe(onNext: { print("Subscription: 1, Event: ($0)") }) delay(5) { _ = interval .subscribe(onNext: { print("Subscription: 2, Event: ($0)") }) }
publish printExampleHeader(#function) let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .publish() _ = intSequence .subscribe(onNext: { print("Subscription 1:, Event: ($0)") }) delay(2) { _ = intSequence.connect() } delay(4) { _ = intSequence .subscribe(onNext: { print("Subscription 2:, Event: ($0)") }) } delay(6) { _ = intSequence .subscribe(onNext: { print("Subscription 3:, Event: ($0)") }) }讲一个普通observable转换成可连接observable
replayprintExampleHeader(#function) let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .replay(5) _ = intSequence .subscribe(onNext: { print("Subscription 1:, Event: ($0)") }) delay(2) { _ = intSequence.connect() } delay(4) { _ = intSequence .subscribe(onNext: { print("Subscription 2:, Event: ($0)") }) } delay(8) { _ = intSequence .subscribe(onNext: { print("Subscription 3:, Event: ($0)") }) }讲一个普通observable转换成可连接observable,同时保存buffer
multicastprintExampleHeader(#function) let subject = PublishSubject<Int>() _ = subject .subscribe(onNext: { print("Subject: ($0)") }) let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance) .multicast(subject) _ = intSequence .subscribe(onNext: { print("\tSubscription 1:, Event: ($0)") }) delay(2) { _ = intSequence.connect() } delay(4) { _ = intSequence .subscribe(onNext: { print("\tSubscription 2:, Event: ($0)") }) } delay(6) { _ = intSequence .subscribe(onNext: { print("\tSubscription 3:, Event: ($0)") }) }多路广播
把源Observable序列转换成一个可连接的Observable序列,并通过指定的subject广播发散。
9 Error Handling Operators从observable发送错误通知来帮助恢复的操作
catchErrorJustReturn从一个错误事件中修复,返回一个observable序列然后发送一个单个元素最后结束
catchError从一个错误事件中修复,切换到提供修复的observable序列
retry重试错误时间,凭依订阅observable序列
retry(_:)加上重试次数
10 Debugging Operators帮组debug的操作
debug打印所有的订阅,事件和处理
RxSwift.Resource.total提供所有的rx resource的分配,对内存泄漏查探很有帮助
网友评论