美文网首页SwiftUI
Combine-Subscriber

Combine-Subscriber

作者: YungFan | 来源:发表于2020-04-12 16:42 被阅读0次

    Publisher 根据 Subscriber 的请求提供数据。如果没有任何订阅请求,Publisher 不会提供任何数据。所以可以这样说,Subscriber负责向 Publisher 请求数据并接收数据(或失败)。

    Subscriber定义

    public protocol Subscriber: CustomCombineIdentifierConvertible {
    
        /// 可以接收的数据的类型
        associatedtype Input
    
        /// 可以接收的错误类型;如果不接收错误,则使用 `Never`
        associatedtype Failure: Error
    
        /// Publisher调用此方法以提供订阅
        func receive(subscription: Subscription)
    
        /// Publisher调用此方法发布新的数据
        func receive(_ input: Self.Input) -> Subscribers.Demand
    
        /// Publisher调用此方法发送错误或完成事件
        func receive(completion: Subscribers.Completion<Self.Failure>)
    }
    

    其中 InputFailure分别表示了 Subscriber 能够接受的数据类型和错误类型,如果不接收错误,则使用Never

    Subscription

    连接 Publisher 和 Subscriber 是 Subscription,其定义如下:

    public protocol Subscription: Cancellable, CustomCombineIdentifierConvertible {
        ///  告诉 Publisher 可以发送多少个数据到 Subscriber
        func request(_ demand: Subscribers.Demand)
    }
    

    Back pressure
    Combine 约定 Subscriber 控制数据流,因此它可以同时控制整个流程中发生的所有操作,这个特性称之为Back pressure。Subscription 中的request方法就体现了这种特性,它返回值是一个Subscribers.Demand,设置接受数据的最大值,但是在每次收到新的数据以后都可以调整这个值,且这个值是累加的。

    发布与订阅流程

    发布与订阅流程.png
    1. Subscriber 通过调用Publisher.subscribe来告诉 Publisher 开始订阅。
    2. Publisher 通过调用Subscriber.receive(subscription:)发送确认信息给 Subscriber。这个方法接收一个 Subscription。
    3. Subscriber 通过调用 2 中创建的 Subscription 上的request(_: Demand)方法来首次告诉 Publisher 需要事件的事件的最大值。
    4. Publisher 通过调用Subscriber.·receive(_: Input)发送 1 个数据或者事件给 Subscriber 。
    5. 同4
    6. Publisher 通过调用Subscriber.receive(completion :)向 Subscriber 发送 completion 完成事件。这里的 completion 可以是正常.finished,也可以是.failure的,如果是.failure的会携带一个错误信息。注意:如果中途取消了订阅,Publisher 将不发送完成事件。

    自定义

    自己实现一个 Subscriber,写完以后对 Publisher 和 Subscriber 之间的关系会更加明晰。

    // 1 通过数组创建一个Publisher
    let publisher = [1,2,3,4,5,6].publisher
    
    // 2 自定义一个Subscriber
    class CustomSubscriber: Subscriber {
        // 3 指定接收值的类型和错误类型
        typealias Input = Int
        typealias Failure = Never
    
        // 4 Publisher首先会调用该方法
        func receive(subscription: Subscription) {
            // 接收订阅的值不做限制,也可以通过.max()设置最大值
            subscription.request(.unlimited)
        }
    
        // 5 接受到值时的方法,返回接收值的最大个数变化
        func receive(_ input: Int) -> Subscribers.Demand {
            // 打印出接收到的值
            print("Received value", input)
            // 返回.none,意思就是不改变最大接收数量,也可以通过.max()设置增大多少
            return .none
        }
    
        // 6 实现接收到完成事件的方法
        func receive(completion: Subscribers.Completion<Never>) {
            print("Received completion", completion)
        }
    }
    // 订阅Publisher
    publisher.subscribe(CustomSubscriber())
    
    /*输出
    Received value 1
    Received value 2
    Received value 3
    Received value 4
    Received value 5
    Received value 6
    Received completion finished
     */
    

    内置Subscriber

    • Sink
    • Assign

    Sink

    在闭包中处理数据或 completion 事件。

    // 1 Just发送单个数据
    let publisher = Just(1)
    // 2 sink订阅
    publisher.sink(receiveCompletion: { _ in
        print("receiveCompletion")
    }, receiveValue: { value in
        print(value)
    })
    
    /* 输出
     1
     receiveCompletion
     */
    

    Assign

    • 属性写入数据。
    • 它接受一个class对象以及对象类型上的某个KeyPath。会将 Publisher 的 Output 数据设置到对应的属性上去。
    // 1 创建对象
    class Student {
        var name: String = ""
    }
    let stu = Student()
    
    // 2 Just发送单个数据
    let publisher = Just("Hello Combine")
    // 3 assign订阅,设置到foo的bar属性上
    publisher.assign(to: \.name, on: stu)
    
    print(stu.name)
    
    /* 输出
    Hello Combine
    */
    

    Cancellable

    Combine 中提供了Cancellable这个协议,里面只定义了一个cancel方法,用于提前结束订阅流程。SinkAssign都实现了Cancellable 协议,所以可以调用cancel方法来取消订阅。另外 Combine 中还定义了AnyCancellable类,它也实现了 Cancellable 协议,这个类会在deinit时自动执行cancel方法。

    protocol Cancellable {
        func cancel()
    }
    

    应用

    场景一:模拟用户取消上传数据。

    let request = URLRequest(url: URL(string: "https://xxxxx")!)
    let image = UIImage(named: "largeImage")
    let imgFile: Data = image!.pngData()!
    
    // 上传Publisher
    let downloadPublisher = Future<Data?, Never> { promise in
        URLSession.shared.uploadTask(with: request, from: imgFile) { (data, _, _) in
            promise(.success(data))
        }.resume()
    }
    
    // 订阅
    let subscription = downloadPublisher.sink { data in
        print("Received data: \(data)")
    }
    
    // 可以在完成之前调用cancel取消任务
    subscription.cancel()
    

    场景二:模拟网络原因导致的网络请求中断。

    let dataPublisher = URLSession.shared.dataTaskPublisher(for: URL(string: "https://www.baidu.com")!)
    
    let cancellableSink = dataPublisher
        .sink(receiveCompletion: { completion in
            switch completion {
            case .finished:
                print("received finished")
                break
            case .failure(let error):
                print("received error: ", error)
            }}, receiveValue: { someValue in
                print(".sink() received \(someValue)")
        })
    
    // 可以取消
    cancellableSink.cancel()
    

    相关文章

      网友评论

        本文标题:Combine-Subscriber

        本文链接:https://www.haomeiwen.com/subject/qtkbmhtx.html