美文网首页
Swift Combine 之 Publisher数据流

Swift Combine 之 Publisher数据流

作者: Smile_Later | 来源:发表于2022-10-26 18:03 被阅读0次

    Publisher

    • 发布源协议,可以实现该协议来实现自己的数据,
      • Subject继承自Publisher,提供了三套默认的内置实现类
    • 容器包装类,具体实现交由Subscriber实现类来转发数据流
    • 内置提供各种各样的操作符(函数式编程的仿函数,Swift语言的Operator)
      • allSatisfy
      • tryAllSatisfy
      • compactMap
      • contains
      • filter
      • tryFilter
      • throttle
      • ....等等
    • Publisher接口
        public protocol Publisher {
        /// 数据输出流,相当于订阅者的数据输入流        
        associatedtype Output
        /// 数据发布,要么发布一个真实数据流,要么发布一个错误(或者可以选择丢弃错误,      Never忽略错误) 
        associatedtype Failure: Error
     
         /// 接收数据源输入流,并转发给订阅者
        func receive<Subscriber: OpenCombine.Subscriber>(subscriber: Subscriber) where Failure == Subscriber.Failure, Output ==       Subscriber.Input
      }
    
    • 所有的操作符的流程都类似,追踪一个操作符的调用顺序和触发流程
    • 定义一个数组的数据流 [1, 2, 10000].publisher, publisherSequence的一个扩展,内部使用Publishers.Sequence进行了包装成了一个可以被观察的数据流
     extension Sequence {
         public var publisher: Publishers.Sequence<Self, Never> {
             return .init(sequence: self)
         }
    - `Publishers.Sequence` 内部实现
     /// 序列流继承于`Publisher` 
     public struct Sequence<Elements: Swift.Sequence, Failure: Error>: Publisher {
             /// 输出源
             public typealias Output = Elements.Element
     
             public let sequence: Elements
     
             public init(sequence: Elements) {
                 self.sequence = sequence
             }
             /// 实现 `Publisher`协议方法`receive`
             public func receive<Downstream: Subscriber>(subscriber: Downstream)
                 where Failure == Downstream.Failure,
                       Elements.Element == Downstream.Input
             {
                 /// 包装类`Inner`实现了`Subscription`协议,内部持有当前收到的`subscriber`, 进行转发
                 let inner = Inner(downstream: subscriber, sequence: sequence)
     
                 /// 判断是否序列是否到末尾,如果序列结束发送完成事件并取消序列,数据流完成,反之,持续接收数据流
                 if inner.isExhausted {
                     subscriber.receive(subscription: Subscriptions.empty)
                     subscriber.receive(completion: .finished)
                     inner.cancel()
                 } else {
                     /// 内部会调用`Subscription`协议的 `request`方法 
                     subscriber.receive(subscription: inner)
                 }
             }
         }
    
    • Inner作为一个私有类,单独实现了序列数据流的内部数据源的流转(每一个操作符都有一套内部特有的Inner实现类),源码我进行了部分简化
      private final class Inner<Downstream: Subscriber, Elements: Sequence, Failure>
             : Subscription
             where Downstream.Input == Elements.Element,
                   Downstream.Failure == Failure
         {
             
             typealias Iterator = Elements.Iterator
             typealias Element = Elements.Element
     
             private var sequence: Elements?
             private var downstream: Downstream?
             private var iterator: Iterator
             private var next: Element?
             private var pendingDemand = Subscribers.Demand.none
     
             /// 初始化持有的`downstream`数据流,方便后续数据流转
             fileprivate init(downstream: Downstream, sequence: Elements) {
                 self.sequence = sequence
                 self.downstream = downstream
                 self.iterator = sequence.makeIterator()
                 next = iterator.next()
             }
             
             func request(_ demand: Subscribers.Demand) {
                 guard downstream != nil else {
                     return
                 }
     
                 while let downstream = self.downstream, pendingDemand > 0 {
                     if let current = self.next {
                         /// 迭代数据流,依次进行数据的转发,交给订阅者接收
                         let additionalDemand = downstream.receive(current)
                     }
     
                     if next == nil {
                         self.downstream = nil
                         self.sequence = nil
                         /// 序列结束,发送完成事件
                         downstream.receive(completion: .finished)
                         return
                     }
                 }
             }
         }
    
    • 上面定义了数据流源,并发出了数据,等待订阅者监听数据流,sinkassign操作符可以进行订阅,后续会列出sink和assign的源码

    代码演示片段

             class Root: NSObject {
                 var name: String = ""
             }
             
             let root = Root()
             
             let arr: [Int] = [1, 2, 100]
             
             /// 将数组转换成一个数据流
             arr.publisher
                 /// 过滤数据流中大于2的元素
                 .filter{$0 > 2}
                 /// 进行一次转换,转成String类型
                 .compactMap{"\($0)"}
                 /// `Sink`订阅数据源
                 .sink { value in
                     debugPrint("数据流: \(value)")
                 }.store(in: &cancel)
             
              /// 使用keypath进行赋值
             arr.publisher
                 .filter{$0 > 2}
                 .compactMap{"\($0)"}
                 /// `Assign` keypath 赋值
                 .assign(to: \.name, on: root).store(in: &cancel)
             
             debugPrint("root name: \(root.name)")
     
            /// 控制台输出
            "数据流: 100"
            "root name: 100"
    

    相关文章

      网友评论

          本文标题:Swift Combine 之 Publisher数据流

          本文链接:https://www.haomeiwen.com/subject/iaittdtx.html