美文网首页
RxSwift - Observable序列的创建方式

RxSwift - Observable序列的创建方式

作者: 恍然如梦_b700 | 来源:发表于2020-06-11 19:19 被阅读0次

    1:emty

    首先来一个空的序列 - 本来序列事件是Int类型的,这里调用emty函数 没有序列,只能complete

    print("********emty********")
    let emtyOb = Observable<Int>.empty()
    _ = emtyOb.subscribe(onNext: { (number) in
        print("订阅:",number)
    }, onError: { (error) in
        print("error:",error)
    }, onCompleted: {
        print("完成回调")
    }) {
        print("释放回调")
    }
    
    
    • 这种方式不常用,但是我们以点及面展开分析
    • 通过源码解析查看
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        observer.on(.completed)
        return Disposables.create()
    }
    
    
    • 很明显在订阅的时候,直接observer.on(.completed) 发送了完成信号,非常简洁
    image

    2: just

    • 单个信号序列创建
    • 该方法通过传入一个默认值来初始化,构建一个只有一个元素的Observable队列,订阅完信息自动complete
    • 下面的样例,我们显示地标注出了Observable的类型为Observable<[String]>,即指定了这个 Observable 所发出的事件携带的数据类型必须是String 类型
    print("********just********")
    //MARK:  just
    // 单个信号序列创建
    let array = ["FY_Event","LG_Kody"]
    Observable<[String]>.just(array)
        .subscribe { (event) in
            print(event)
        }.disposed(by: disposeBag)
    
    _ = Observable<[String]>.just(array).subscribe(onNext: { (number) in
        print("订阅:",number)
    }, onError: { (error) in
        print("error:",error)
    }, onCompleted: {
        print("完成回调")
    }) {
        print("释放回调")
    }
    
    
    • 感觉有点数据便利的感觉
    • 这个序列在平时开发里面还是应用挺多的,看看底层源码
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        observer.on(.next(self._element))
        observer.on(.completed)
        return Disposables.create()
    }
    
    
    • observer.on(.next(self._element))常规订阅之后就会发送.next事件
    • 之后就会自动发送完成事件,跟我们效果完全吻合
    image

    3:of

    • 此方法创建一个新的可观察实例,该实例具有可变数量的元素。
    • 该方法可以接受可变数量的参数(必需要是同类型的)
    print("********of********")
    //MARK:  of
    // 多个元素 - 针对序列处理
    Observable<String>.of("FY_Event","LG_Kody")
        .subscribe { (event) in
            print(event)
        }.disposed(by: disposeBag)
    
    // 字典
    Observable<[String: Any]>.of(["name":"FY_Event","age":18])
        .subscribe { (event) in
            print(event)
        }.disposed(by: disposeBag)
    
    // 数组
    Observable<[String]>.of(["FY_Event","LG_Kody"])
        .subscribe { (event) in
            print(event)
        }.disposed(by: disposeBag)
    
    
    • 无论字典,数组多个元素都是正常使用
    • 底层源码的结构也是中规中矩
    • 初始化保存调度环境和传入的元素
    • 订阅流程也是利用sink,然后通过mutableIterator迭代器处理发送
    image

    4:from

    • 将可选序列转换为可观察序列。
    • 从集合中获取序列:数组,集合,set 获取序列 - 有可选项处理 - 更安全
    print("********from********")
    // MARK:  from
    Observable<[String]>.from(optional: ["FY_Event","LG_Kody"])
        .subscribe { (event) in
            print(event)
        }.disposed(by: disposeBag)
    
    
    • self._optional = optional底层初始化可选项保存
    • 订阅流程判断是否匹配我们的可选项
    • 发送observer.on(.next(element))序列
    • 随即自动observer.on(.completed)完成序列发送
    image

    5:deferred

    • 返回一个可观察序列,该序列在新观察者订阅时调用指定的工厂函数。
    • 这里有一个需求:动态序列 - 根据外界的标识 - 动态输出
    • 使用deferred()方法延迟Observable序列的初始化,通过传入的block来实现Observable序列的初始化并且返回。
    print("********defer********")
    //MARK:  defer
    var isOdd = true
    _ = Observable<Int>.deferred { () -> Observable<Int> in
        // 这里设计我们的序列
        isOdd = !isOdd
        if isOdd {
            return Observable.of(1,3,5,7,9)
        }
        return Observable.of(0,2,4,6,8)
        }
        .subscribe { (event) in
            print(event)
        }
    
    
    • self._observableFactory = observableFactory初始化保存了这段工厂闭包
    func run() -> Disposable {
        do {
            let result = try self._observableFactory()
            return result.subscribe(self)
        }
        catch let e {
            self.forwardOn(.error(e))
            self.dispose()
            return Disposables.create()
        }
    }
    
    
    • 在订阅流程到sink的时候,把这段工厂闭包执行
    • 有种中间层被包装的感觉
    image

    6:rang

    • 使用指定的调度程序生成并发送观察者消息,生成指定范围内的可观察整数序列。
    print("********rang********")
    //MARK:  rang
    Observable.range(start: 2, count: 5)
        .subscribe { (event) in
            print(event)
        }.disposed(by: disposeBag)
    
    // 底层源码
    init(start: E, count: E, scheduler: ImmediateSchedulerType) {
        self._start = start
        self._count = count
        self._scheduler = scheduler
    }
    
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == E {
        let sink = RangeSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
    
    
    • 保存序列中第一个整数的值。
    • 保存要生成的顺序整数的数目。
    • 保存调度环境
    if i < self._parent._count {
        self.forwardOn(.next(self._parent._start + i))
        recurse(i + 1)
    }
    else {
        self.forwardOn(.completed)
        self.dispose()
    }
    
    
    • 根据之前保存的信息,数据的状态也不断攀升,然后递归到规定的要求
    image

    7:generate

    • 通过运行产生序列元素的状态驱动循环,使用指定的调度程序运行循环,发送观察者消息,从而生成一个可观察序列。
    • 该方法创建一个只有当提供的所有的判断条件都为 true 的时候,才会给出动作的 Observable 序列。
    • 初始值给定 然后判断条件1判断条件2 会一直递归下去,直到条件1或者条件2不满足
    • 类似 数组遍历循环
    • -参数一initialState: 初始状态。
    • -参数二 condition:终止生成的条件(返回“false”时)。
    • -参数三 iterate:迭代步骤函数。
    • -参数四 调度器:用来运行生成器循环的调度器,默认:CurrentThreadScheduler.instance
    • -返回:生成的序列。
    print("********generate********")
    //MARK:  generate
    Observable.generate(initialState: 0,// 初始值
                        condition: { $0 < 10}, // 条件1
                        iterate: { $0 + 2 })  // 条件2 +2
        .subscribe { (event) in
            print(event)
        }.disposed(by: disposeBag)
    
    // 数组遍历
    let arr = ["FY_Event_1","FY_Event_2","FY_Event_3","FY_Event_4","FY_Event_5","FY_Event_6","FY_Event_7","FY_Event_8","FY_Event_9","FY_Event_10"]
    Observable.generate(initialState: 0,// 初始值
        condition: { $0 < arr.count}, // 条件1
        iterate: { $0 + 1 })  // 条件2 +2
        .subscribe(onNext: {
            print("遍历arr:",arr[$0])
        })
        .disposed(by: disposeBag)
    
    
    image

    8:timer

    • 返回一个可观察序列,该序列使用指定的调度程序运行计时器,在指定的初始相对到期时间过后定期生成一个值。
    • 第一次参数:第一次响应距离现在的时间
    • 第二个参数:时间间隔
    • 第三个参数:线程
    print("********timer********")
    //MARK:  timer
    Observable<Int>.timer(5, period: 2, scheduler: MainScheduler.instance)
        .subscribe { (event) in
            print(event)
        }
    .disposed(by: disposeBag)
    
    // 因为没有指定期限period,故认定为一次性
    Observable<Int>.timer(1, scheduler: MainScheduler.instance)
        .subscribe { (event) in
            print("111111111 \(event)")
        }
        //.disposed(by: disposeBag)
    
    
    • 状态码的不断攀升,间隔时间不断发送响应
    image

    9:interval

    • 返回一个可观察序列,该序列在每个周期之后生成一个值,使用指定的调度程序运行计时器并发送观察者消息。
    print("********interval********")
    //MARK:  interval
    // 定时器
    Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        .subscribe { (event) in
            print(event)
        }
        //.disposed(by: disposeBag)
    
    
    image

    9:repeatElement

    • 使用指定的调度程序发送观察者消息,生成无限重复给定元素的可观察序列。
    print("********repeatElement********")
    //MARK:  repeatElement
    Observable<Int>.repeatElement(5)
        .subscribe { (event) in
            // print("订阅:",event)
        }
        .disposed(by: disposeBag)
    
    
    image

    10:error

    • 返回一个以“error”结束的可观察序列。
    • 这个序列平时在开发也比较常见,请求网络失败也会发送失败信号!
    print("********error********")
    //MARK:  error
    // 对消费者发出一个错误信号
    Observable<String>.error(NSError.init(domain: "lgerror", code: 10086, userInfo: ["reason":"unknow"]))
        .subscribe { (event) in
            print("订阅:",event)
        }
        .disposed(by: disposeBag)
    
    
    image

    11:never

    • 该方法创建一个永远不会发出 Event(也不会终止)的 Observable 序列。
    • 这种类型的响应源 在测试或者在组合操作符中禁用确切的源非常有用
    print("********never********")
    //MARK:  never
    Observable<String>.never()
        .subscribe { (event) in
            print("走你",event)
        }
        .disposed(by: disposeBag)
    print("********never********")
    
    

    [图片上传中...(image-d05944-1591873354261-0)]

    12:create()

    • 该方法接受一个 闭包形式的参数,任务是对每一个过来的订阅进行处理。
    • 下面是一个简单的样例。为方便演示,这里增加了订阅相关代码
    • 这也是序列创建的一般方式,应用非常之多
    let observable = Observable<String>.create{observer in
        //对订阅者发出了.next事件,且携带了一个数据"hangge.com"
        observer.onNext("hangge.com")
        //对订阅者发出了.completed事件
        observer.onCompleted()
        //因为一个订阅行为会有一个Disposable类型的返回值,所以在结尾一定要returen一个Disposable
        return Disposables.create()
    }
    
    //订阅测试
    observable.subscribe {
        print($0)
    }
    
    

    序列的创建也是学习 RxSwift 的根基,有很多时候我遇到很多的BUG,说白了就是根基没有掌握好!

    相关文章

      网友评论

          本文标题:RxSwift - Observable序列的创建方式

          本文链接:https://www.haomeiwen.com/subject/dvkitktx.html