美文网首页RxSwift源码解析
Rx 从 SinkDisposer 一窥线程安全

Rx 从 SinkDisposer 一窥线程安全

作者: 狼性刀锋 | 来源:发表于2018-09-26 17:17 被阅读17次
    // SinkDisposer
    fileprivate final class SinkDisposer: Cancelable {
        fileprivate enum DisposeState: UInt32 {
            case disposed = 1
            case sinkAndSubscriptionSet = 2
        }
    
        // Jeej, swift API consistency rules
        fileprivate enum DisposeStateInt32: Int32 {
            case disposed = 1
            case sinkAndSubscriptionSet = 2
        }
        
        private var _state: AtomicInt = 0
        private var _sink: Disposable? = nil
        private var _subscription: Disposable? = nil
    
        var isDisposed: Bool {
            return AtomicFlagSet(DisposeState.disposed.rawValue, &_state)
        }
    
        func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
            _sink = sink
            _subscription = subscription
    
            let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
            if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
                rxFatalError("Sink and subscription were already set")
            }
    
            if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
                sink.dispose()
                subscription.dispose()
                _sink = nil
                _subscription = nil
            }
        }
        
        func dispose() {
            let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)
    
            if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
                return
            }
    
            if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
                guard let sink = _sink else {
                    rxFatalError("Sink not set")
                }
                guard let subscription = _subscription else {
                    rxFatalError("Subscription not set")
                }
    
                sink.dispose()
                subscription.dispose()
    
                _sink = nil
                _subscription = nil
            }
        }
    }
    
    
    
    

    首先我们要分析下需求,哪些操作是需要保证线程安全的。 显然_state的设置是要保证线程安全的, 那么与之相关的读写操作都是要加锁的,因此 isDisposed,setSinkAndSubscription,dispose都是需要加锁的。RX 通过Atomic Operation 保证操作的原子性。这里值的注意的是通过or设置具体标志位,通过&操作检测具体相应标志位。

    setSinkAndSubscription 为例子:

          let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
            if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
                rxFatalError("Sink and subscription were already set")
            }
    
            if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
                sink.dispose()
                subscription.dispose()
                _sink = nil
                _subscription = nil
            }
    
    
    

    现假设执行setSinkAndSubscription_state = 0bxy

    那么执行 let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)

    previousState = 0bxy

    _state = 0bxy | 0b10 = 0b1y

    也就是说这个操作最终导致_state 的第二位置1
    previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue

    0bxy & 0b10 = 0bx0

    x = 1 则最终结果为0b10,否则则为0b00, 再通过if语句即可检测第二位是否为0.

    这里还有一个小坑:

            if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
                 print("triger  disposed in setSinkAndSubscription function \(self) \n \(Thread.current)")
                sink.dispose()
                subscription.dispose()
                _sink = nil
                _subscription = nil
            }
    
    

    在检测完setSinkAndSubscription flag之后,立马又检测 disposed flag 如果为真则立即执行dispose操作,也就是说一旦set disposed flag,则再设置setSinkAndSubscription 则是无效操作,这里我不确定具体是什么情况会触发这个操作,不过我跑了下单元测试,确实某些case是会触发这种情况,以下便是一个触发该case的单元测试:

        func test1323() {
            func performSharingOperatorsTest(share: @escaping (Observable<Int>) -> Observable<Int>) {
                _ = share(Observable<Int>.create({ observer in
                        observer.on(.next(1))
                        Thread.sleep(forTimeInterval: 0.1)
                        observer.on(.completed)
                        return Disposables.create()
                    })
                    .flatMap { (int) -> Observable<Int> in
                        return Observable.create { (observer) -> Disposable in
                            DispatchQueue.global().async {
                                observer.onNext(int)
                                observer.onCompleted()
                            }
                            return Disposables.create()
                        }
                    })
                    .subscribe { (e) in
                    }
            }
    
            for op in [
                { $0.share(replay: 0, scope: .whileConnected) },
                { $0.share(replay: 0, scope: .forever) },
                { $0.share(replay: 1, scope: .whileConnected) },
                { $0.share(replay: 1, scope: .forever) },
                { $0.share(replay: 2, scope: .whileConnected) },
                { $0.share(replay: 2, scope: .forever) },
                ] as [(Observable<Int>) -> Observable<Int>] {
                performSharingOperatorsTest(share: op)
            }
        }
    
    

    但是具体怎么原理还有待细究。

    相关资料:

    property属性的atomic和nonatomic区别

    理解Memory Barrier(内存屏障)

    相关文章

      网友评论

        本文标题:Rx 从 SinkDisposer 一窥线程安全

        本文链接:https://www.haomeiwen.com/subject/gjrjoftx.html