Geting Started
基础部分
观察者模式和正常序列的等价性是理解Rx的最重要的事情!
先看下几种形式的序列:
数字序列
--1--2--3--4--5--6--| //正常终止
字符序列
--a--b--a--a--a---d---X //错误终止
前面一些序列是有限的,而另一些序列是无限的,如按钮的点击事件序列
---tap-tap-------tap--->
如果我们把序列语法指定为正则表达式,那它会是这样的
next* (error | completed)?
这就意味着:
- 序列可以有0个或多个元素
- 一旦接收了错误(
error
)或者完成(completed
)事件,该序列便不能产生任何其它元素
enum Event<Element> {
case next(Element) // next element of a sequence
case error(Swift.Error) // sequence failed with error
case completed // sequence terminated successfully
}
class Observable<Element> {
func subscribe(_ observer: Observer<Element>) -> Disposable
}
protocol ObserverType {
func on(_ event: Event<Element>)
}
当一个序列发送完成(completed
)或者错误(error
)事件的时候,计算序列内部的所有资源将全部被释放.
此时,如果想要立即取消序列元素和可用资源的生成,可以在返回的订阅上调用dispose.
但是,如果序列在有限的时间内终止,不调用dispose或者不使用addDisposableTo(disposeBag)
也不会导致任何永久性资源泄露,不过这些资源将一直被使用到序列完成或者返回错误!
如果序列不以某种方式终止,资源将被永久分配,除非dispose被手动调用,或者在disposeBag,takeUntil或其他方式中自动调用.
使用dispose bag或takeUntil操作符是确保资源清理的有效方法。 建议使用它们,即使序列将在有限时间内终止
Disposing
这是另外一种可以终止序列的方式,当我们完成一个序列,并且我们想释放所有分配用于计算即将到来的元素的资源时,我们可以在订阅上调用dispose,例如:
let subscription = Observable<Int>.interval(0.3, scheduler: scheduler)
.subscribe { event in
print(event)
}
Thread.sleep(forTimeInterval: 2.0)
subscription.dispose()
打印结果:
0
1
2
3
4
5
注: 这里大概意思就是创建了一个时间监听序列,0.3秒打印一次Int值,线程睡眠两分钟,再把序列dispose掉.而且,你通常不需要手动调用dispose,手动调用通常是不好的选择,我们可以使用disposeBag,或者takeUntil.
所以print的代码在dispose操作执行以后会不会被执行呢?这取决于scheduler,如果scheduler是串行队列serial scheduler
(例如: MainScheduler
),就不会打印任何信息,如果不是,就会打印.关于scheduler
Dispose Bags
Dispose bags 类似于ARC的回收机制,当一个Dispose bags 被销毁的时候,他将对任何添加到序列的disposables执行dispose.
Dispose bags are used to return ARC like behavior to RX
`注意: 因为Dispose bags 没有dispose方法,所以你无法随时dispose掉包里的资源,如果你想那么做的话,你可以重新创建一个新的Dispose bag 例如:
self.disposeBag = DisposeBag()
这样会清除引用,并且会置空所有的序列资源,说白了就是disposeBag重新赋值的时候就会自动清理掉资源.
Take until
这是另一种自动dispose掉序列的方式例如:
sequence
.takeUntil(self.rx.deallocated)
.subscribe {
print($0)
}
操作符如下:
4B9735F7-C9B2-42E8-9834-401162880E92.pngImplicit Observable guarantees
所有的序列生产者必须要保证,无论他们再哪个线程上生成元素,只要他们生成了一个元素,并将这个元素发送给观察者(observer)observer.on(.next(nextElement))
,那么,在observer.on方法执行结束之前他们是不能再次发送下一个元素的.例如:
someObservable
.subscribe { (e: Event<Element>) in
print("Event processing started")
// processing
print("Event processing ended")
}
//打印结果
Event processing started
Event processing ended
Event processing started
Event processing ended
Event processing started
Event processing ended
//而不会出现这样
Event processing started
Event processing started
Event processing ended
Event processing ended
创建我们自己的Observable(也称Observable序列)
当一个Observable被创建的时候,它并不会执行任何操作,因为你只是创建了它,而没有去订阅(subscribe
)他,一旦subscribe被调用,那么这个序列开始生成.
例如:
func searchWikipedia(searchTerm: String) -> Observable<Results> {}
let searchForMe = searchWikipedia("me")
//此时未执行任何操作
let cancel = searchForMe
//此时开始生成序列,URL请求被触发
.subscribe(onNext: { results in
print(results)
})
创建的Observeble序列的方式有很多, creat
函数就是最简单的一种
just
just可以创建一个序列,在订阅的时候返回一个元素,以下是just的具体实现过程:
func myJust<E>(element: E) -> Observable<E> {
return Observable.create { observer in
observer.on(.next(element))
observer.on(.completed)
return Disposables.create()
}
}
myJust(0)
.subscribe(onNext: { n in
print(n)
})
//打印结果 0
看起来不错,所以,究竟creat
函数是什么呢?
它只是一个便利方法,你可以使用Swfit闭包来轻松实现subscribe方法,就像subscribe方法,它需要一个参数,观察者,并且返回一个disposeble.
这种生成序列的方式实际上是同步的,这种方法生成元素,给了订阅者以后就自动结束了,不会等待调用者结束,因为返回什么样的disposeble并不重要,生成元素的过程是不会被打断的.
当生成一个同步的序列时,通常返回的disposeble是NopDisposable的单例对象.
下面让我们创建一个返回数组元素的Observable:
func myFrom<E>(sequence: [E]) -> Observable<E> {
return Observable.create { observer in
for element in sequence {
observer.on(.next(element))
}
observer.on(.completed)
return Disposables.create()
}
}
let stringCounter = myFrom(["first", "second"])
print("Started ----")
// first time
stringCounter
.subscribe(onNext: { n in
print(n)
})
print("----")
// again
stringCounter
.subscribe(onNext: { n in
print(n)
})
print("Ended ----")
打印结果:
//
Started ----
first
second
----
first
second
Ended ----
创建一个执行工作的Observable
我们自己创建一个interval 操作符,前面的例子已经提到过了
func myInterval(_ interval: TimeInterval) -> Observable<Int> {
return Observable.create { observer in
print("Subscribed")
let timer = DispatchSource.makeTimerSource(queue: DispatchQueue.global())
timer.scheduleRepeating(deadline: DispatchTime.now() + interval, interval: interval)
let cancel = Disposables.create {
print("Disposed")
timer.cancel()
}
var next = 0
timer.setEventHandler {
if cancel.isDisposed {
return
}
observer.on(.next(next))
next += 1
}
timer.resume()
return cancel
}
}
let counter = myInterval(0.1)
print("Started ----")
let subscription = counter
.subscribe(onNext: { n in
print(n)
})
Thread.sleep(forTimeInterval: 0.5)
subscription.dispose()
print("Ended ----")
打印结果:
//
Started ----
Subscribed
0
1
2
3
4
Disposed
Ended ----
每个订阅者订阅时通常会生成它自己的单独的元素序列.默认情况下,运算符是无状态的.实际生产中,大多数的操作符是无状态的.(事实上,我们更需要的也是无状态的操作符,因为这会避免序列复杂度的指数级增加
)
未完待续...
网友评论