美文网首页
RxSwift的使用

RxSwift的使用

作者: sws1314 | 来源:发表于2023-12-10 09:26 被阅读0次
    private func demo1(){
            print("**********  just  ***************")
            
            let disposeBag = DisposeBag()
            Observable.just("🔴")
                .subscribe { event in
                    print(event)
                }
                .disposed(by: disposeBag)
            
            /** 输出:
             next(🔴)
             completed
             */
            print("**********  just  ***************")
            
    
            print("**********  of  ***************")
            Observable.of("🐶", "🐱", "🐭", "🐹")
                .subscribe(onNext: { element in
                    print(element)
                })
                .disposed(by: disposeBag)
            
            /** 输出:
             🐶
             🐱
             🐭
             🐹
             */
            print("**********  of  ***************")
            
            
            //MARK: - from
            print("**********  from  ***************")
            Observable.from(["🐶", "🐱", "🐭", "🐹"])
                .subscribe(onNext: { print($0) })
                .disposed(by: disposeBag)
            
            /** 输出:
             🐶
             🐱
             🐭
             🐹
             */
            print("**********  from  ***************")
            
            
            //MARK: - create
            print("**********  create  ***************")
            
            let myJust = { (element: String) -> Observable<String> in
                return Observable.create { observer in
                    observer.on(.next(element))
                    observer.on(.completed)
                    return Disposables.create()
                }
            }
                
            myJust("🔴")
                .subscribe { print($0) }
                .disposed(by: disposeBag)
            
            /** 输出:
             next(🔴)
             completed
             */
            print("**********  create  ***************")
            
            
            //MARK: - range
            print("**********  range  ***************")
            
            Observable.range(start: 1, count: 10)
                .subscribe {
                    print($0)
                }.disposed(by: disposeBag)
            /** 输出:
             next(1)
             next(2)
             next(3)
             next(4)
             next(5)
             next(6)
             next(7)
             next(8)
             next(9)
             next(10)
             completed
             */
            print("**********  range  ***************")
            
            
            //MARK: - repeatElement
            print("**********  repeatElement  ***************")
            Observable.repeatElement("🔴")
                .take(3)
                .subscribe {
                    if case .next(let value) = $0 {
                        print(value)
                    }
                }.disposed(by: disposeBag)
            /** 输出:
             🔴
             🔴
             🔴
             */
            print("**********  repeatElement  ***************")
            
            
            //MARK: - generate
            print("**********  generate  ***************")
            Observable.generate(initialState: 0, condition: { $0 < 3 }, iterate: { $0 + 1 })
                .subscribe {
                    if case .next(let value) = $0 {
                        print(value)
                    }
                }.disposed(by: disposeBag)
            /** 输出:
             0
             1
             2
             */
            print("**********  generate  ***************")
            
            
            //MARK: - deferred
            print("**********  deferred  ***************")
            
            var count = 1
            let deferredSequence = Observable<String>.deferred {
                print("Creating \(count)")
                
                count += 1
                
                return Observable.create { observer in
                    print("Emitting....")
                    observer.onNext("🐶")
                    observer.onNext("🐱")
                    observer.onNext("🐵")
                    return Disposables.create()
                }
            }
            deferredSequence.subscribe {
                if case .next(let value) = $0 {
                    print(value)
                }
            }.disposed(by: disposeBag)
            
    
            deferredSequence.subscribe{
                if case .next(let value) = $0 {
                    print(value)
                }
            }.disposed(by: disposeBag)
            /** 输出:
             Creating 1
             Emitting....
             🐶
             🐱
             🐵
             Creating 2
             Emitting....
             🐶
             🐱
             🐵
             */
            print("**********  deferred  ***************")
            
            
            //MARK: - error
            print("**********  error  ***************")
            Observable<Int>.error(RxError.unknown)
                .subscribe {
                    print($0)
                }.disposed(by: disposeBag)
            
            /** 输出:
             error(Unknown error occurred.)
             */
            print("**********  error  ***************")
            
            
            //MARK: - doOn
            print("**********  doOn  ***************")
            Observable.of("🍎", "🍐", "🍊", "🍋")
                .do(onNext: { print("Intercepted:", $0)}, afterNext: { print("Intercepted after: ", $0) }, onError: {
                    print("Intercepted error: ", $0)
                }, afterError: {  print("Intercepted after error: ", $0)}, onCompleted: { print("Completed") }, afterCompleted: {
                    print("After completed")
                }).subscribe{ print($0)}.disposed(by: disposeBag)
            /** 输出:
             Intercepted: 🍎
             next(🍎)
             Intercepted after:  🍎
             Intercepted: 🍐
             next(🍐)
             Intercepted after:  🍐
             Intercepted: 🍊
             next(🍊)
             Intercepted after:  🍊
             Intercepted: 🍋
             next(🍋)
             Intercepted after:  🍋
             Completed
             completed
             After completed
             */
            print("**********  doOn  ***************")
            
            
            //MARK: - PublishSubject
            /*!
             从订阅时间开始向所有观察者广播新事件。
             */
            print("**********  PublishSubject  ***************")
            print("从订阅时间开始向所有观察者广播新事件。")
            let subject = PublishSubject<String>()
    
            subject.subscribe{
                print("订阅者1: \($0)")
            }.disposed(by: disposeBag)
            subject.onNext("🐶")
            subject.onNext("🐱")
    
            subject.subscribe{
                print("订阅者2: \($0)")
            }.disposed(by: disposeBag)
            subject.onNext("🅰️")
            subject.onNext("🅱️")
            /** 输出:
             订阅者1: next(🐶)
             订阅者1: next(🐱)
             订阅者1: next(🅰️)
             订阅者2: next(🅰️)
             订阅者1: next(🅱️)
             订阅者2: next(🅱️)
             */
            print("**********  PublishSubject  ***************")
            
            //MARK: - ReplaySubject
            /**
             向所有订阅者广播新事件,并将先前事件的指定bufferSize数量广播给新订阅者。
             */
            print("**********  ReplaySubject  ***************")
            print("向所有订阅者广播新事件,并将先前事件的指定bufferSize数量广播给新订阅者。")
            let subject1 = ReplaySubject<String>.create(bufferSize: 1)
            
            subject1.subscribe{
                print("订阅者1: \($0)")
            }.disposed(by: disposeBag)
            subject1.onNext("🐶")
            subject1.onNext("🐱")
            
            subject1.subscribe{
                print("订阅者2: \($0)")
            }.disposed(by: disposeBag)
            subject1.onNext("🅰️")
            subject1.onNext("🅱️")
            /** 输出:
             订阅者1: next(🐶)
             订阅者1: next(🐱)
             订阅者2: next(🐱)
             订阅者1: next(🅰️)
             订阅者2: next(🅰️)
             订阅者1: next(🅱️)
             订阅者2: next(🅱️)
             */
            print("**********  ReplaySubject  ***************")
            
    
    
            //MARK: - BehaviorSubject
            /**
             向所有订阅者广播新事件,并向新订阅者广播最近的(或初始)值。
             */
            print("**********  BehaviorSubject  ***************")
            print("向所有订阅者广播新事件,并向新订阅者广播最近的(或初始)值。")
            let subject2 = BehaviorSubject(value: "🔴")
            
            subject2.subscribe{
                print("订阅者1: \($0)")
            }.disposed(by: disposeBag)
            subject2.onNext("🐶")
            subject2.onNext("🐱")
            
            subject2.subscribe{
                print("订阅者2: \($0)")
            }.disposed(by: disposeBag)
            subject2.onNext("🅰️")
            subject2.onNext("🅱️")
            /** 输出:
             订阅者1: next(🔴)
             订阅者1: next(🐶)
             订阅者1: next(🐱)
             订阅者2: next(🐱)
             订阅者1: next(🅰️)
             订阅者2: next(🅰️)
             订阅者1: next(🅱️)
             订阅者2: next(🅱️)
             */
            print("**********  BehaviorSubject  ***************")
        }
        
        //MARK: - 基础使用
        private func demo2(){
            let disposeBag = DisposeBag()
            
            
            //MARK: - startWith
            print("**********  startWith  ***************")
            print("在开始从源Observable中发射元素之前,先发射指定的元素序列。")
            Observable.of("🐶", "🐱", "🐭", "🐹")
                .startWith("1️⃣")
                .startWith("2️⃣")
                .startWith("3️⃣", "🅰️", "🅱️")
                .subscribe(onNext: { print($0) })
                .disposed(by: disposeBag)
            /** 输出:
             3️⃣
             🅰️
             🅱️
             2️⃣
             1️⃣
             🐶
             🐱
             🐭
             🐹
             */
            print("**********  startWith  ***************")
           
            
            //MARK: - merge
            print("**********  merge  ***************")
            print("将源Observable序列中的元素组合成一个新的Observable序列,并像每个源Observable序列发出的那样发出每个元素。")
            let subject1 = PublishSubject<String>()
            let subject2 = PublishSubject<String>()
            
            Observable.of(subject1, subject2)
                .merge()
                .subscribe{
                    print($0)
                }.disposed(by: disposeBag)
            
            
            subject1.onNext("🅰️")
            
            subject1.onNext("🅱️")
            
            subject2.onNext("①")
            
            subject2.onNext("②")
            
            subject1.onNext("🆎")
            
            subject2.onNext("③")
            
            /** 输出:
             next(🅰️)
             next(🅱️)
             next(①)
             next(②)
             next(🆎)
             next(③)
             */
            print("**********  merge  ***************")
    
            //MARK: - zip
            print("**********  zip  ***************")
            print("最多可将8个源可观察序列合并为一个新的可观察序列,并从合并后的可观察序列中发出对应索引处每个源可观察序列中的元素。")
            print("特点: 必定是成双成对的, 不会叉位")
            /**
             next(("🅰️", 1))
             next(("🅱️", 2))
             next(("🆎", 3))
             */
            let stringSubject = PublishSubject<String>()
            let intSubject = PublishSubject<Int>()
            
            Observable.zip(stringSubject, intSubject)
                .subscribe{
                    print($0)
                }.disposed(by: disposeBag)
            stringSubject.onNext("🅰️")
            stringSubject.onNext("🅱️")
            
            intSubject.onNext(1)
            
            intSubject.onNext(2)
            
            stringSubject.onNext("🆎")
            intSubject.onNext(3)
            
            /** 输出:
             next(("🅰️", 1))
             next(("🅱️", 2))
             next(("🆎", 3))
             */
            print("**********  zip  ***************")
            
            
            //MARK: - combineLatest
            print("**********  combineLatest  ***************")
            print("将最多8个源可观察序列合并为一个新的可观察序列,并在所有源可观察序列发射至少一个元素后,以及当任何源可观察序列发射一个新元素时,开始从合并后的可观察序列中发射每个源可观察序列的最新元素。")
            
            let stringSubject1 = PublishSubject<String>()
            let intSubject1 = PublishSubject<Int>()
            
            Observable.combineLatest(stringSubject1, intSubject1) { stringElement, intElement in
                "\(stringElement) ----  \(intElement)"
            }.subscribe{
                print($0)
            }.disposed(by: disposeBag)
            
            stringSubject1.onNext("🅰️")
            
            stringSubject1.onNext("🅱️")
            intSubject1.onNext(1)
            
            intSubject1.onNext(2)
            
            stringSubject1.onNext("🆎")
            
            /** 输出:
             next(🅱️ ----  1)
             next(🅱️ ----  2)
             next(🆎 ----  2)
             */
            print("**********  combineLatest  ***************")
            
            
            //MARK: - combineLatest --- Array
            print("**********  combineLatest --- Array   ***************")
            
            let stringSubject02 = Observable.just("❤️")
            let fruitObservable = Observable.from(["🍎", "🍐", "🍊"])
            let animalObservable = Observable.of("🐶", "🐱", "🐭", "🐹")
            
            Observable.combineLatest([stringSubject02, fruitObservable, animalObservable]) {
                "\($0[0]) --- \($0[1]) --- \($0[2])"
            }.subscribe{
                print($0)
            }.disposed(by: disposeBag)
            
            /** 输出:
             next(❤️ --- 🍎 --- 🐶)
             next(❤️ --- 🍐 --- 🐶)
             next(❤️ --- 🍐 --- 🐱)
             next(❤️ --- 🍊 --- 🐱)
             next(❤️ --- 🍊 --- 🐭)
             next(❤️ --- 🍊 --- 🐹)
             completed
             */
            print("**********  combineLatest --- Array  ***************")
            
            
            //MARK: - switchLatest
            print("**********  switchLatest   ***************")
            print("将Observable序列发出的元素转换为Observable序列,并从最近的内部Observable序列发出元素。")
            let subject11 = BehaviorSubject(value: "⚽️")
            let subject22 = BehaviorSubject(value: "🍎")
            
            let subjectsSubject = BehaviorSubject<Observable>(value: subject1)
            
            subjectsSubject.asObservable()
                .switchLatest()
                .subscribe(onNext: { print($0) })
                .disposed(by: disposeBag)
            
            subject11.onNext("🏈")
            subject11.onNext("🏀")
            subjectsSubject.onNext(subject22)
            subject11.onNext("⚾️")
            subject22.onNext("🍐")
            /** 输出:
             🍎
             🍐
             */
            print("**********  switchLatest   ***************")
    
            
            //MARK: - withLatestFrom
            print("**********  withLatestFrom   ***************")
            print("通过将第一个源中的每个元素与第二个源中的最新元素(如果有的话)结合,将两个可观察序列合并为一个可观察序列。")
            let foodSubject = PublishSubject<String>()
            let drinksSubject = PublishSubject<String>()
    
            foodSubject.asObservable()
                .withLatestFrom(drinksSubject) {
                    "\($0)  +  \($1)"
                }.subscribe{
                    print($0)
                }.disposed(by: disposeBag)
            
            foodSubject.onNext("🥗")
            
            drinksSubject.onNext("☕️")
            foodSubject.onNext("🥐")
            
            drinksSubject.onNext("🍷")
            foodSubject.onNext("🍔")
            
            foodSubject.onNext("🍟")
            
            drinksSubject.onNext("🍾")
            /** 输出:
             next(🥐  +  ☕️)
             next(🍔  +  🍷)
             next(🍟  +  🍷)
             */
            
            print("**********  withLatestFrom   ***************")
            
        }
        
        //MARK: - 组合操作符
        private func demo3(){
            let disposeBag = DisposeBag()
    
            //MARK: - map
            print("**********  map   ***************")
            Observable.of(1, 2, 3)
                .map { $0 * $0 }
                .subscribe(onNext: { print($0) })
                .disposed(by: disposeBag)
            
            Observable.from([2, 3, 4])
                .map {  $0 * $0 }
                .subscribe{
                    print($0)
                }.disposed(by: disposeBag)
            
            print("**********  map   ***************")
            
            
            //MARK: - flatMap and flatMapLatest
            print("**********  flatMap and flatMapLatest   ***************")
            print("将可观察序列发出的元素转换为可观察序列,并将两个可观察序列发出的元素合并为一个可观察序列。这也很有用,例如,当你有一个可观察序列本身发射可观察序列,你希望能够对来自任何一个可观察序列的新发射作出反应。flatMap和flatMapLatest的区别在于,flatMapLatest只会从最近的内部Observable序列中发出元素。")
            
            struct Player{
                init(score: Int) {
                    self.score = BehaviorSubject(value: score)
                }
                
                let score: BehaviorSubject<Int>
            }
            
            let 👦🏻 = Player(score: 80)
            let 👧🏼 = Player(score: 90)
            let player = BehaviorSubject(value: 👦🏻)
            player.asObservable()
                .flatMap { $0.score.asObservable() } // Change flatMap to flatMapLatest and observe change in printed output
                .subscribe(onNext: { print($0) })
                .disposed(by: disposeBag)
            👦🏻.score.onNext(85)
            player.onNext(👧🏼)
            👦🏻.score.onNext(95) // Will be printed when using flatMap, but will not be printed when using flatMapLatest
            👧🏼.score.onNext(100)
            
            /** 输出:
             80
             85
             90
             95
             100
             */
            print("**********  flatMap and flatMapLatest   ***************")
    
            
            //MARK: - scan
            print("**********  scan   ***************")
    
            print("从初始种子值开始,然后对Observable序列发出的每个元素应用累加器闭包,并将每个中间结果作为单元素Observable序列返回。")
                
            Observable.of(10, 100, 1000)
                .scan(0) { aggregateValue, newValue in
                    print("\(aggregateValue) ------ \(newValue)")
                    return aggregateValue + newValue
                }
                .subscribe{
                    print($0)
                }.disposed(by: disposeBag)
            /** 输出:
             0 ------ 10
             next(10)
             10 ------ 100
             next(110)
             110 ------ 1000
             next(1110)
             completed
             */
            print("**********  scan   ***************")
    
        }
        
    
        private func demo4(){
            let disposeBag = DisposeBag()
            
            //MARK: - filter
            print("**********  filter   ***************")
            print("只发出Observable序列中满足指定条件的元素。")
            Observable.of( "🐱", "🐰", "🐶",
                           "🐸", "🐱", "🐰",
                           "🐹", "🐸", "🐱")
            .filter {
                $0 == "🐱"
            }.subscribe {
                print($0)
            }.disposed(by: disposeBag)
            /** 输出:
             next(🐱)
             next(🐱)
             next(🐱)
             completed
             */
            print("**********  filter   ***************")
    
            
            //MARK: - distinctUntilChanged
            print("**********  distinctUntilChanged   ***************")
            print("抑制由Observable序列发出的连续重复元素。")
            Observable.of("🐱", "🐷", "🐱", "🐱", "🐱", "🐵", "🐱")
                .distinctUntilChanged()
                .subscribe{
                    print($0)
                }.disposed(by: disposeBag)
            
            /** 输出:
             next(🐱)
             next(🐷)
             next(🐱)
             next(🐵)
             next(🐱)
             completed
             */
            print("**********  distinctUntilChanged   ***************")
            
            
            //MARK: - elementAt
            print("**********  elementAt   ***************")
            print("只发出可观察序列发出的所有元素中指定索引处的元素。")
            Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
                .element(at: 3)
                .subscribe{ print($0) }
                .disposed(by: disposeBag)
            /** 输出:
             next(🐸)
             completed
             */
            print("**********  elementAt   ***************")
    
            
            //MARK: - single
            print("**********  single   ***************")
            print("只发出Observable序列发出的第一个元素(或第一个满足条件的元素)。如果Observable序列没有发出一个元素,将抛出错误。")
            
            Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
                .single()
                .subscribe{ print($0) }
                .disposed(by: disposeBag)
            /** 输出:
             next(🐱)
             error(Sequence contains more than one element.)
             */
            
            print("**********  single   ***************")
            
            
            //MARK: - single with condaitions
            print("**********  single with condaitions   ***************")
            
            print("有且只有一个满足条件的")
            Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
                .single {  $0 == "🐸"}
                .subscribe {  print($0) }
                .disposed(by: disposeBag)
            /** 输出:
              next(🐸)
              completed
              */
            
            print("有两个满足条件的数据, 且发出第二个的时候会抛出错误")
            Observable.of("🐱", "🐰", "🐶", "🐱", "🐰", "🐶")
                .single {  $0 == "🐰"}
                .subscribe {  print($0) }
                .disposed(by: disposeBag)
            /** 输出:
             next(🐰)
             error(Sequence contains more than one element.)
             */
            
            
            print("没有满足提交的条件, 直接抛出错误")
            Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
                .single { $0 == "🔵" }
                .subscribe { print($0) }
                .disposed(by: disposeBag)
            
            
            print("**********  single with condaitions   ***************")
            
            
            //MARK: - take
            print("**********  take   ***************")
            print("只从Observable序列的开头发出指定数量的元素。")
            
            Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
                .take(3)
                .subscribe{  print($0) }
                .disposed(by: disposeBag)
    
            /** 输出:
             next(🐱)
             next(🐰)
             next(🐶)
             completed
             */
            print("**********  take   ***************")
            
            
            //MARK: - takeLast
            print("**********  takeLast   ***************")
            print("只从Observable序列的末尾发出指定数量的元素。")
            
            Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
                .takeLast(3)
                .subscribe{  print($0) }
                .disposed(by: disposeBag)
    
            /** 输出:
             next(🐸)
             next(🐷)
             next(🐵)
             completed
             */
            print("**********  takeLast   ***************")
            
            
            //MARK: - takeWhile
            print("**********  takeWhile   ***************")
            print("只要指定的条件求值为true,就从Observable序列的开头发出元素。")
            Observable.of(1, 2, 3, 4, 5, 6)
                .take(while: { $0 < 4 })
                .subscribe{ print($0) }
                .disposed(by: disposeBag)
            
            /** 输出:
             next(1)
             next(2)
             next(3)
             completed
             */
            print("**********  takeWhile   ***************")
            
            
            //MARK: - takeUntil
        
            print("**********  takeUntil   ***************")
            print("从源Observable序列发出元素,直到引用Observable序列发出一个元素。")
            
            let sourceSequence = PublishSubject<String>()
            let referenceSequence = PublishSubject<String>()
            
            sourceSequence
                .take(until: referenceSequence)
                .subscribe { print($0) }
                .disposed(by: disposeBag)
            
            sourceSequence.onNext("🐱")
            sourceSequence.onNext("🐰")
            sourceSequence.onNext("🐶")
            
            referenceSequence.onNext("🔴")
            
            sourceSequence.onNext("🐸")
            sourceSequence.onNext("🐷")
            sourceSequence.onNext("🐵")
            
            /** 输出:
             next(🐱)
             next(🐰)
             next(🐶)
             completed
             */
            print("**********  takeUntil   ***************")
    
            
            
            //MARK: - skip
            print("**********  skip   ***************")
            print("禁止从Observable序列的开头发出指定数量的元素。")
            Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
                .skip(2)
                .subscribe(onNext: { print($0) })
                .disposed(by: disposeBag)
            
            /** 输出:
             🐶
             🐸
             🐷
             🐵
             */
            print("**********  skip   ***************")
            
            
            //MARK: - skipWhile
            print("**********  skipWhile   ***************")
            print("禁止从Observable序列的开头发出满足指定条件的元素。")
            Observable.of(1, 2, 3, 4, 5, 6)
                .skip(while: { $0 < 4 })
                .subscribe(onNext: { print($0) })
                .disposed(by: disposeBag)
            
            /** 输出:
             4
             5
             6
             */
            print("**********  skipWhile   ***************")
            
            //MARK: - skipWhileWithIndex
            print("**********  skipWhileWithIndex   ***************")
            print("抑制从Observable序列的开头发出满足指定条件的元素,并发出剩余的元素。闭包还传递每个元素的索引。")
            Observable.of("🐱", "🐰", "🐶", "🐸", "🐷", "🐵")
                .enumerated()
                .skip(while: { (index, element) in
                    index < 3
                })
                .map({ (index, element) in
                    element
                })
                .subscribe(onNext: { print($0) })
                .disposed(by: disposeBag)
            /** 输出:
             🐸
             🐷
             🐵
             */
            print("**********  skipWhileWithIndex   ***************")
            
            
            //MARK: - skipUntil
            print("**********  skipUntil   ***************")
            print("抑制从源Observable序列发出元素,直到引用Observable序列发出元素。")
            
            let sourceSequence1 = PublishSubject<String>()
            let referenceSequence1 = PublishSubject<String>()
            
            sourceSequence1
                .skip(until: referenceSequence1)
                .subscribe(onNext: { print($0) })
                .disposed(by: disposeBag)
            
            sourceSequence1.onNext("🐱")
            sourceSequence1.onNext("🐰")
            sourceSequence1.onNext("🐶")
            
            referenceSequence1.onNext("🔴")
            
            sourceSequence1.onNext("🐸")
            sourceSequence1.onNext("🐷")
            sourceSequence1.onNext("🐵")
            
            /** 输出:
             🐸
             🐷
             🐵
             */
            print("**********  skipUntil   ***************")
            
        }
        
        
        private func demo5(){
            let disposeBag = DisposeBag()
            
    
            //MARK: - toArray
            print("**********  toArray   ***************")
            Observable.range(start: 1, count: 10)
                .toArray()
                .subscribe {
                    if case .success(let value) = $0 {
                        print(value)
                    }
                }
                .disposed(by: disposeBag)
            print("**********  toArray   ***************")
    
            
            //MARK: - reduce
            print("**********  reduce   ***************")
            print("从初始种子值开始,然后对Observable序列发出的所有元素应用accumulator闭包,并将聚合结果作为单元素Observable序列返回。")
            
            Observable.of(10, 100, 1000)
                .reduce(1, accumulator: +)
                .subscribe {
                    print($0)
                }.disposed(by: disposeBag)
            /** 输出:
             next(1111)
             completed
             */
            print("**********  reduce   ***************")
    
            
            //MARK: - concat
            print("**********  concat   ***************")
            print("以顺序的方式连接Observable序列的内部Observable序列中的元素,等待每个序列成功终止,然后再释放下一个序列中的元素.")
    
            let subject1 = BehaviorSubject(value: "🍎")
            let subject2 = BehaviorSubject(value: "🐶")
            
            let subjectsSubject = BehaviorSubject(value: subject1)
            
            subjectsSubject.asObservable()
                .concat()
                .subscribe{
                    print($0)
                }.disposed(by: disposeBag)
            
            subject1.onNext("🍐")
            subject1.onNext("🍊")
    
            subjectsSubject.onNext(subject2)
    
            subject2.onNext("I would be ignored")  /// 这个不会打印出结果, 因为subject1还没有发送completed信号
            subject2.onNext("🐱") /// 这里也不会直接输出, 但是当subject1发送了completed后, 会输出一次subject2最后一次保存的数据,
    
            subject1.onCompleted()
    
            subject2.onNext("🐭")
            
            /** 输出:
             next(🍎)
             next(🍐)
             next(🍊)
             next(🐱)   这里会输出是因为 BehaviorSubject 会保存最后一次信号
             next(🐭)
             */
            print("**********  concat   ***************")
        }
        
        private func demo6() {
            //MARK: - interval 定时器
            print("**********  interval   ***************")
    
            let interval = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
    
            _ = interval.subscribe(onNext: {
                print("Subscription: 1, Event: \($0)")
            })
    
    
            DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + .seconds(5)) {
                _ = interval.subscribe(onNext: {
                    print("Subscription: 2, Event: \($0)")
                })
            }
            print("**********  interval   ***************")
            
        }
        
        private func demo7() {
            //MARK: - publish
            print("**********  publish   ***************")
            print("将源Observable序列转换为可连接序列。 publish()  和  connect() 方法是一起使用的, 否则信号不会开始")
            
            let intSequence = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance).publish()
            
            _ = intSequence.subscribe(onNext: { print("Subscription 1: Event: \($0)") })
            
            DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(2)) {
                _ = intSequence.connect()
            }
            
            DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(4)) {
                _ = intSequence.subscribe(onNext: {
                    print("Subscriotion 2, Event: \($0)")
                })
            }
            
            DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(6)) {
                _ = intSequence.subscribe(onNext: {
                    print("Subscriotion 3, Event: \($0)")
                })
            }
            
            /** 输出:
             Subscription 1: Event: 0
             Subscription 1: Event: 1
             Subscriotion 2, Event: 1
             Subscription 1: Event: 2
             Subscriotion 2, Event: 2
             Subscription 1: Event: 3
             Subscriotion 2, Event: 3
             Subscriotion 3, Event: 3
             Subscription 1: Event: 4
             Subscriotion 2, Event: 4
             Subscriotion 3, Event: 4
             Subscription 1: Event: 5
             Subscriotion 2, Event: 5
             Subscriotion 3, Event: 5
             Subscription 1: Event: 6
             Subscriotion 2, Event: 6
             Subscriotion 3, Event: 6
             Subscription 1: Event: 7
             Subscriotion 2, Event: 7
             Subscriotion 3, Event: 7
             Subscription 1: Event: 8
             Subscriotion 2, Event: 8
             Subscriotion 3, Event: 8
             Subscription 1: Event: 9
             Subscriotion 2, Event: 9
             Subscriotion 3, Event: 9
             ....
             */
            
            print("**********  publish   ***************")
        }
        
        
        private func demo8 () {
            let disposeBag = DisposeBag()
            //MARK: - replay
            
            print("**********  replay   ***************")
            print("将源Observable序列转换为可连接序列,并向每个新订阅者重播先前的排放量bufferSize数。")
    
            let intSequence = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
                .replay(2)
            
            _  = intSequence.subscribe(onNext: {
                print("Subscription 1: Event: \($0)")
            })
            
            
            DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(2)) {
                _  = intSequence.connect()
            }
            
            DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(4)) {
                _  = intSequence.subscribe{
                    print("Subscription 2, Event: \($0)")
                }
            }
            
            DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(8)) {
                _  = intSequence.subscribe{
                    print("Subscription 3, Event: \($0)")
                }
            }
            
            /** 输出:
             **********  replay   ***************
             将源Observable序列转换为可连接序列,并向每个新订阅者重播先前的排放量bufferSize数。
             **********  replay   ***************
             Subscription 1: Event: 0
             Subscription 2, Event: next(0)
             Subscription 1: Event: 1
             Subscription 2, Event: next(1)
             Subscription 1: Event: 2
             Subscription 2, Event: next(2)
             Subscription 1: Event: 3
             Subscription 2, Event: next(3)
             Subscription 1: Event: 4
             Subscription 2, Event: next(4)
             Subscription 3, Event: next(3)
             Subscription 3, Event: next(4)
             Subscription 1: Event: 5
             Subscription 2, Event: next(5)
             Subscription 3, Event: next(5)
             Subscription 1: Event: 6
             Subscription 2, Event: next(6)
             Subscription 3, Event: next(6)
             Subscription 1: Event: 7
             Subscription 2, Event: next(7)
             Subscription 3, Event: next(7)
             Subscription 1: Event: 8
             Subscription 2, Event: next(8)
             Subscription 3, Event: next(8)
             Subscription 1: Event: 9
             Subscription 2, Event: next(9)
             Subscription 3, Event: next(9)
             Subscription 1: Event: 10
             Subscription 2, Event: next(10)
             Subscription 3, Event: next(10)
             Subscription 1: Event: 11
             Subscription 2, Event: next(11)
             Subscription 3, Event: next(11)
             Subscription 1: Event: 12
             Subscription 2, Event: next(12)
             Subscription 3, Event: next(12)
             */
            
            print("**********  replay   ***************")
        }
        
        private func demo9 () {
            //MARK: - multicast
            print("**********  multicast   ***************")
            print("将源可观察序列转换为可连接序列,并通过指定对象广播其发射。")
            
            let subject = PublishSubject<Int>()
            
            _ = subject
                .subscribe(onNext: { print("Subject: \($0)") })
            
            let intSequence = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
                .multicast(subject)
            
            _ = intSequence
                .subscribe(onNext: { print("\tSubscription 1:, Event: \($0)") })
            
            DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(2)) {
                _ = intSequence.connect()
            }
            
            DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(4)) {
                _ = intSequence
                    .subscribe(onNext: { print("\tSubscription 2:, Event: \($0)") })
            }
            
            DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(6)) {
                _ = intSequence
                    .subscribe(onNext: { print("\tSubscription 3:, Event: \($0)") })
            }
            
            /** 输出:
             **********  multicast   ***************
             将源可观察序列转换为可连接序列,并通过指定对象广播其发射。
             **********  multicast   ***************
             Subject: 0
                 Subscription 1:, Event: 0
             Subject: 1
                 Subscription 1:, Event: 1
                 Subscription 2:, Event: 1
             Subject: 2
                 Subscription 1:, Event: 2
                 Subscription 2:, Event: 2
             Subject: 3
                 Subscription 1:, Event: 3
                 Subscription 2:, Event: 3
                 Subscription 3:, Event: 3
             Subject: 4
                 Subscription 1:, Event: 4
                 Subscription 2:, Event: 4
                 Subscription 3:, Event: 4
             Subject: 5
                 Subscription 1:, Event: 5
                 Subscription 2:, Event: 5
                 Subscription 3:, Event: 5
             Subject: 6
                 Subscription 1:, Event: 6
                 Subscription 2:, Event: 6
                 Subscription 3:, Event: 6
             Subject: 7
                 Subscription 1:, Event: 7
                 Subscription 2:, Event: 7
                 Subscription 3:, Event: 7
             Subject: 8
                 Subscription 1:, Event: 8
                 Subscription 2:, Event: 8
                 Subscription 3:, Event: 8
             Subject: 9
                 Subscription 1:, Event: 9
                 Subscription 2:, Event: 9
                 Subscription 3:, Event: 9
             Subject: 10
                 Subscription 1:, Event: 10
                 Subscription 2:, Event: 10
                 Subscription 3:, Event: 10
             */
            print("**********  multicast   ***************")
        }
        
        
        private func demo10() {
            let disposeBag = DisposeBag()
            //MARK: - catchAndReturn
            print("**********  catchAndReturn   ***************")
            print("通过返回一个发出单个元素然后终止的Observable序列,从Error事件中恢复。")
            
            
            let sequenceThatFails = PublishSubject<String>()
            sequenceThatFails
                .catchAndReturn("😊")
                .subscribe {
                    print($0)
                }.disposed(by: disposeBag)
    
            print("**********  catchAndReturn   ***************")
    
        }
    

    相关文章

      网友评论

          本文标题:RxSwift的使用

          本文链接:https://www.haomeiwen.com/subject/lbnugdtx.html