Rx
Rx是ReactiveX的缩写,简单来说就是基于异步事件序列的响应式编程。
RxSiwft:他只是基于Swift语言的Rx标准实现接口库,所以RxSwift里不包含任何Cocoa或者UI方面的类。
RxCocoa:是基于RxSwift针对于iOS开发的一个库,他通过Extension的方法给原生的比如UI控件添加了Rx的特性,使得我们更容易订阅和响应这些控件。
Obsevable
·Observable<T>这个类就是Rx框架的基础,我们可以称它为可观察序列。它的作用就是可以异步的产生一系列的事件,即一个Observable<T>对象会随着时间推移不定期的发出event(element: T)这样一个东西。
·而且这些事件还可以携带数据,他的泛型<T>就是用来指定这个事件携带的数据的类型。
·有了可观察序列,我们还需要有一个Observer(订阅者)来订阅它这样这个订阅者才能收到Observable<T>不是发出的事件
可以看到事件就是一个枚举,也就是说一个Observable是可以发出3种不同类型的事件(next,error, completed)
Observable的创建方法
1.just()该方法通过传入一个默认值来初始化
let observable = Observable<Int>.just(5)
2.of()该方法可以接受可变数量的参数(必须要是同类型的)
let observable = Observable.of("A", "B","C")
3.from()该方法需要一个数组参数
let observable = Observable.from(["A", "B","C"])
4.empty()该方法创建一个空内容的Observable
let observable = Observable<Int>
5.never()该方法创建一个永远不会发出事件(也不会终止)的Observable序列
let observable = Observable<Int>.never()
6.error()该方法创建一个不做任何操作,而是直接发送一个错误的Observable序列
enum MyError: Error {
case A
case B
}
let observable = Observable<int>.error(MyError.A)
7.range()该方法通过指定起始和结束数值,创建一个以这个范围内所有值作为初始值的Observable序列
let observable = Observable.range(start: 1, count: 5)
let observable = Observable.of(1, 2, 3, 4, 5)
8.repeatElement()该方法创建一个可以无限发出给定元素的Event的Observable序列(永不终止)
let observable = Observable.repeatElement(1)
9.generate()该方法创建一个只有当提供的所有的判断条件都为真的时候,才会给出动作的Observable序列
let obsevable = Observable.generate(
initialState: 0,
condition: { $0 <= 10 },
iterate: { $0 + 2 }
)
let observable = Observable.of(0, 2, 4, 6, 8, 10)
10.create()该方法接受一个block形式的参数,任务是对每一个过来的订阅进行处理
//这个block有一个回调参数observer就是订阅这个Observable对象的订阅者
//当一个订阅者订阅这个Observable对象的时候,就会讲经乐者作为参数传入这个block来执行一些内容
let observable = Observable<String>.create{ observer in
observer.onNext("Hello")
observer.onCompleted()
//因为一个订阅行为会有一个Disposable类型的返回值,所以在结尾一定要return一个Disposable
return Disposables.create()
}
observable.subscribe {
print($0)
}
11.defferred()该方法相当于创建一个Observable工厂,通过传入一个block来执行延迟Observable序列创建的行为,而这个block里就是真正的实例化序列对象的地方。
var isOdd = true
let factory: Observable<Int> = Observable.deferred {
isOdd = !isOdd
if isOdd {
return Observable.of(1, 3, 5, 7)
} else {
return Observable.of(2, 4, 6, 8)
}
}
factory.subscribe { event in
print("\(isOdd)", event)
}
factory.subscribe { event in
print("\(isOdd)", event)
}
12.interval()该方法创建的Observable序列每隔一段设定的时间,就发出一个索引数的元素。而且他会一直发送下去。
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable.subscribe { event in
print(event)
}
13.timer()
(1).这个方法有两种用法,一种是创建的Observable序列在经过设定的一段时间后,产生唯一的一个元素
// 5秒钟后发出唯一的一个元素0
let observable = Observable<int>.timer(5, scheduler: MainScheduler.instance)
observable.subscribe { event in
print(event)
}
(2).另一种是创建的Observable序列在经过设定的一段时间后,每隔一段时间产生一个元素
//延时5秒后,每隔1秒发出一个元素
let observable = Observable<Int>.timer(5, period: 1, scheduler:MainScheduler.instance)
observable.subscribe { event in
print(event)
}
订阅Observable
用法一:
我们使用subscribe()订阅了一个Observable对象,该方法的block的回调参数就是被发出的event事件
如果想要获取到这个事件中的数据,可以通过event,element得到
let observable = Observable.of(0, 2, 4, 6, 8, 10)
observable.subscribe { event in
print(event.element)
}
用法二:
RxSwift还提供了另一个subscribe方法。他可以吧event分类
let observable = Observable.of(0, 2, 4, 6, 8, 10)
observable.subscribe(onNext: { element in
print(element)
}, onError: { error in
print(error)
}, onCompleted: {
print("comepleted")
}, onDisposed: {
print("disposed")
})
监听事件的生命周期
doOn监听事件的生命周期
let observable = Observable.of(0, 2, 4, 6, 8, 10)
observable
doOn(onNext: { element in
print("Intercepted Next:", element)
}, onError: { error in
print("Intercepted Error:", error)
}, onCompleted: {
print("Intercepted comepleted")
}, onDisposed: {
print("Intercepted disposed")
})
.subscribe(onNext: { element in
print(element)
}, onError: { error in
print(error)
}, onCompleted: {
print("comepleted")
}, onDisposed: {
print("disposed")
})
Observable的销毁(Dispose)
dispose()该方法可以手动取消一个订阅行为
let observable = Observable.of(0, 2, 4, 6, 8, 10)
let subscription = observable.subscribe { event in
print(event)
}
subscription.dispose()
除了dispose()方法之外,我们更经常用到的是一个叫DisposeBag的对象来管理多个订阅行为的销毁,他会在自己快要dealloc的时候,对它里面的所有订阅行为都屌用dispose()
let disposeBag = DisposeBag()
let observable1 = Observable.of("A", "B", "C")
observable1.subscribe { event in
print(event)
}.disposed(by: disposeBag)
let observable2 = Observable.of(1, 2, 3)
observable2.subscribe { event in
print(event)
}.disposed(by: disposeBag)
观察者(Observer)
观察者的作用就是监听事件,然后对这个事件作出相应。
1.在subscribe方法中创建
创建观察者最直接的方法就是在Observable的subscribe方法后面描述当事件发生时,需要如何做出响应。
2.在bind方法中创建
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
@IBOutlet weak var label: UILabel!
let disposeBag = DisposeBag()
override func viewDidLoad() {
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前缩阴术:\($0)" }
.bind { [weak self](text) in
self?.label.text = text
}
.disposed(by: disposeBag)
}
}
使用AnyObserver创建观察者
1.配合subscribe方法使用
let observer: AnyObserver<String> = AnyObserver { (event) in
switch event {
case .next(let data):
print(data)
case .error(let error):
print(error)
case .completed:
print("completed")
}
}
let observable = Observable.of("A", "B", "C")
observable.subscribe(observer)
2.配合bindTo使用
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
@IBOutlet weak var label: UILabel!
let disposeBag = DisposeBag()
override func viewDidLoad() {
let observer: AnyObserver<String> = AnyObserver { [weak self] (event) in
switch event {
case .next(let text):
self?.label.text = text
default:
break
}
}
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前缩阴术:\($0)" }
.bind(to: observer)
.disposed(by: disposeBag)
}
}
使用Binder创建观察者
1.相较于AnyObserver的大而全,Binder更专注于特定的场景。Binder主要有以下两个特征:
·不会处理错误事件
·确保绑定都是在给定Scheduler上执行(默认MainScheduler)
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
@IBOutlet weak var label: UILabel!
let disposeBag = DisposeBag()
override func viewDidLoad() {
let observer: Binder<String> = Binder(label) { (view, text) in
view.text = text
}
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前缩阴术:\($0)" }
.bind(to: observer)
.disposed(by: disposeBag)
}
}
自定义可绑定属性
方式一:通过对UI类进行扩展
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
@IBOutlet weak var label: UILabel!
let disposeBag = DisposeBag()
override func viewDidLoad() {
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前缩阴术:\($0)" }
.bind(to: label.fontSize)
.disposed(by: disposeBag)
}
}
extension UILabel {
public var fontSize: Binder<CGFloat> {
return Binder(self) { label, fontSize in
label.font = UIFont.systemFont(ofSize: fontSize)
}
}
}
方式二:通过对Reactive类进行扩展
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
@IBOutlet weak var label: UILabel!
let disposeBag = DisposeBag()
override func viewDidLoad() {
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前缩阴术:\($0)" }
.bind(to: label.rx.fontSize)
.disposed(by: disposeBag)
}
}
extension Reactive where Base: UILabel {
public var fontSize: Binder<CGFloat> {
return Binder(self.base) { label, fontSize in
label.font = UIFont.systemFont(ofSize: fontSize)
}
}
}
Subjects
Subjects既是订阅者,也是Observable
一共有四种Subjects,分别是PublishSubject,BehaviorSubject,ReplaySubject,Variable.她们之间最大的区别只是在于:当一个新的订阅者刚好订阅它的时候,能不能收到Subject以前发出过的旧事件,如果能的话又能收到多少个。
PublishSubject
- PublishSubject不需要初始值就能创建
- PublishSubject的订阅者从他们呢开始订阅的时间点起,可以收到订阅后Subject发出的新事件,而不会收到他们呢在订阅前已发出的事件
let disposeBag = DisposeBag()
let subject = PublishSubject<String>()
subject.onNext("111")
subject.subscribe(onNext: { string in
print("第1次订阅:", string)
}, onCompleted: {
print("第1次订阅:onCompleted")
}).disposed(by: disposeBag)
subject.onNext("222")
subject.subscribe(onNext: { string in
print("第2次订阅:", string)
}, onCompleted: {
print("第2次订阅:onCompleted")
}).disposed(by: disposeBag)
subject.onNext("333")
subject.onCompleted()
subject.onNext("444")
subject.subscribe(onNext: { string in
print("第3次订阅:", string)
}, onCompleted: {
print("第3次订阅:onCompleted")
}).disposed(by: disposeBag)
BehaviorSubject
1.需要一个默认初始值来创建
2.当一个订阅者来订阅它的时候,这个订阅者会立即收到BehaviorSubject上一个发出的事件。之后就跟正常的情况一样,他也会接收到BehaviorSubject之后发出的新的事件
let disposeBag = DisposeBag()
let subject = BehaviorSubject(value: "111")
subject.subscrbe { event in
print("第1次订阅:", event)
}.disposed(by: disposeBag)
subject.onNext("222")
subject.onError(NSError(domain: "local", code: 0, userInfo:nil))
subject.subscribe { event in
print("第2次订阅:", event)
}.disposed(by: disposeBag)
ReplaySubject
ReplaySubject在创建时候需要设置一个bufferSize,表示他对于他发送过的事件的缓存个数。
如果一个subscriber订阅已经结束的ReplaySubject,除了会收到缓存的。next的事件外,还会收到那个终结的.error或者.complete的事件。
let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 2)
subject.onNext("111")
subject.onNext("222")
subject.onNext("333")
subject.subscribe { event in
print("第1次订阅:", event)
}.disposed(by: disposeBag)
subject.onNext("444")
subject.subscribe { event in
print("第2次订阅:", event)
}.disposed(by: disposeBag)
subject.onCompleted()
subject.subscribe { event in
print("第3次订阅:", event)
}.disposed(by: disposeBag)
Variable
Variable其实就是对BehaviorSubject的封装,所以他也必须要通过一个默认的初始值进行创建
不同的是,Variable还会把当前发出的值保存为自己的状态。同时他会在销毁时自动发送.complete的事件,不需要也不能手动给Variables发送completed或者error事件来结束它
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
override func viewDidLoad() {
super.viewDidLoad()
let disposeBag = DisposeBag()
let variable = Variable("111")
variable.value = "222"
variable.asObservable().subscribe {
print("第1次订阅:", $0)
}.disposed(by: disposeBag)
variable.value = "333"
variable.asObservable().subscribe {
print("第2次订阅:", $0)
}.disposed(by: disposeBag)
variable.value = "444"
}
}
操作符
变换操作指的是对原始的Observable序列进行一些转换。
buffer 方法作用是缓冲组合,第一个参数是缓冲时间,第二个参数是缓冲个数,第三个参数是线程
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
let disposeBag = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
let subject = PublishSubject<String>()
//没缓存3个元素则组合起来一起发出
//如果1秒钟内不够3个也会发出(有几个发几个,一个都没有发空数组[])
subject
.buffer(timeSpan: 1, count: 3, scheduler:MainScheduler.instance)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject.onNext("a")
subject.onNext("b")
subject.onNext("c")
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
}
}
window 操作符和buffer十分相识,不过buffer是周期性的将缓存的元素集合发送出去,而window周期性的将元素集合以Observable的形态发送出来。同时buffer要等到元素搜集完毕后,才会发出元素序列,而window可以实时发出元素序列
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
let disposeBag = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
let subject = PublishSubject<String>()
//每3个元素作为一个子Observable发出
subject
.window(timeSpan: 1, count: 3, scheduler:MainScheduler.instance)
.subscribe(onNext: { [weak self] in
print("subscribe: \($0)")
$0.asObservable()
.subscribe(onNext: { print($0) })
.disposed(by: self!.disposeBag)
})
.disposed(by: disposeBag)
subject.onNext("a")
subject.onNext("b")
subject.onNext("c")
subject.onNext("1")
subject.onNext("2")
subject.onNext("3")
}
}
map 该操作符通过传入一个函数闭包原来的Observable序列转变为一个新的Observable序列
let disposeBag = DisposeBag()
Observable.of(1, 2, 3)
.map { $0 * 10 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
filter 该操作符就是用来过滤掉某些不符合要求的事件
let disposeBag = DisposeBag()
Observable.of(2, 30, 22, 5, 60, 3, 40, 9)
.filter {
$0 > 10
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
distinctUntilChanged 该操作符永雨过滤掉重复的事件
let disposeBag = DisposeBag()
Observable.of(1, 2, 3, 1, 1, 4)
. distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
single 限制只发送一次事件,或者满足条件的第一个事件,如果存在多个事件或者没有事件都会发出一个error事件。如果只有一个事件,则不会发出error事件
elementAt 该方法实现只处理在指定位置的事件
ignoreElements 该操作符可以忽略掉所有的元素,只发出error或completed事件
take 该方法仅发送Observable序列中的前n个事件,在满足数量之后会自动.completed
takeLast 该方法实现仅发送Observable序列中的后n个事件
skip 该方法用于跳过Observable序列发出的前n个事件
amb 当传入多个Observable到amb操作符时,它将取第一个发出元素或产生事件的 Observable,然后只发出它的元素。并忽略掉其他的 Observables。
takeWhile 该方法依次判断 Observable 序列的每一个值是否满足给定的条件。 当第一个不满足条件的值出现时,它便自动完成。
takeUntil 除了订阅源Observable外,通过 takeUntil 方法我们还可以监视另外一个 Observable, 即 notifier。如果 notifier 发出值或 complete 通知,那么源 Observable 便自动完成,停止发送事件。
skipWhile 该方法用于跳过前面所有满足条件的事件。一旦遇到不满足条件的事件,之后就不会再跳过了。
skipUntil 同上面的 takeUntil 一样,skipUntil 除了订阅源 Observable 外,通过 skipUntil方法我们还可以监视另外一个 Observable, 即 notifier。与 takeUntil 相反的是。源 Observable 序列事件默认会一直跳过,直到 notifier 发出值或 complete 通知。
结合操作
结合操作(或者称合并操作)指的是将多个 Observable 序列进行组合,拼装成一个新的 Observable 序列。
startWith 该方法会在 Observable 序列开始之前插入一些事件元素。即发出事件消息之前,会先发出这些预先插入的事件消息。
merge 该方法可以将多个(两个或两个以上的)Observable 序列合并成一个 Observable序列。
zip 该方法可以将多个(两个或两个以上的)Observable 序列压缩成一个 Observable 序列。而且它会等到每个 Observable 事件一一对应地凑齐之后再合并。
combineLatest 该方法同样是将多个(两个或两个以上的)Observable 序列元素进行合并。但与 zip 不同的是,每当任意一个 Observable 有新的事件发出时,它会将每个 Observable 序列的最新的一个事件元素进行合并。
withLatestFrom 该方法将两个 Observable 序列合并为一个。每当 self 队列发射一个元素时,便从第二个序列中取出最新的一个值。
switchLatest switchLatest 有点像其他语言的switch 方法,可以对事件流进行转换。比如本来监听的 subject1,我可以通过更改 variable 里面的 value 更换事件源。变成监听 subject2。
算数,以及聚合操作
toArray 该操作符先把一个序列转成一个数组,并作为一个单一的事件发送,然后结束。
reduce 接受一个初始值,和一个操作符号。将给定的初始值,与序列里的每个值进行累计运算。得到一个最终结果,并将其作为单个值发送出去。
concat 会把多个 Observable 序列合并(串联)为一个 Observable 序列。并且只有当前面一个 Observable 序列发出了 completed 事件,才会开始发送下一个 Observable 序列事件。
连接操作
可连接的序列
1.可连接的序列和一般序列不同在于:有订阅时不会立刻开始发送事件消息,只有当调用connect()之后才会开始发送值
2.可连接的序列可以让所有的订阅者订阅后,才开始发出事件消息,从而保证我们想要的所有订阅者都能接收到事件消息
publish 方法会将一个正常的序列转换成一个可连接的序列。同时该序列不会立刻发送事件,只有在调用 connect 之后才会开始。
replay 与 publish 不同在于:新的订阅者还能接收到订阅之前的事件消息(数量由设置的 bufferSize 决定)
multicast 方法还可以传入一个 Subject,每当序列发送事件时都会触发这个 Subject 的发送。
delay 该操作符会将 Observable 的所有元素都先拖延一段设定好的时间,然后才将它们发送出来。
delaySubscription 使用该操作符可以进行延时订阅。即经过所设定的时间后,才对 Observable 进行订阅操作。
materialize 该操作符可以将序列产生的事件,转换成元素。通常一个有限的 Observable 将产生零个或者多个 onNext 事件,最后产生一个 onCompleted 或者onError事件。而 materialize 操作符会将 Observable 产生的这些事件全部转换成元素,然后发送出来。
dematerialize 该操作符的作用和 materialize 正好相反,它可以将 materialize 转换后的元素还原。
timeout 使用该操作符可以设置一个超时时间。如果源 Observable 在规定时间内没有发任何出元素,就产生一个超时的 error 事件。
using 使用 using 操作符创建 Observable 时,同时会创建一个可被清除的资源,一旦 Observable终止了,那么这个资源就会被清除掉了。
错误处理操作
错误处理操作符可以用来帮助我们对 Observable 发出的 error 事件做出响应,或者从错误中恢复。
catchErrorJustReturn 当遇到 error 事件的时候,就返回指定的值,然后结束。
catchError 该方法可以捕获 error,并对其进行处理。
retry 使用该方法当遇到错误的时候,会重新订阅该序列。比如遇到网络请求失败时,可以进行重新连接。retry() 方法可以传入数字表示重试次数。不传的话只会重试一次。
调试操作
debug 我们可以将 debug 调试操作符添加到一个链式步骤当中,这样系统就能将所有的订阅者、事件、和处理等详细信息打印出来,方便我们开发调试。
RxSwift.Resources.total 通过将 RxSwift.Resources.total 打印出来,我们可以查看当前 RxSwift 申请的所有资源数量。这个在检查内存泄露的时候非常有用。
除了Observable,RxSwift 还为我们提供了一些特征序列(Traits):Single、Completable、Maybe、Driver、ControlEvent。
区别:
1.Observable 是能够用于任何上下文环境的通用序列。
2.而 Traits 可以帮助我们更准确的描述序列。同时它们还为我们提供上下文含义、语法糖,让我们能够用更加优雅的方式书写代码。
Single 是 Observable 的另外一个版本。但它不像 Observable 可以发出多个元素,它要么只能发出一个元素,要么产生一个 error 事件。
Single 比较常见的例子就是执行 HTTP 请求,然后返回一个应答或错误。不过我们也可以用 Single 来描述任何只有一个元素的序列。
为方便使用,RxSwift 还为 Single 订阅提供了一个枚举(SingleEvent):
.success:里面包含该Single的一个元素值
.error:用于包含错误
我们可以通过调用 Observable 序列的.asSingle()方法,将它转换为 Single。
Completable 是 Observable 的另外一个版本。不像 Observable 可以发出多个元素,它要么只能产生一个 completed 事件,要么产生一个 error 事件。
为方便使用,RxSwift 为 Completable 订阅提供了一个枚举(CompletableEvent):
.completed:用于产生完成事件
.error:用于产生一个错误
Maybe 同样是 Observable 的另外一个版本。它介于 Single 和 Completable 之间,它要么只能发出一个元素,要么产生一个 completed 事件,要么产生一个 error 事件。
我们可以通过调用 Observable 序列的 .asMaybe()方法,将它转换为 Maybe。
Driver
1.可以说是最复杂的 trait,它的目标是提供一种简便的方式在 UI 层编写响应式代码。
2,如果我们的序列满足如下特征,就可以使用它:
不会产生 error 事件
一定在主线程监听(MainScheduler)
共享状态变化(shareReplayLatestWhileConnected)
ControlProperty
1.是专门用来描述 UI 控件属性,拥有该类型的属性都是被观察者(Observable)
2.ControlProperty 具有以下特征:
不会产生 error 事件
一定在 MainScheduler 订阅(主线程订阅)
一定在 MainScheduler 监听(主线程监听)
共享状态变化
ControlEvent
1.是专门用于描述 UI 所产生的事件,拥有该类型的属性都是被观察者(Observable)。
2.ControlEvent 和 ControlProperty 一样,都具有以下特征:
不会产生 error 事件
一定在 MainScheduler 订阅(主线程订阅)
一定在 MainScheduler 监听(主线程监听)
共享状态变化
调度器
1.调度器(Schedulers)是 RxSwift 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。
2.RxSwift 内置了如下几种 Scheduler:
CurrentThreadScheduler:表示当前线程 Scheduler。(默认使用这个)
MainScheduler:表示主线程。如果我们需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler运行。
SerialDispatchQueueScheduler:封装了 GCD 的串行队列。如果我们需要执行一些串行任务,可以切换到这个 Scheduler 运行。
ConcurrentDispatchQueueScheduler:封装了 GCD 的并行队列。如果我们需要执行一些并发任务,可以切换到这个 Scheduler 运行。
OperationQueueScheduler:封装了 NSOperationQueue。
subscribeOn决定数据序列的构建函数在哪个 Scheduler 上运行。
observeOn该方法决定在哪个 Scheduler 上监听这个数据序列。
网友评论