I. Serial (concat)
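import RxSwift

// Three hot subjects used as the source sequences in every example below.
let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()
let queueC = PublishSubject<String>()

// Errors used to terminate a source abnormally.
enum E: Error {
    case demoAError
    case demoBError
}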
concat combines multiple sequences serially: queueB is processed only after queueA has completed.
1. Ways to combine
1.1 let sequence = Observable.concat([queueA.asObservable(), queueB.asObservable()])
1.2 let sequence = queueA.concat(queueB)
2. Lifecycle
2.1 The combined Observable completes normally only after every sub-Observable has completed normally.
let sequence = Observable.concat([queueA.asObservable(), queueB.asObservable()])
_ = sequence.subscribe(onNext: { str in
    print(str)
}, onError: nil, onCompleted: {
    print("Completed")
}, onDisposed: {
    print("Disposed")
})
queueA.onNext("A1")
queueA.onCompleted()
queueB.onNext("B1")
queueB.onCompleted()
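The scenario above can be sketched as a small function that records the callbacks instead of printing them. This is only an illustration: the function name `concatCompletionDemo`, the local `log` array, and the extra "B0" event are assumptions added here. "B0" shows that a PublishSubject is hot, so anything queueB emits before concat subscribes to it is lost.

```swift
import RxSwift

// Hypothetical helper: runs the 2.1 scenario and returns the recorded callbacks.
func concatCompletionDemo() -> [String] {
    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    var log: [String] = []

    _ = Observable.concat([queueA.asObservable(), queueB.asObservable()])
        .subscribe(
            onNext: { log.append($0) },
            onCompleted: { log.append("Completed") },
            onDisposed: { log.append("Disposed") }
        )

    queueA.onNext("A1")
    queueB.onNext("B0")    // dropped: concat has not subscribed to queueB yet
    queueA.onCompleted()   // concat moves on and subscribes to queueB
    queueB.onNext("B1")
    queueB.onCompleted()   // last source finished, so the combined sequence completes

    return log
}

print(concatCompletionDemo())
// ["A1", "B1", "Completed", "Disposed"]
```

"Completed" appears only after queueB, the last source, has finished, and the subscription is disposed immediately afterwards.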
2.2 If the currently subscribed sub-Observable emits an error, the combined Observable emits that error too and the subscription is disposed. onCompleted is not called.
queueA.onNext("A1")
queueA.onError(E.demoAError)
queueA.onCompleted()
queueB.onNext("B1")
queueB.onCompleted()
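A minimal sketch of this case, assuming an `onError` handler is supplied so the error can be recorded. The function name `concatActiveErrorDemo` and the `log` array are hypothetical; the enum mirrors `E` from the setup code.

```swift
import RxSwift

// Hypothetical helper: the currently subscribed source (queueA) fails.
func concatActiveErrorDemo() -> [String] {
    enum E: Error { case demoAError }

    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    var log: [String] = []

    _ = Observable.concat([queueA.asObservable(), queueB.asObservable()])
        .subscribe(
            onNext: { log.append($0) },
            onError: { _ in log.append("Error") },
            onCompleted: { log.append("Completed") },
            onDisposed: { log.append("Disposed") }
        )

    queueA.onNext("A1")
    queueA.onError(E.demoAError)  // terminates the combined sequence
    queueA.onCompleted()          // ignored: queueA has already terminated
    queueB.onNext("B1")           // never delivered: concat never subscribes to queueB
    queueB.onCompleted()

    return log
}

print(concatActiveErrorDemo())
// ["A1", "Error", "Disposed"]
```

No "Completed" entry is recorded, and nothing from queueB reaches the subscriber.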
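2.3 An error in a sub-Observable that is still "waiting in the queue" does not affect the one currently subscribed. Once concat moves on to the failed sub-Observable, the error callback fires and onCompleted is not called.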
queueA.onNext("A1")
queueA.onNext("A2")
queueB.onNext("B1")
queueB.onError(E.demoBError)
queueA.onNext("A3")
queueA.onCompleted()
queueB.onCompleted()
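The sketch below replays this scenario and records the callbacks. It relies on the fact that a PublishSubject which has already terminated hands its terminal event to any later subscriber, which is why the error surfaces only after queueA completes. The names `concatQueuedErrorDemo` and `log` are assumptions made for the illustration.

```swift
import RxSwift

// Hypothetical helper: the queued source (queueB) fails before concat reaches it.
func concatQueuedErrorDemo() -> [String] {
    enum E: Error { case demoBError }

    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    var log: [String] = []

    _ = Observable.concat([queueA.asObservable(), queueB.asObservable()])
        .subscribe(
            onNext: { log.append($0) },
            onError: { _ in log.append("Error") },
            onCompleted: { log.append("Completed") },
            onDisposed: { log.append("Disposed") }
        )

    queueA.onNext("A1")
    queueA.onNext("A2")
    queueB.onNext("B1")           // dropped: queueB is not subscribed yet
    queueB.onError(E.demoBError)  // queueB terminates while still waiting
    queueA.onNext("A3")           // queueA is unaffected and keeps emitting
    queueA.onCompleted()          // concat subscribes to queueB and receives its error
    queueB.onCompleted()          // ignored: queueB has already terminated

    return log
}

print(concatQueuedErrorDemo())
// ["A1", "A2", "A3", "Error", "Disposed"]
```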
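II. Parallel (merge)
merge combines multiple sequences in parallel.
1. Ways to combine
1.1 Without a limit on concurrent subscriptions
let sequence = Observable.of(queueA.asObservable(), queueB.asObservable()).merge()
1.2 With a limit on concurrent subscriptions
let sequence = Observable.of(queueA.asObservable(), queueB.asObservable()).merge(maxConcurrent: 2)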
2. Lifecycle
2.1 The combined Observable completes normally only after every sub-Observable has completed normally.
_ = Observable.of(queueA.asObservable(), queueB.asObservable())
    .merge(maxConcurrent: 2)
    .subscribe(onNext: { str in
        print(str)
    }, onError: nil, onCompleted: {
        print("onCompleted")
    }, onDisposed: {
        print("onDisposed")
    })
queueA.onNext("A1")
queueB.onNext("B1")
queueA.onNext("A2")
queueB.onNext("B2")
queueA.onCompleted()
queueB.onCompleted()
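As a rough sketch of the same scenario, the function below records the callbacks in order. The names `mergeCompletionDemo` and `log` are hypothetical. Unlike concat, both sources are subscribed from the start, so their events interleave in emission order.

```swift
import RxSwift

// Hypothetical helper: both sources are live at once and both complete normally.
func mergeCompletionDemo() -> [String] {
    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    var log: [String] = []

    _ = Observable.of(queueA.asObservable(), queueB.asObservable())
        .merge()
        .subscribe(
            onNext: { log.append($0) },
            onCompleted: { log.append("onCompleted") },
            onDisposed: { log.append("onDisposed") }
        )

    queueA.onNext("A1")
    queueB.onNext("B1")
    queueA.onNext("A2")
    queueB.onNext("B2")
    queueA.onCompleted()   // queueB is still open, so the merged sequence stays open
    queueB.onCompleted()   // every source has finished, so the merged sequence completes

    return log
}

print(mergeCompletionDemo())
// ["A1", "B1", "A2", "B2", "onCompleted", "onDisposed"]
```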
2.2 If any one of the sub-Observables emits an error, the combined Observable emits that error and the subscription is disposed. onCompleted is not called.
queueA.onNext("A1")
queueB.onNext("B1")
queueA.onNext("A2")
queueA.onError(E.demoAError)
queueB.onNext("B2")
queueA.onCompleted()
queueB.onCompleted()
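A short sketch of the error case, again with hypothetical names (`mergeErrorDemo`, `log`) and an `onError` handler added so the error is recorded. Events from the healthy source stop being delivered as soon as the other source fails.

```swift
import RxSwift

// Hypothetical helper: one of two merged sources fails.
func mergeErrorDemo() -> [String] {
    enum E: Error { case demoAError }

    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    var log: [String] = []

    _ = Observable.of(queueA.asObservable(), queueB.asObservable())
        .merge()
        .subscribe(
            onNext: { log.append($0) },
            onError: { _ in log.append("Error") },
            onCompleted: { log.append("onCompleted") },
            onDisposed: { log.append("onDisposed") }
        )

    queueA.onNext("A1")
    queueB.onNext("B1")
    queueA.onNext("A2")
    queueA.onError(E.demoAError)  // terminates the merged sequence
    queueB.onNext("B2")           // not delivered: the subscription is already disposed
    queueA.onCompleted()
    queueB.onCompleted()

    return log
}

print(mergeErrorDemo())
// ["A1", "B1", "A2", "Error", "onDisposed"]
```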
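3. Limiting the number of concurrent subscriptions with maxConcurrent
3.1 With maxConcurrent set to 1, the next sub-Observable is subscribed only after the currently subscribed one has completed.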
_ = Observable.of(queueA.asObservable(), queueB.asObservable(), queueC.asObservable())
    .merge(maxConcurrent: 1)
    .subscribe(onNext: { str in
        print(str)
    }, onError: nil, onCompleted: {
        print("onCompleted")
    }, onDisposed: {
        print("onDisposed")
    })
queueA.onNext("A1")
queueA.onCompleted()
queueB.onNext("B1")
queueB.onCompleted()
queueC.onNext("C1")
queueC.onCompleted()
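To make the "one at a time" behaviour visible, the sketch below adds one event ("B0") that is not in the original scenario: it is emitted while queueA is still the active source and is therefore lost. The names `mergeSerialDemo` and `log` are assumptions for the illustration.

```swift
import RxSwift

// Hypothetical helper: three sources, at most one subscribed at a time.
func mergeSerialDemo() -> [String] {
    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    let queueC = PublishSubject<String>()
    var log: [String] = []

    _ = Observable.of(queueA.asObservable(), queueB.asObservable(), queueC.asObservable())
        .merge(maxConcurrent: 1)
        .subscribe(
            onNext: { log.append($0) },
            onCompleted: { log.append("onCompleted") },
            onDisposed: { log.append("onDisposed") }
        )

    queueA.onNext("A1")
    queueB.onNext("B0")    // dropped: queueB is still waiting for queueA to finish
    queueA.onCompleted()   // frees the single slot, queueB is subscribed
    queueB.onNext("B1")
    queueB.onCompleted()   // frees the slot again, queueC is subscribed
    queueC.onNext("C1")
    queueC.onCompleted()

    return log
}

print(mergeSerialDemo())
// ["A1", "B1", "C1", "onCompleted", "onDisposed"]
```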
3.2 merge with maxConcurrent set to 1 has the same effect as concat
// merge
_ = Observable.of(queueA.asObservable(), queueB.asObservable(), queueC.asObservable())
    .merge(maxConcurrent: 1)
    .subscribe(onNext: { str in
        print(str)
    }, onError: nil, onCompleted: {
        print("onCompleted")
    }, onDisposed: {
        print("onDisposed")
    })
queueA.onNext("A1")
queueB.onNext("B1")
queueA.onCompleted()
queueB.onNext("B2")
queueB.onCompleted()
queueC.onNext("C1")
queueC.onCompleted()
// concat
_ = Observable.concat([queueA.asObservable(), queueB.asObservable(), queueC.asObservable()])
    .subscribe(onNext: { str in
        print(str)
    }, onError: nil, onCompleted: {
        print("onCompleted")
    }, onDisposed: {
        print("onDisposed")
    })
queueA.onNext("A1")
queueB.onNext("B1")
queueA.onCompleted()
queueB.onNext("B2")
queueB.onCompleted()
queueC.onNext("C1")
queueC.onCompleted()
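One way to check the equivalence is to run the same event script against both operators and compare what the subscriber receives. The sketch below does that with a hypothetical helper `runScenario`, which creates fresh subjects on every run, because a PublishSubject that has completed cannot be reused for a second pass.

```swift
import RxSwift

// Hypothetical helper: feeds one fixed event script through whichever
// combining operator is passed in and returns the recorded callbacks.
func runScenario(_ combine: ([Observable<String>]) -> Observable<String>) -> [String] {
    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    let queueC = PublishSubject<String>()
    var log: [String] = []

    _ = combine([queueA.asObservable(), queueB.asObservable(), queueC.asObservable()])
        .subscribe(
            onNext: { log.append($0) },
            onCompleted: { log.append("onCompleted") },
            onDisposed: { log.append("onDisposed") }
        )

    queueA.onNext("A1")
    queueB.onNext("B1")    // dropped in both cases: queueB is not subscribed yet
    queueA.onCompleted()
    queueB.onNext("B2")
    queueB.onCompleted()
    queueC.onNext("C1")
    queueC.onCompleted()

    return log
}

let viaMerge = runScenario { sources in Observable.from(sources).merge(maxConcurrent: 1) }
let viaConcat = runScenario { sources in Observable.concat(sources) }

print(viaMerge == viaConcat)
// true
print(viaConcat)
// ["A1", "B2", "C1", "onCompleted", "onDisposed"]
```

Both runs drop "B1", since in either case queueB is subscribed only after queueA completes.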
3.3 With maxConcurrent set to 2
_ = Observable.of(queueA.asObservable(), queueB.asObservable(), queueC.asObservable())
    .merge(maxConcurrent: 2)
    .subscribe(onNext: { str in
        print(str)
    }, onError: nil, onCompleted: {
        print("onCompleted")
    }, onDisposed: {
        print("onDisposed")
    })
queueA.onNext("A1")
queueA.onNext("A2")
queueB.onNext("B1")
queueB.onNext("B2")
queueA.onCompleted()
queueB.onCompleted()
queueC.onNext("C1")
queueC.onNext("C2")
queueC.onCompleted()
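The sketch below uses a slightly different event order from the one above, chosen to show when the third source gets its slot: queueA and queueB are subscribed immediately, while queueC is subscribed only once one of them completes. The names `mergeTwoAtATimeDemo` and `log` and the event "C0" are assumptions added for the illustration.

```swift
import RxSwift

// Hypothetical helper: three sources, at most two subscribed at a time.
func mergeTwoAtATimeDemo() -> [String] {
    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    let queueC = PublishSubject<String>()
    var log: [String] = []

    _ = Observable.of(queueA.asObservable(), queueB.asObservable(), queueC.asObservable())
        .merge(maxConcurrent: 2)
        .subscribe(
            onNext: { log.append($0) },
            onCompleted: { log.append("onCompleted") },
            onDisposed: { log.append("onDisposed") }
        )

    queueA.onNext("A1")
    queueB.onNext("B1")    // delivered: queueA and queueB are both active
    queueC.onNext("C0")    // dropped: both slots are taken, queueC is waiting
    queueA.onCompleted()   // frees a slot, queueC is subscribed
    queueC.onNext("C1")
    queueB.onNext("B2")    // queueB is still active alongside queueC
    queueB.onCompleted()
    queueC.onCompleted()   // last active source finished, merged sequence completes

    return log
}

print(mergeTwoAtATimeDemo())
// ["A1", "B1", "C1", "B2", "onCompleted", "onDisposed"]
```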