美文网首页
RxSwift(6)-高阶函数

RxSwift(6)-高阶函数

作者: xxxxxxxx_123 | 来源:发表于2020-04-21 19:30 被阅读0次

映射

map

map用于将可观察序列中的元素进行新的转换,返回新的观察序列。

let ob = Observable.of(1,2,3,4)
ob.map { (number) -> Int in
    return number*2
}
.subscribe{
    print("\($0)")
}
.disposed(by: disposeBag)

运行结果如下:

next(2)
next(4)
next(6)
next(8)
completed

那么map是如何实现呢?我们来看看源码:

首先调用map,会调用以下函数:

public func map<Result>(_ transform: @escaping (Element) throws -> Result)
    -> Observable<Result> {
    return self.asObservable().composeMap(transform)
}

经过一系列的调用之后,最终会来到Map这个类中调用其的初始化方法,这个时候source就是观察序列,而transform则是map后面的闭包。

final private class Map<SourceType, ResultType>: Producer<ResultType> {
    typealias Transform = (SourceType) throws -> ResultType

    private let _source: Observable<SourceType>

    private let _transform: Transform

    init(source: Observable<SourceType>, transform: @escaping Transform) {
        self._source = source
        self._transform = transform
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == ResultType {
        let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel)
        let subscription = self._source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

Map继承自Producer,当map出来的结果调用subscribe方法的时候,会调用Producersubscribe方法,然后在回调中会调用Map.run(),最终在MapSinkon方法中进行响应。

public func subscribe(_ on: @escaping (Event<Element>) -> Void)
    -> Disposable {
    let observer = AnonymousObserver { e in
        on(e)
    }
    return self.asObservable().subscribe(observer)
}

final private class MapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Transform = (SourceType) throws -> ResultType

    typealias ResultType = Observer.Element 
    typealias Element = SourceType
    
    // map的尾随闭包
    private let _transform: Transform

    init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
        self._transform = transform
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<SourceType>) {
        switch event {
        case .next(let element):
            do {
                // 先使用_transform对element进行变换
                // 然后发送事件
                let mappedElement = try self._transform(element)
                self.forwardOn(.next(mappedElement))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .error(let error):
            self.forwardOn(.error(error))
            self.dispose()
        case .completed:
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

这里的处理是先使用之前保存的_transform对当前元素进行变化,然后将事件发送出去。

flatMap

将可观察序列的每个元素映射转换成一个可观察序列,并将所得的可观察序列合并为一个可观察序列。一般用在当Observable的元素本身拥有其他的Observable时,可以将所有子Observables的元素作为事件发送出来。

func flatMapTest() {
    let first = BehaviorSubject.init(value: "1")
    let second = BehaviorSubject.init(value: "A")
    let rable = BehaviorRelay.init(value: first) // 切换监听哪一个序列
        
    rable.asObservable()
        .flatMap { $0 }
        .subscribe(onNext: { (element) in
            print("==\(element)==")
        }).disposed(by: disposeBag)
        
    first.onNext("2")
    rable.accept(second) 
    second.onNext("B")
    first.onNext("3")
}

控制台输出:

==1==
==2==
==A==
==B==
==3==

flatMapFirst

将可观察序列的每个元素映射转换成一个可观察序列Observables,然后取这些Observables中的第一个。

func flatMapTest() {
    let first = BehaviorSubject.init(value: "1")
    let second = BehaviorSubject.init(value: "A")
    let rable = BehaviorRelay.init(value: first)
        
    rable.asObservable()
        .flatMapFirst { $0 }
        .subscribe(onNext: { (element) in
            print("==\(element)==")
        }).disposed(by: disposeBag)
        
    first.onNext("2")
    rable.accept(second)
    second.onNext("B")
    first.onNext("3")
}

控制台输出:

==1==
==2==
==3==

flatMapLatest

将可观察序列的每个元素映射转换成一个可观察序列Observables,然后取这些Observables中的最后一个。

func flatMapTest() {
    let first = BehaviorSubject.init(value: "1")
    let second = BehaviorSubject.init(value: "A")
    let rable = BehaviorRelay.init(value: first)
        
    rable.asObservable()
        .flatMapLatest { $0 }
        .subscribe(onNext: { (element) in
            print("==\(element)==")
        }).disposed(by: disposeBag)
        
    first.onNext("2")
    rable.accept(second)
    second.onNext("B")
    first.onNext("3")
}

控制台输出:

==1==
==2==
==A==
==B==

组合

startWith

startWith用于将元素插入可观察序列的头部

func startWithTest() {
    Observable.of("1", "2", "3", "4")
    .startWith("5")
    .startWith("6", "7")
    .subscribe(onNext: { (element) in
            print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出:

==6==
==7==
==5==
==1==
==2==
==3==
==4==

源码实现:

final private class StartWith<Element>: Producer<Element> {
    let elements: [Element]
    let source: Observable<Element>

    init(source: Observable<Element>, elements: [Element]) {
        self.source = source
        self.elements = elements
        super.init()
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        // 遍历发出
        for e in self.elements {
            observer.on(.next(e))
        }

        return (sink: Disposables.create(), subscription: self.source.subscribe(observer))
    }
}

concat

连接所有内部可观察序列,只要先前的可观察序列成功终止即可。也就是说前面的序列只要成功调用onCompleted方法,就可以连接后面的序列。concat连接是按顺序执行的。

func concatTest() {
    let first = BehaviorSubject.init(value: "1")
    let second = BehaviorSubject.init(value: "A")
    let rable = BehaviorRelay.init(value: first)
        
    rable.asObservable()
    .concat()
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
        
    first.onNext("2")
    second.onNext("B")
    rable.accept(second)
    first.onCompleted()
    second.onNext("C")
}

控制台输出:

==1==
==2==
==B==
==C==

当我们把second.onNext("B")注释掉,控制台输出:

==1==
==2==
==A==
==C==

为什么会有这个区别呢?这是因为concat连接的时候需要等前一个序列执行完毕,也就是说后面序列的发送事件如果在前面序列的onCompleted事件之前,那就只会保留一个,因此会出现上述情况。

merge

将源可观察序列中的元素组合成一个新的可观察序列,如果某一个Observable发出一个onError事件,那么被合并的Observable也会将它发出,并且立即终止序列。

func mergeTest() {
    let sub1 = PublishSubject<String>()
    let sub2 = PublishSubject<String>()
        
    Observable.of(sub1, sub2)
    .merge()
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
        
    sub1.onNext("1")
    sub2.onNext("A")
    sub1.onNext("2")
    sub1.onNext("3")
    sub2.onNext("B")
}

控制台输出:

==1==
==A==
==2==
==3==
==B==

zip

通过一个函数将多个Observables的元素组合起来,然后将每一个组合的结果作为一个元素发出来。zip能够组合的可观察序列是有限制的,最多不超过8个。最后发出的素数量等于源Observable中元素数量最少的那个,也就是每一个元素的组成必须是和所有源Observable一一对应,如果缺少一个源Observable则不会发出元素。

func zipTest() {
    let stringSubject = PublishSubject<String>()
    let intSubject = PublishSubject<Int>()

    Observable.zip(stringSubject, intSubject) { stringElement, intElement in
            // 合并的方式
            "\(stringElement) \(intElement)"
        }
        .subscribe(onNext: { (element) in
            print("==\(element)==")
        })
        .disposed(by: disposeBag)
    
    stringSubject.onNext("A")
    stringSubject.onNext("B")

    intSubject.onNext(1)
    intSubject.onNext(2)
    stringSubject.onNext("C")
    intSubject.onNext(3)
    stringSubject.onNext("D")
}

控制台输出:

==A 1==
==B 2==
==C 3==

combineLatest

当多个Observables中任何一个发出一个元素,就发出一个元素。这个元素是由这些Observables中最新的元素,通过一个函数组合起来的。

func combineLatestTest() {
    let stringSub = PublishSubject<String>()
    let intSub = PublishSubject<Int>()
    Observable.combineLatest(stringSub, intSub) { strElement, intElement in
            // 组合的方式
            "\(strElement)+\(intElement)"
        }
        .subscribe(onNext: { (element) in
            print("==\(element)==")
        })
        .disposed(by: disposeBag)
    
    stringSub.onNext("A") 
    stringSub.onNext("B")  // B会覆盖A
    intSub.onNext(1)     
    intSub.onNext(2)      
    stringSub.onNext("C") 
    intSub.onNext(3)    
    stringSub.onNext("D") 
}

控制台输出:

==B+1==
==B+2==
==C+2==
==C+3==
==D+3==

可以看出combineLatestzip的相同点是,两者都是子Observables有值才会发出元素,但是combineLatest只要有值过,就会发出新值,并且当只是某一个子Observables有值时,后面的元素会覆盖前面的元素;而zip不会覆盖,必须是成对出现。

switchLatest

将可观察序列发出的元素转换为可观察序列,并从最新的内部可观察序列发出元素。

func switchLatestTest() {
    let sub1 = PublishSubject<String>()
    let sub2 = PublishSubject<String>()
    
    Observable.of(sub1, sub2)
    .switchLatest()
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
    
    sub1.onNext("1")
    sub2.onNext("A")
    sub1.onNext("2")
    sub1.onNext("3")
    sub2.onNext("B")
}

控制台输出:

==A==
==B==

scan

在可观察序列上应用累加器函数并返回每个中间结果,指定的种子值用作初始累加器值。

func scanTest() {
    Observable.of(10, 100, 1000)
    .scan(2) { aggregateValue, newValue in
        aggregateValue + newValue // 10 + 2 , 100 + 10 + 2 , 1000 + 100 + 2
    }
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出:

==12==
==112==
==1112==

reduce

将可观察序列中的元素和初始种子值在累加器中累加,输出累加之后的结果。也就是对第一个元素应用一个函数。然后,将结果作为参数填入到第二个元素的应用函数中。以此类推,直到遍历完全部的元素后发出最终结果。

func reduceTest() {
    Observable.of(10, 100, 1000)
    .reduce(2, accumulator: +) // 2+10+100+1000
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出:

==1112==

scan不同的是,scan每一个元素计算出来都会发送一次,reduce只会发出最终结果。

toArray

将可观察序列转换为Single信号,也就是将可观察序列中的元素整合之后作为一个元素发出,然后终止。

func toArrayTest() {
    Observable.range(start: 1, count: 5)
    .toArray()
    .subscribe(onSuccess: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出

==[1, 2, 3, 4, 5]==

过滤

filter

根据谓词过滤可观察序列的元素。

func filterTest() {
    Observable.of(1,2,3,4,5,6,7,8,9)
    .filter { $0 % 2 == 0 }
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出:

==2==
==4==
==6==
==8==

源码如下:

extension ObservableType {
    public func filter(_ predicate: @escaping (Element) throws -> Bool)
        -> Observable<Element> {
        return Filter(source: self.asObservable(), predicate: predicate)
    }
}

final private class FilterSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next(let value):
            do {
                // 谓词过滤
                let satisfies = try self._predicate(value)
                if satisfies {
                    self.forwardOn(.next(value))
                }
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .completed, .error:
            self.forwardOn(event)
            self.dispose()
        }
    }
}

elementAt

将可观察序列中指定索引数的元素发出。

func elementAtTest () {
    Observable.of("A", "B", "C", "D")
    .elementAt(2)
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出:

==C==

distinctUntilChanged

阻止可观察序列发出相同的元素。如果后一个元素和前一个元素是相同的,那么这个元素将不会被发出来。如果后一个元素和前一个元素不相同,那么这个元素才会被发出来。

func distinctUntilChangedTest() {
    Observable.of("1", "2", "2", "2", "3", "3", "4")
    .distinctUntilChanged()
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出

==1==
==2==
==3==
==4==

single

限制可观察序列只发出一个元素,如果没有元素或者元素数量大于1,它将产生一个error事件。

func singleTest() {
    Observable.of("1", "2", "3","4")
    .single()
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出:

==1==
Unhandled error happened: Sequence contains more than one element.
 subscription called from:

take

从可观察序列的开始位置返回指定数量的连续元素。

func takeTest() {
    Observable.of("1", "2", "3","4")
    .take(2)
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出:

==1==
==2==

takeLast

从可观察序列的末尾返回指定数量的连续元素,可能会存在延迟的情况。

func takeLastTest() {
    Observable.of("1", "2", "3","4")
    .takeLast(2)
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出:

==3==
==4==

takeWhile

只要指定条件的值为true,就从可观察序列的开始发出元素。一旦为false则直接发送完成事件。

func takeWhileTest() {
    Observable.of("1", "2", "3","4")
        .takeWhile({ (item) -> Bool in
            return item != "2"
        })
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出:

==1==

上述例子中,虽然我们后面的元素还是不等于"2",但是前面等于"2"的时候就已经发送了完成信息,结束了订阅观察。

takeUntil

忽略掉在第二个可观察序列产生事件后发出的那部分元素。当我们在订阅一个观察序列的时候,一旦参考观察序列发出一个元素或者产生一个终止事件,原理订阅的观察序列会立即终止。

func takeUntilTest() {
    let sourceSequence = PublishSubject<String>()
    let referenceSequence = PublishSubject<String>()
    
    sourceSequence
        .takeUntil(referenceSequence)
        .subscribe(onNext: { (element) in
            print("==\(element)==")
        })
        .disposed(by: disposeBag)
    
    sourceSequence.onNext("A")
    sourceSequence.onNext("B")
    sourceSequence.onNext("C")

    referenceSequence.onNext("1") // 会终止原来的订阅
    
    sourceSequence.onNext("D")
    sourceSequence.onNext("E")
}

控制台输出:

==A==
==B==
==C==

skip

从可观察序列的开始位置跳过指定数量的元素。

func skipTest() {
    Observable.of(1, 2, 3, 4, 5, 6)
    .skip(2)
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出:

==3==
==4==
==5==
==6==

skipWhile

判断可观察元素的开始位置,只要指定条件为true,就绕过可观察序列中对应的元素,然后返回其余元素,如果为false,则正常返回剩余的所有元素。

func skipTest() {
    Observable.of(1, 2, 3, 4, 5, 6)
    .skipWhile { $0 < 3 }
    .subscribe(onNext: { (element) in
        print("==\(element)==")
    })
    .disposed(by: disposeBag)
}

控制台输出:

Observable.of(1, 2, 3, 4, 5, 6) // 3456
Observable.of(6, 5, 4, 3, 2, 1) // 654321

skipUntil

跳过可观察序列开始位置的几个元素,直到参考的可观察序列发出一个元素才会接着发出原来可观察序列的元素。

func skipUntilTest() {
    let sourceSequence = PublishSubject<String>()
    let referenceSequence = PublishSubject<String>()
    
    sourceSequence
        .skipUntil(referenceSequence)
        .subscribe(onNext: { (element) in
            print("==\(element)==")
        })
        .disposed(by: disposeBag)
    
    sourceSequence.onNext("A")
    sourceSequence.onNext("B")
    sourceSequence.onNext("C")

    referenceSequence.onNext("1") // 只发出条件下面的元素
    
    sourceSequence.onNext("D")
    sourceSequence.onNext("E")
}

控制台输出:

==D==
==E==

可以看出,skipUntiltakeUntil正好相反。

错误处理

catchErrorJustReturn

可观察序列发送错误事件,catchErrorJustReturn会返回一个正常的事件,然后再终止观察序列。

func catchErrorJustReturnTest() {
    let errorSub = PublishSubject<String>()
    
    errorSub.catchErrorJustReturn("这是我们自定义的错误")
        .subscribe(onNext: { (element) in
            print("==\(element)==")
        })
        .disposed(by: disposeBag)
    
    errorSub.onNext("A")
    errorSub.onNext("B") 
    errorSub.onError(NSError.init(domain: "错误", code: 10000, userInfo: nil)) // 发error就会调用我们自定义的错误
}

控制台输出:

==A==
==B==
==这是我们自定义的错误==

catchError

拦截一个error事件,将它替换成其他的元素或者一组元素,然后传递给观察者。这样可以使得可观察序列正常运行。

func catchErrorTest() {
    let sub1 = PublishSubject<String>()
    let sub2 = PublishSubject<String>()
    
    sub1.catchError { (element) -> Observable<String> in
        print("==Error==\(element)==")
        return sub2
    }
    .subscribe { (element) in
        print("==\(element)==")
    }
    .disposed(by: disposeBag)
    
    sub1.onNext("A")
    sub1.onNext("B")
    sub1.onError(NSError.init(domain: "错误", code: 10000, userInfo: nil))
    sub2.onNext("1")
    sub2.onNext("2")
}

控制台输出:

==next(A)==
==next(B)==
==Error==Error Domain=错误 Code=10000 "(null)"==
==next(1)==
==next(2)==

多播、链接

publish & connect

publish会将可观察序列转换为可被共享的观察序列,connect则通知可被共享的观察序列可以开始发出元素了。

被共享的观察序列在被订阅后不会发出元素,直到应用链接connect为止,也就是说publish将多次订阅共享链接之后,才会发出事件。这样一来我们就可以控制可观察序列在什么时候开始发出元素。

func multicastTest() {
    // *** multicast : 将源可观察序列转换为可连接序列,并通过指定的主题广播其发射。
    
    let pOb = Observable<Any>.create { (observer) -> Disposable in
            print("==开始发送事件了==")
            observer.onNext("发送广播事件")
            return Disposables.create {
                print("销毁回调了")
            }
        }.publish()
    
    pOb.subscribe(onNext: { (element) in
            print("订阅1==\(element)==")
        })
        .disposed(by: disposeBag)

    pOb.subscribe(onNext: { (element) in
            print("订阅2==\(element)==")
        })
        .disposed(by: disposeBag)
    
    _ = pOb.connect()
}

控制台输出:

==开始发送事件了==
订阅1==发送广播事件==
订阅2==发送广播事件==

源码如下:

在我们创建可观察序列,调用publish方法的时候会进入以下方法:

public func publish() -> ConnectableObservable<Element> {
    return self.multicast { PublishSubject() }
}

public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
    -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
    return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
}

返回的是一个ConnectableObservableAdapter对象,该对象继承自ConnectableObservable,而ConnectableObservable则是直接继承自Observable,而不是producer,这个继承关系和前面的一些方法是不一样的。那么其订阅方法也就不是producer中的subscribe方法了。

public class ConnectableObservable<Element>
    : Observable<Element>
    , ConnectableObservableType {

    public func connect() -> Disposable {
        rxAbstractMethod()
    }
}

final private class ConnectableObservableAdapter<Subject: SubjectType>
    : ConnectableObservable<Subject.Element> {
    typealias ConnectionType = Connection<Subject>

    fileprivate let _source: Observable<Subject.Observer.Element>
    fileprivate let _makeSubject: () -> Subject

    fileprivate let _lock = RecursiveLock()
    fileprivate var _subject: Subject?

    // state
    fileprivate var _connection: ConnectionType?

    init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
        self._source = source
        self._makeSubject = makeSubject
        self._subject = nil
        self._connection = nil
    }

    // 使用懒加载方法将我们传入的PublishSubject()对象返回
    fileprivate var lazySubject: Subject {
        if let subject = self._subject {
            return subject
        }

        let subject = self._makeSubject()
        self._subject = subject
        return subject
    }

    // 自己实现的订阅方法
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
        return self.lazySubject.subscribe(observer)
    }
}

可以看出,ConnectableObservableAdapter自己实现了subscribe方法,例子中pOb调用的方法就是该方法了,我们再来看看这个方法做了什么。这其中lazySubject就是我们之前外界传入的_makeSubject()创建的对象,也就是PublishSubject()

public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType  {
    
    public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    // 订阅 
    func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }
}

我们再来看看pOb调用connect()方法的相关实现:

final private class ConnectableObservableAdapter<Subject: SubjectType>
    : ConnectableObservable<Subject.Element> {
   
    override func connect() -> Disposable {
        return self._lock.calculateLocked {
            if let connection = self._connection {
                return connection
            }

            let singleAssignmentDisposable = SingleAssignmentDisposable()
            // 创建connection观察者
            let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
            self._connection = connection
            // 订阅观察
            let subscription = self._source.subscribe(connection)
            singleAssignmentDisposable.setDisposable(subscription)
            return connection
        }
    }
}


final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
    typealias Element = Subject.Observer.Element

    private var _lock: RecursiveLock
    // state
    private var _parent: ConnectableObservableAdapter<Subject>?
    private var _subscription : Disposable?
    private var _subjectObserver: Subject.Observer

    private let _disposed = AtomicInt(0)

    init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
        self._parent = parent
        self._subscription = subscription
        self._lock = lock
        self._subjectObserver = subjectObserver
    }

    // 观察者发送事件的时候 最终会来到这里
    // 从这里进入到PublishSubject.on 然后再给订阅者分发
    func on(_ event: Event<Subject.Observer.Element>) {
        if isFlagSet(self._disposed, 1) {
            return
        }
        if event.isStopEvent {
            self.dispose()
        }
        // 发送事件
        self._subjectObserver.on(event)
    }

}

在调用connect()方法的时候也做了一层判断,一旦创建过connection就再也不会创建了。而且在第一次创建的时候,会把connection作为观察者传入订阅的方法中。按照之前订阅流程,发送事件的时候,必然会调用Connectionon方法,而最终会让我们之前懒加载的lazySubject对象调用on方法。也就是发送事件的观察者只有一个,也就是lazySubject

上述例子中,如果我们去掉publish()connect(),则就成为了普通的可观察序列,此时运行程序的结果如下:

==开始发送事件了==
订阅1==发送广播事件==
==开始发送事件了==
订阅2==发送广播事件==

因为我们在订阅方法中,每一次订阅都创建了一个观察者,所以创建观察序列的回调会走两次。而在多播中,lazySubject只有一个,也就是说观察者是唯一的,事件也就只会发送一次。这样就是多播的含义,一人发送,多人接收。

相关文章

  • RxSwift(四)-- RxSwift几个常用高阶函数介绍

    对于RxSwift的重点学习,我们还得需要知道RxSwift的高阶函数,掌握好了RxSwift的高阶函数,是你通往...

  • RxSwift(6)-高阶函数

    映射 map map用于将可观察序列中的元素进行新的转换,返回新的观察序列。 运行结果如下: 那么map是如何实现...

  • RxSwift 深入浅出(二)高阶函数

    RxSwift 这个框架看我就够了,这一篇我重点介绍高阶函数,掌握好RxSwift的高阶函数,是你通往成功的捷径。...

  • RxSwift学习--高阶函数 / 操作符(上)

    前言 在RxSwift中,高阶函数也可以成为操作符,高阶函数可以帮助我们创建新的序列,或者变化组合原有的序列,从而...

  • RxSwift + MVVM 项目实战

    RxSwift 是什么? 为什么要引入它?它有什么优点、好处呢? 函数式编程:利用高阶函数,即将函数作为其它函数的...

  • RxSwift高阶函数skip解读

    RxSwift高阶函数skip解读 skip skip的作用:跳过 Observable 中头 n 个元素,只关注...

  • RxSwift #04 | Operators

    Overview RxSwift 提供了多种操作符(Operator),如果你了解 Swift 的高阶函数,比如 ...

  • RxSwift(二)

    一、常用的RxSwift高阶函数 1、just 2、drive 3、combineLatest 序列组合 4、ma...

  • RxSwift(四)高阶函数

    @TOC 我们知道Swift中有很多高阶函数,非常好用,而且效率都很高,如我们经常使用的map,fliter,fl...

  • RxSwift (5)高阶函数

    定义 *在数学和计算机科学中,高阶函数是至少满足下列一个条件的函数:*接受一个或多个函数作为输入*输出一个函数 R...

网友评论

      本文标题:RxSwift(6)-高阶函数

      本文链接:https://www.haomeiwen.com/subject/zfgpihtx.html