美文网首页Swift学习
RXSwift - 合并Observable (concat和m

RXSwift - 合并Observable (concat和m

作者: 内蒙小猿 | 来源:发表于2020-08-20 12:44 被阅读0次

    一、串行 (concat)

    let queueA = PublishSubject<String>()
    let queueB = PublishSubject<String>()
    let queueC = PublishSubject<String>()
    
    
    enum E:Error {
        case demoAError
        case demoBError
    }
    
    

    Concat 串行合并多个事件,处理完 queneA 才会处理 queueB

    1、结合方式

    ​ 1.1、 let sequece = Observable.concat([queueA.asObserver(), queueB.asObserver()])

    ​ 1.2、let sequece = queueA.concat(queueB)

    2、生命周期

    2.1、合成后observable,在所有 Sub Observable 都正常结束了,合成后的observable 才会正常结束。

            let sequece = Observable.concat([queueA.asObserver(), queueB.asObserver()])
            _ = sequece.subscribe(onNext: { (Str) in
                print(Str)
            }, onError: nil, onCompleted: {
                print("Completed")
            }, onDisposed: {
                print("Disposed")
            })
            
            queueA.onNext("A1")
            queueA.onCompleted()
            queueB.onNext("B1")
            queueB.onCompleted()
    
    

    ​ 2.1.1、\color{#DEB887} {执行结果: A1 -> B1 -> onCompleted -> Disposed}

    2.2、只要当前只在订阅的 Sub Observable 发生 Error, 合成后的 Observable 也会发生 Error, 离开作用域会被 Dispose 。 但是不会执行 onCompleted

            queueA.onNext("A1")
            queueA.onError(E.demoAError)
            queueA.onCompleted()
            queueB.onNext("B1")
            queueB.onCompleted()
            
    

    ​ 2.2.1、\color{#DEB887} {执行结果: A1-> demoAError -> Disposed}

    2.3、在 "排队中" 的 Sub observable 发生 Error, 并不影响定于中的observable。不会执行 onCompletd, 但是回执 Error 回调

            queueA.onNext("A1")
            queueA.onNext("A2")
            queueB.onNext("B1")
            queueB.onError(E.demoBError)
            queueA.onNext("A3")
            queueA.onCompleted()
            queueB.onCompleted()
    

    ​ 2.3.1、 \color{#DEB887} {执行结果:A1-> A -> 2 -> A3 -> onCompleted -> Disposed}

    并行(merge)

    并行合并多个事件

    1、结合方式

    1.1 未指定订阅数量

    let sequence = Observable.of(queueA.asObserver(),queueB.asObserver()).merge()

    1.2 指定订阅服务

    let sequence = Observable.of(queueA.asObserver(),queueB.asObserver()).merge(maxConcurrent: 2)

    2、生命周期

    2.1 合成后的Observable, 在所有的 Sub observalbe 都正常结束了, 合成后的 Observable 才会正常结束

     _ = Observable.of(queueA.asObserver(),queueB.asObserver()).merge(maxConcurrent: 2).subscribe(onNext: { (Str) in
                print(Str)
            }, onError: nil, onCompleted: {
                print("onCompleted")
            }) {
                print("onDisposed")
            }
            
            queueA.onNext("A1")
            queueB.onNext("B1")
            queueA.onNext("A2")
            queueB.onNext("B2")
            queueA.onCompleted()
            queueB.onCompleted()
            
    

    ​ 2.1.1、\color{#DEB887} {执行结果: A1 -> B1 -> A2 -> B2 -> onCompleted -> Disposed}

    2.2 合成后的observable, 在所有的 Sub Observable 中只要一个发生Error, 就会指定Error, 并在离开作用域时释放, 并不会执行 onCompleted 方法。

                    queueA.onNext("A1")
            queueB.onNext("B1")
            queueA.onNext("A2")
            queueA.onError(E.demoAError)
            queueB.onNext("B2")
            queueA.onCompleted()
            queueB.onCompleted()
    

    ​ 2.2.1、 \color{#DEB887} {执行结果:A1 -> B1 -> A2 -> demoAError -> Disposed}

    3、指定同时订阅数量 maxconcurrent

    3.1、指定的 maxconcurrent 为 1, 下一个事件也需要等当前订阅 completed 完成后才能订阅

            _ = Observable.of(queueA.asObserver(),queueB.asObserver(),queueC.asObserver()).merge(maxConcurrent: 1).subscribe(onNext: { (Str) in
                print(Str)
            }, onError: nil, onCompleted: {
                print("onCompleted")
            }) {
                print("onDisposed")
            }
            
            queueA.onNext("A1")
            queueA.onCompleted()
            
            queueB.onNext("B1")
            queueB.onCompleted()
            
            queueC.onNext("C1")
            queueC.onCompleted()
    

    ​ 3.1.1、 \color{#DEB887} {执行结果:A1 -> B1 -> C1 -> onCompleted -> Disposed}

    3.2、maxconcurrent 为1 和 concat 的是效果是一样的

    // merge
    
            _ = Observable.of(queueA.asObserver(),queueB.asObserver(),queueC.asObserver()).merge(maxConcurrent: 1).subscribe(onNext: { (Str) in
                print(Str)
            }, onError: nil, onCompleted: {
                print("onCompleted")
            }) {
                print("onDisposed")
            }
               
            queueA.onNext("A1")
            queueB.onNext("B1")
            queueA.onCompleted()
            
            queueB.onNext("B2")
            queueB.onCompleted()
            
            queueC.onNext("C1")
            queueC.onCompleted()
    
    
    // concat
    
            _ = Observable.concat([queueA.asObserver(),queueB.asObserver(),queueC.asObserver()]).subscribe(onNext: { (str) in
                 print(str)
            }, onError: nil, onCompleted: {
                print("onCompleted")
            }, onDisposed: {
                print("onDisposed")
            })
               
            queueA.onNext("A1")
            queueB.onNext("B1")
            queueA.onCompleted()
            
            queueB.onNext("B2")
            queueB.onCompleted()
            
            queueC.onNext("C1")
            queueC.onCompleted()
    

    ​ 3.2.1、 \color{#DEB887} {执行结果都是: A1-> B2 -> C1 -> onCompleted -> onDisposed ,B2 都收不到,因为B2 开始时,A1还没结束。}

    3.3 指定maxconcurrent 为 2

    _ = Observable.of(queueA.asObserver(),queueB.asObserver(),queueC.asObserver()).merge(maxConcurrent: 2).subscribe(onNext: { (Str) in
                print(Str)
            }, onError: nil, onCompleted: {
                print("onCompleted")
            }) {
                print("onDisposed")
            }
            
            queueA.onNext("A1")
            queueA.onNext("A2")
            queueB.onNext("B1")
            queueB.onNext("B2")
            queueA.onCompleted()
            queueB.onCompleted()
            queueC.onNext("C1")
            queueC.onNext("C2")
            queueC.onCompleted()
    

    ​ 3.3.1、 \color{#DEB887} {执行结果:A1 -> A2 -> B1 -> B2 -> C1 -> C2 -> onCompleted -> Disposed}

    相关文章

      网友评论

        本文标题:RXSwift - 合并Observable (concat和m

        本文链接:https://www.haomeiwen.com/subject/whldjktx.html