美文网首页RxJava
Rxjava 创造型操作符

Rxjava 创造型操作符

作者: tingtingtina | 来源:发表于2020-09-30 13:50 被阅读0次

    关于操作符的介绍,官网说明的还是非常清楚的,还配有事件流向图。



    上面就是事件上游(被观察者)

    • 箭头表示 Observable 发射消息的时间线;
    • 花花绿绿,形形状状的就是发射的数据;
    • 最后有竖线,就是结束的意思,对应着时间的onComplete,× 对应着 onError

    下面是事件下游(观察者)

    • 时间线是观察者接收消息的时间线
    • 花花绿绿,形形状状的就是接收的数据;

    中间那部分就是功能强大操作符了
    在这里面连接了上游和下游,使他们建立了订阅关系。

    最基础的就是创造型操作符,顾名思义,它们的作用就是创建 Observable,并发送事件

    基本创建

    create

    创建一个被观察者对象,使用者自己使用发射器的 onNext(), onError(), 和 onCompleted() 发送对应消息,下游可以接收。

    有意思的是,这里就没有上游线,因为创造型操作符本身就是创建一个上游,并允许发送消息。

    为了示例看起来更简洁,先写一个创建观察者的方法

    private Observer createObserver() {
        return new Observer<Integer>() {
            // 下游 Observer 观察者 处理事件
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(Integer integer) {
                log("下游处理事件 onNext " + integer);
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
                log("下游处理事件 onComplete");
            }
        };
    }
    
     /** Create 基本創建
      *  ObservableEmitter是事件的发送器,可以发送多个onNext()方法;一旦发送 onComplete(),onError() 事件之后,后续的事件将不会再发送
     */
    public void rx_create() {
        // 上游 Observable 被观察者
        Observable.create(new ObservableOnSubscribe<Integer>() {
            // 发射器 发射事件
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                log("上游发射事件");
                // 发射事件
                emitter.onNext(1);
                emitter.onNext(2);
                log("上游发射完成");
            }
        }).subscribe(createObserver());
    }
    
    // 结果
    上游发射事件
    下游处理事件 onNext 1
    下游处理事件 onNext 2
    上游发射完成
    

    快速创建

    create 是使用自己的发射器发送的,RxJava 中也提供了更加快速的创建方式

    Just

    内部会自己发射数据,发射之后会再发一个 onComplete, 这里面支持发送多个数据
    // just 会依次发射,最多发送 10 个
    public void rx_just() {
        // 上游 Observable 被观察者
        // 内部会先发射 1, 再发射 2
        Observable.just(1, 2).subscribe(createObserver());
    }
    // 结果
    下游处理事件 onNext 1
    下游处理事件 onNext 2
    下游处理事件 onComplete
    

    from: fromArray/fromIterable

    内部自己发射的,数集对象/迭代器
    public void rx_formArray() {
        // 上游 Observable 被观察者
        Integer[] array = {1, 2, 3, 4, 5};
        Observable.fromArray(array).subscribe(createObserver());
    }
    
    // 结果
    下游处理事件 onNext 1
    下游处理事件 onNext 2
    下游处理事件 onNext 3
    下游处理事件 onNext 4
    下游处理事件 onNext 5
    下游处理事件 onComplete
    

    range

    发射一个范围内的有序数列,可以指定范围的起始和长度
    public void rx_range() {
        // 从 1 开始加 数量 5个 (1,2,3,4,5)
        Observable.range(1,5).subscribe(createObserver());
    }
    
    // 结果
    下游处理事件 onNext 1
    下游处理事件 onNext 2
    下游处理事件 onNext 3
    下游处理事件 onNext 4
    下游处理事件 onNext 5
    下游处理事件 onComplete
    

    empty / never / error

    下游默认是 Object,无法发出有值的事件,创建后只会发射 onComplete 创建后,什么都不做 创建后会发射一个 Error 时间,通知异常
    /**
     * 上游没有发射任何事件 无法指定类型,默认 Object Rxjava 泛型默认类型 == Object
     * 做一个耗时操作,不需要任何数据来刷新 UI 只会调用 onComplete
     */
    public void rx_empty() {
        // 不会发射有值的事件
        Observable.empty().subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(Object o) {
                // 没有事件可接收
                log("empty -- > onNext");
            }
            @Override
            public void onError(Throwable e) {
                // 如果是 .error 会在这接收
            }
            @Override
            public void onComplete() {
                // 内部一定会调用 onComplete 事件
                log("empty --- > onComplete");
            }
        });
    }
    // 结果
    empty --- > onComplete
    

    延迟创建

    除了以上创建方法,我们可以为事件的发送添加延迟,支持延迟创建的有下几个操作符。

    timer

    快速创建后,延迟指定时间后,发送1个数值0(Long类型)

    interval

    按照固定时间发射一个无限递增的整数序列。发送的事件序列 = 从0开始、无限递增1的的整数序列,也可以为它添加第一发射数据前的延时时间
    public void rx_interval() {
        log("------ Start ------");
        Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        log("下游处理事件 onNext " + aLong);
                    }
                });
    }
    
    结果:每个一秒钟会发射一个数据,从 0 开始 加1 递增

    TIPS
    这里用到了个 新的类 Consumer 其实也是个观察者类,可以看做是 Observer 的简化版,里面只有一个方法 accpet 相当于 Observer 中的 onNext

    intervalRange

    每隔指定时间就发送事件,可指定发送数据的数量,与 interval 类似,但可以指定发送数据的数量。它们都支持设置第一次延时时间

    public void rx_intervalRange() {
        log("------ Start ------");
        // start 开始累积, count 累积多少个数量, initialDelay 开始等待的时间, period 每隔多久执行, TimeUnit 时间单位
        Observable.intervalRange(1, 5, 1, 2, TimeUnit.SECONDS)
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        log("accept: " + aLong);
                    }
                });
    }
    
    结果,数据从 1 开始 发送 5 个就是 1,2, 3, 4, 5,最后单位是 秒,开始延迟1s, 每个 2s 发送一次数据。

    defer

    直到有观察者(Observer)订阅时,才动态创建被观察者对象及发送事件

    int i = 1;
    public void rx_defer() {
        // 上游 被观察者
        Observable observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> call() throws Exception {
                return Observable.just(i);
            }
        });
        // 再赋值
        i = 10;
        // 订阅
        // 此时,才会调用 defer()创建被观察者对象
        observable.subscribe(createObserver());
    }
    
    // 结果
    下游处理事件 onNext 10
    下游处理事件 onComplete
    

    defer 操作符的入参是一个 Callable 的接口实现,里面的 call 方法返回的是一个 ObservableSource,看过之前手写框架就就已经知道,其实就是返回一个被观察者。通过之前的学习和经验应该能推断出,只有在订阅的时候,才会使用 call 方法,创建一个被观察者对象,然后再用这个对象订阅。

    相关文章

      网友评论

        本文标题:Rxjava 创造型操作符

        本文链接:https://www.haomeiwen.com/subject/lvymyktx.html