美文网首页
Combine最简流程源码解析

Combine最简流程源码解析

作者: 梦即是幻 | 来源:发表于2020-03-23 09:33 被阅读0次

    Combine

    Combine是Apple出的Functional Reactive Programming (FRP)模式框架,类似很出名的RxSwift

    Combine 框架主要分成发布者 (Publisher)、操作者 (Operator)、和订阅者 (Subscriber) 三大部分。可以通过这三种元素的组装,来建立各式各样的订阅关系 (Subscription)。

    苹果公司创建了自己的FRP框架这点可以看出Combine的重要性,SwiftUI中也大量使用了Combine。

    本篇的目标就是通过下面这段代码背后的源码来了解Combine:

    先说明一下,本篇所有的代码都是参考OpenCombine简化后的,只保留了可以理解原理必须的代码,类型加了前缀😄

    extension YYPublishers {
        func demo() {
            /// 1
            var store = [YYAnyCancellable]()
            
            /// 2
            Just("just").sink(
                receiveCompletion: { print($0) },
                receiveValue: { print($0) }
            ).store(in: &store)
        }
    }
    

    可能还是有点复杂,可以再拆一下,更方便理解:

    extension YYPublishers {
        func demo() {
            /// 1
            var store = [YYAnyCancellable]()
            
            /// 2
            let just = Just("just")
            
            /// 3
            let canceller = just.sink(
                receiveCompletion: { print($0) },
                receiveValue: { print($0) }
            )
            
            /// 4
            canceller.store(in: &store)
        }
    }
    

    先简单分析一下:

    1. 创建YYAnyCancellable对象数组store

    2. 创建只包含一个值,也就是字符串"just"的发布者

    3. 调用just的sink方法来订阅,在它发射字符串或结束时打印出来,同时会返回一个可以提前取消订阅关系的canceller

    4. 将canceller放入store数组中来被管理

    步骤1和4很简单,可以先不用看,后面讲到Cancellable相关时再讲解,我们先来看步骤2,创建发布者Just。

    发布者(Just)

    Just是一个发布者,实现如下:

    public struct Just<Output>: YYPublisher {
        public typealias Failure = Never
    
        public let output: Output
    
        public init(_ output: Output) {
            self.output = output
        }
            
            public func receive<Subscriber: YYSubscriber>(subscriber: Subscriber)
            where Failure == Subscriber.Failure, Output == Subscriber.Input {
            subscriber.receive(
                subscription: Inner(value: output, downstream: subscriber)
            )
        }
    }
    

    看上去很简单,就是一个实现YYPublisher协议的泛型结构体,只有一个属性,保存初始化时传进来的那个值。

    receive方法的细节先不用管,知道它提供让订阅者注册的能力即可。

    再来看Publisher协议:

    public protocol YYPublisher {
        associatedtype Output
        associatedtype Failure: Error
    
        func receive<Subscriber: YYSubscriber>(subscriber: Subscriber)
            where Failure == Subscriber.Failure, Output == Subscriber.Input
    }
    

    也比较简单,只有2个关联类型和一个方法

    • Output:产生值的类型
    • Failure :失败的错误类型,当发布者不产生错误时,可以使用 Never
    • receive方法:提供让订阅者注册的能力

    看到这里,我们对创建Just对象的代码let just = Just("just"),应该是能理解了:

    简单来说就是创建了一个保存字符串"just"的结构体,同时提供了一个receive方法。

    下面就继续来看订阅的代码:

    let canceller = just.sink(
        receiveCompletion: { print($0) },
        receiveValue: { print($0) }
    )
    

    sink方法

    sink是协议YYPublisher的公开扩展方法,也就是说所有发布者都提供了基于闭包的订阅方式:

    extension YYPublisher {
        public func sink(
            receiveCompletion: @escaping (YYSubscribers.Completion<Failure>) -> Void,
            receiveValue: @escaping (Output) -> Void
        ) -> YYAnyCancellable {
            let subscriber = YYSubscribers.Sink(receiveCompletion: receiveCompletion,
                                                receiveValue: receiveValue)
            subscribe(subscriber)
            return YYAnyCancellable(subscriber)
        }
    }
    

    该方法内部会创建一个YYSubscribers.Sink对象,可以看成匿名订阅者,然后通过subscribe方法和订阅者建立关系,再返回YYAnyCancellable对象。

    这里出现了很多陌生的类型和方法:

    • YYAnyCancellable

    • YYSubscribers

    • YYSubscribers.Completion

    • YYSubscribers.Sink

    • subscribe

    我们一个个深入,首先是YYAnyCancellable。

    YYAnyCancellable

    可以提前取消订阅关系的类,实现如下:

    public protocol YYCancellable {
        func cancel()
    }
    
    extension YYAnyCancellable {
        public func store<Cancellables: RangeReplaceableCollection>(
            in collection: inout Cancellables
        ) where Cancellables.Element == YYAnyCancellable {
            collection.append(self)
        }
    
        public func store(in set: inout Set<YYAnyCancellable>) {
            set.insert(self)
        }
    }
    
    public final class YYAnyCancellable: YYCancellable {
        private var _cancel: (() -> Void)?
    
        public init(_ cancel: @escaping () -> Void) {
            _cancel = cancel
        }
        
        public init<Other: YYCancellable>(_ canceller: Other) {
            _cancel = canceller.cancel
        }
    
        public func cancel() {
            _cancel?()
            _cancel = nil
        }
    
        deinit {
            _cancel?()
        }
    }
    

    首先是YYCancellable协议和扩展公开方法store(in:),都很简单,就不说了。

    YYAnyCancellable是实现YYCancellable的类,并且内部会保存一个_cancel闭包,在外部调用cancel方法或该对象释放时,都会被调用。

    看完这里再回去看最开始的1和4,应该是有些能理解了:

    /// 1
    var store = [YYAnyCancellable]()
    
    /// 4
    subscriber.store(in: &store)
    

    另外说明一下,我们应该经常会看到XXX协议,然后对应一个AnyXXX的类或结构体,尤其XXX是泛型时,这就是协议与类型擦除,建议大家可以先了解下这点

    YYSubscribers

    只是个空枚举,其实就是与YYSubscriber协议相关的类型的名称空间,实现如下:

    public enum YYSubscribers {}
    

    当然还有类似的YYPublishers:

    public enum YYPublishers {}
    

    YYSubscribers.Completion

    也只是个简单的枚举,表示正常完成或错误的信号,实现如下:

    extension YYSubscribers {
        public enum Completion<Failure: Error> {
            case finished
            case failure(Failure)
        }
    }
    

    YYSubscribers.Sink

    其实就是一个简单的订阅者,实现了YYSubscriber和YYCancellable协议,实现如下:

    extension YYSubscribers {
        public final class Sink<Input, Failure: Error>: YYSubscriber, YYCancellable {
            
            private var status: YYSubscriptions.Status = .awaiting
            
            public let receiveValue: (Input) -> Void
            public let receiveCompletion: (YYSubscribers.Completion<Failure>) -> Void
    
            public init(
                receiveCompletion: @escaping (YYSubscribers.Completion<Failure>) -> Void,
                receiveValue: @escaping ((Input) -> Void)
            ) {
                self.receiveCompletion = receiveCompletion
                self.receiveValue = receiveValue
            }
    
            public func receive(subscription: YYSubscription) {
                switch status {
                case .subscribed, .terminal:
                    subscription.cancel()
                case .awaiting:
                    status = .subscribed(subscription)
                    subscription.request(.unlimited)
                }
            }
    
            public func receive(_ input: Input) -> YYSubscribers.Demand {
                receiveValue(input)
                return .none
            }
    
            public func receive(completion: YYSubscribers.Completion<Failure>) {
                receiveCompletion(completion)
                status = .terminal
            }
    
            public func cancel() {
                guard case let .subscribed(subscription) = status else {
                    return
                }
    
                subscription.cancel()
                status = .terminal
            }
        }
    }
    

    这里我们应该只能看懂它的初始化流程:保存接收值和完成事件的2个闭包以后用。

    其他的代码都涉及到了陌生的类型:

    • YYSubscription
    • YYSubscriptions
    • YYSubscriptions.Status

    没关系,一个一个来,先看看怎么才算是订阅者。

    YYSubscriber

    表示订阅者的协议,实现如下:

    public protocol YYSubscriber {
        associatedtype Input
        associatedtype Failure: Error
    
        func receive(subscription: YYSubscription)
        func receive(_ input: Input) -> YYSubscribers.Demand
        func receive(completion: YYSubscribers.Completion<Failure>)
    }
    

    关联类型 InputFailure定义了接受的值和错误,

    同时必须实现3个receive方法,表示订阅者从开始订阅到完成的生命周期:

    • receive(subscription: ) - 告诉订阅者,它在发布者上被成功订阅,可以请求值了
    • receive(_ input: ) - 告诉订阅者,发布者产生值了
    • receive(completion:) - 告知订阅者发布者已完成发布,可能是正常发布或发生错误

    后面2个应该很好理解,不用过多的解释。

    只有第一个函数,为啥订阅成功时接收的参数是Subscription,不应该是Publisher么?

    带着疑问我们继续。

    YYSubscription

    表示订阅者与发布者连接的协议,实现如下:

    public protocol YYSubscription: YYCancellable {
        func request(_ demand: YYSubscribers.Demand)
    }
    

    只有一个request方法,还是有点懵逼!!!

    没关系,至少这里我们心里应该有一个概念,订阅者和发布者之间有一个Subscription来处理2者的逻辑。

    继续看下参数YYSubscribers.Demand

    YYSubscribers.Demand

    表示请求的项目数,实现如下:

    extension YYSubscribers {
        public struct Demand {
            internal let rawValue: UInt
    
            internal init(rawValue: UInt) {
                self.rawValue = min(UInt(Int.max) + 1, rawValue)
            }
    
            public static var unlimited: Demand {
                return Demand(rawValue: .max)
            }
    
            public static var none: Demand { return .max(0) }
    
            public static func max(_ value: Int) -> Demand {
                precondition(value >= 0, "demand cannot be negative")
                return Demand(rawValue: UInt(value))
            }
    }
    

    其实就是可以保存数量的结构体,还提供了unlimited,none,max(数量)的静态方式,用来限制订阅者可以获得的值。

    YYSubscriptions

    与YYSubscription协议相关的类型的名称空间,也是个空枚举:

    public enum YYSubscriptions {}
    

    YYSubscriptions.Status

    表示订阅状态的枚举:

    extension YYSubscriptions {
        internal enum Status {
            case awaiting
            case subscribed(YYSubscription)
            case terminal
        }
    }
    
    • awaiting - 等待订阅
    • subscribed - 和某个Subscription建立了联系
    • terminal - 订阅结束

    了解了Subscription相关的含义,现在我们可以回到订阅者YYSubscribers.Sink生命周期相关的实现了。

    YYSubscribers.Sink生命周期

    去掉了初始化相关的代码:

    extension YYSubscribers {
        public final class Sink<Input, Failure: Error>: YYSubscriber, YYCancellable {
            
            private var status: YYSubscriptions.Status = .awaiting
    
            public func receive(subscription: YYSubscription) {
                switch status {
                case .subscribed, .terminal:
                    subscription.cancel()
                case .awaiting:
                    status = .subscribed(subscription)
                    subscription.request(.unlimited)
                }
            }
    
            public func receive(_ input: Input) -> YYSubscribers.Demand {
                receiveValue(input)
                return .none
            }
    
            public func receive(completion: YYSubscribers.Completion<Failure>) {
                receiveCompletion(completion)
                status = .terminal
            }
    
            public func cancel() {
                guard case let .subscribed(subscription) = status else {
                    return
                }
    
                subscription.cancel()
                status = .terminal
            }
        }
    }
    
    1. 订阅者内部维护了一个表示当前状态的属性status,开始是awaiting
    2. receive(subscription: ):订阅成功时
      1. 检查状态,如果ok,就将状态置为subscribed并保存subscription
      2. 然后向subscription发起request请求值
    3. receive(_ input: ):收到值,调用保存的对应闭包
    4. receive(completion:):收到完成事件,调用保存的对应闭包,并将状态置为terminal
    5. cancel():被取消时如果是已经订阅状态,调用subscription的cancel方法,并将状态置为terminal

    省略了一些小细节,不过这下我们对订阅者就应该有比较清晰的了解了,下面继续看订阅关系如何产生的。

    subscribe方法

    回到上面sink的实现,来看看subscribe方法的流程:

    extension YYPublisher {
        public func sink(
            receiveCompletion: @escaping (YYSubscribers.Completion<Failure>) -> Void,
            receiveValue: @escaping (Output) -> Void
        ) -> YYAnyCancellable {
            let subscriber = YYSubscribers.Sink(receiveCompletion: receiveCompletion,
                                                receiveValue: receiveValue)
            subscribe(subscriber)
            return YYAnyCancellable(subscriber)
        }
    }
    

    subscribe是协议YYPublisher的公开方法,作用是告诉发布者有人对你感兴趣,订阅你了,快开始干活吧~

    可以看到内部会调用具体对应发布者对象的receive方法,也就是YYPublisher协议规定必须的方法:

    extension YYPublisher {
        public func subscribe<Subscriber: YYSubscriber>(_ subscriber: Subscriber)
            where Failure == Subscriber.Failure, Output == Subscriber.Input {
            receive(subscriber: subscriber)
        }
    }
    

    这里的Publisher是Just,看看receive的具体实现:

    public struct Just<Output>: YYPublisher {
            public func receive<Subscriber: YYSubscriber>(subscriber: Subscriber)
            where Failure == Subscriber.Failure, Output == Subscriber.Input {
            subscriber.receive(
                subscription: Inner(value: output, downstream: subscriber)
            )
        }
    }
    

    可以看到receive方法做了2件事:

    1. 创建作为subscription的Inner对象
    2. 调用订阅者生命周期方法receive(subscription: ),并将1创建的subscription传递过去

    现在我们对订阅者,发布者如何产生订阅关系应该了解了,不过最后还有一个问题,订阅者是怎么收到值和完成事件的呢?

    这里到调用receive方法就结束了,那答案应该在里面。

    还记得这里的订阅者subscriber是谁吧,就是前面的YYSubscribers.Sink对象,回到它的receive(subscription: )再看看:

    public func receive(subscription: YYSubscription) {
          switch status {
          case .subscribed, .terminal:
              subscription.cancel()
          case .awaiting:
              status = .subscribed(subscription)
              subscription.request(.unlimited)
          }
      }
    

    正常情况下,最后会向subscription发起request请求值。

    而这里的subscription,就是Inner(value: output, downstream: subscriber)对象了:

    subscriber.receive(
        subscription: Inner(value: output, downstream: subscriber)
    )
    

    最后来看看这个Inner。

    Just.Inner

    处理订阅者和发布者Just之间关系的Subscription,实现如下:

    extension YYPublishers.Just {
        final class Inner<Downstream: YYSubscriber>: YYSubscription,
            where Downstream.Input == Output {
            
            private var downstream: Downstream?
            private let value: Output
            
            fileprivate init(value: Output, downstream: Downstream) {
                self.downstream = downstream
                self.value = value
            }
            
            func cancel() {
                downstream = nil
            }
            
            func request(_ demand: YYSubscribers.Demand) {
                demand.assertNonZero()
                guard let downstream = self.downstream else { return }
                self.downstream = nil
                _ = downstream.receive(value)
                downstream.receive(completion: .finished)
            }
        }
    }
    
    • 2有个属性
      • downstream:关联的订阅者,会强引用
      • value:发布者的值
    • request方法:
      1. 校验demand合法性
      2. 如果有订阅者,释放并调用订阅者的receive(value)和receive(completion)方法

    然后订阅者在这2个生命周期方法里面会调用我们步骤3中的2个闭包参数receiveValue和receiveCompletion:

    /// 3
    let subscriber = just.sink(
        receiveCompletion: { print($0) },
        receiveValue: { print($0) }
    )
    

    这样整个流程就总算是走完了😄!!!

    总结

    整个流程有些复杂,最后再来总结一下发布者和订阅者是如何联系在一起的。

    使用sink方式订阅某个发布者时:

    • 它会在内部创建匿名订阅者,再创建订阅对象,并将订阅对象的引用传递给订阅者。
    • 然后,订阅者将向订阅对象请求多个值,再接收这些值。
    • 最后,订阅对象发送完成事件,结束。

    还是看图吧:

    image

    订阅对象的工作是在发布者和订阅者之间进行调解,并确保订阅者获得的值不超过其请求的值,同时还负责保留和释放订阅者。

    理解了这3者的关系以及实现原理,可以让我们使用Combine框架更加得心应手,更进一步自定义发布者,订阅对象和订阅者。

    当然,上面只是Combine框架最基本流程的分析,还有常用的Subject,Operator等着我们~~~

    相关文章

      网友评论

          本文标题:Combine最简流程源码解析

          本文链接:https://www.haomeiwen.com/subject/flyiyhtx.html