呕心沥血:RxJava2.x创建操作符

作者: Burning燃烧 | 来源:发表于2019-12-05 14:24 被阅读0次

    RxJava的基本流程以及线程切换可以参考之前的文章

    https://www.jianshu.com/p/2adaea7237c4

    1、序言

    RxJava除了拥有逻辑简洁的事件流链式调用,使用简单外其丰富的操作符基本可以满足日常开发中的各种实现逻辑

    Rx的基本操作符分类

    RxJava操作符分类.jpg

    下面会逐一讲解每一类操作符的使用

    2、创建操作符

    RxJava创建操作符.jpg

    2.1、基本创建操作符

    create作为RxJava最基本的创建操作,用来完整的创建一个被观察者Observable对象

    通过create创建一个被观察Observable对象

     Observable observable = Observable.create(new ObservableOnSubscribe() {
                @Override
                public void subscribe(ObservableEmitter e) throws Exception {
                    //重写subscribe 写入实际的代码逻辑
                    if (!e.isDisposed()) {
                        e.onNext("RxText");
                    }
                }
            });
    

    创建观察者Observer对象

    Observer observer = new Observer() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Object o) {
                    LogUtils.showLog("message == " + (String) o);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
    

    通过subscribe进行关联

    observable.subscribe(observer);
    

    在实际使用时一般链式调用

    Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    if (!e.isDisposed()) {
                        e.onNext("test");
                    }
                }
            })
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(String s) {
                            LogUtils.showLog("s == " + s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                           
                        }
    
                        @Override
                        public void onComplete() {
                          
                        }
                    });
    

    2.2、快速创建操作符

    使用场景,快速的创建被观察者并进行数据发送

    1、just()

    作用:快速创建1个被观察者对象Observable

    注意:just只能传入最多10个数据

    Observable.just(1, 2, 3, 4, 5)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            LogUtils.showLog("integer == " + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    输出:

    D/hzfTag1204: integer == 1
        integer == 2
        integer == 3
        integer == 4
        integer == 5
    

    2、fromArray()

    作用:快速创建一个被观察者,直接发送传入的数组数据,当发送数据大于10时可以考虑采用fromArray

     int[] arrays = {1, 2, 3, 4, 5};
            Observable.fromArray(arrays)
                    .subscribe(new Observer<int[]>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(int[] ints) {
                             //遍历数组并输出
                            for (int num : ints) {
                                LogUtils.showLog("num == " + num);
                            }
                           
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    3、 fromIterable

    作用:快速创建一个Observable并将集合当中的数据发送

    fromIterable的使用与fromArray一致,数据由fromArray的数组改为集合,不做具体的赘述了

    BUT!!! 我在做测试的时候发现了一个好玩儿的事情.....

    fromArray中也可以传一个list;但是fromIterable不能传数组

    fromIterable不能传数组根据代码很明显,其对参数做了限制

    fromIterable参数.png

    但fromArray没有做限制,当我用以下代码操作时可以正确拿到list中的数据

            List list = new ArrayList();
            list.add("1");
            list.add("2");
            list.add("3");
            Observable.fromArray(list)
                    .subscribe(new Observer<List>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(List ints) {
                            LogUtils.showLog("ints == "+ints.get(2));
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            LogUtils.showLog("exception == "+e.getMessage().toString());
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    
    
    
    
    fromArray传集合.png

    通过断点分析源码fromArray当中的逻辑,当fromArray传入一个数组,会走到item.length == 1这个判断当中,并且最终走的是just操作符


    fromArray源码.png

    所以通过fromArray传入集合就相当于是just(list);但是为什么会走到items.length == 1这个判断当中?
    这里面涉及到Java可变参数的概念,fromArray后面参数是T...意思是参数不确定,可以传多个参数,传入几个参数这个items的length就是多少,所以fromArray不论传的是list还是array,只要传的是一个参数,最终都相当于通过just将数据发送出去了(相当于把对象通过just发出去);当fromArray中的参数大于1时,会将参数封装成为一个T[]数组,再将数组中的每一个元素逐一发送。


    ObservableFromArray.png

    4、never
    作用:只会调用onSubscribe方法,不会回调onNext onError onComplete等回调方法;通过源码可以看出,内部的subscribeActual方法只是调用了onSubscribe,并没有执行其他的回调方法

    Rxjava never操作符.png

    5、empty
    作用:当订阅后,被观察者只会发送一个onComplete事件,最终Observer的回调只有onSubscribe和onComplete会执行

    6、error
    作用:订阅后仅发送Error事件,error的参数可以自定义异常发送给onError

    2.3、延迟创建操作符

    延迟创建操作符的需求场景:
    (1)当经过n秒后,执行操作x
    (2)每经过n秒,周期的执行操作x
    延迟创建操作符的分类:


    RxJava延迟操作符.jpg

    1、timer
    作用:快速创建一个Observable,并指定一段时间后发送onNext(0)事件;onNext的参数为long类型,默认数值为0

    final long startTime = System.currentTimeMillis();
            Observable.timer(5,TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            LogUtils.showLog((System.currentTimeMillis() - startTime) + " ms后接收到了数据 " + aLong);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    
    12-05 10:07:11.351 12187-12207/com.hzf.test.myapplication D/hzfTag1205: 5005 ms后接收到了数据 0
    

    2、defer
    作用:等到实际subscribe订阅时才会创建一个Observable;可以保证Observable的数据在订阅时是最新的数据

     Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
                @Override
                public ObservableSource<? extends String> call() throws Exception {
                    return Observable.just(test1);
                }
            });
    
            test1 = "test222";
    
            observable.subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(String s) {
                    LogUtils.showLog("s == "+s);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    
    12-05 10:22:00.299 12308-12308/com.hzf.test.myapplication D/hzfTag1205: s == test222
    

    通过defer的源码可以出来被观察者的创建是在subscribeActual订阅时
    (ObservableDefer)


    RxJava defer订阅.png

    3、interval
    作用:快速创建一个被观察者Observable对象,每隔指定时间发送一个事件
    interval的参数最多可用的为4个参数
    参数1:初始延迟发送事件的时间
    参数2:间隔发送事件的时间
    参数3:TimeUnit指定的时间类型
    参数4:Scheduler,可以手动创建一个worker指定interval的运行线程(如果不手动选择第四个参数,默认interval发生在子线程)

    Observable.interval(3, 5, TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            LogUtils.showLog("aLong == "+aLong+",当前线程为 "+Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
        }
    
    运行结果为:
    12-05 11:17:32.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 0,当前线程为 RxComputationThreadPool-1
    12-05 11:17:37.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 1,当前线程为 RxComputationThreadPool-1
    12-05 11:17:42.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 2,当前线程为 RxComputationThreadPool-1
    12-05 11:17:47.539 13352-13372/com.hzf.test.myapplication D/hzfTag1205: aLong == 3,当前线程为 RxComputationThreadPool-1
    

    也可以指定调度器,例如:

    Observable.interval(3, 5, TimeUnit.SECONDS, new Scheduler() {
                @Override
                public Worker createWorker() {
                    return AndroidSchedulers.mainThread().createWorker();
                }
            })
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            LogUtils.showLog("aLong == "+aLong+",当前线程为 "+Thread.currentThread().getName());
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    
    12-05 11:20:19.386 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 0,当前线程为 main
    12-05 11:20:24.388 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 1,当前线程为 main
    12-05 11:20:29.388 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 2,当前线程为 main
    12-05 11:20:34.388 13450-13450/com.hzf.test.myapplication D/hzfTag1205: aLong == 3,当前线程为 main
    

    如果运用线程操作符的话,经过我的实验,当调用subscribeOn时是不起作用的,实际发生事件的线程依然是子线程或者指定的调度器;而调用observeOn时,onNext接收事件的线程即为observeOn所指定的线程。

    2、intervalRange
    作用:快读创建一个被观察者对象,每隔指定时间发送数据,但是与interval不同的是intervalRange可以指定发送的数据数量

    参数1:起始的事件序号
    参数2:事件数量
    参数3:第1次事件延迟发送的时间
    参数4:事件间的间隔时间
    参数5:时间单位
    参数6:Scheduler

    Observable.intervalRange(3, 3, 3, 2, TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            LogUtils.showLog("aLong == " + aLong);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    
    运行结果:
    12-05 11:30:08.864 13737-13757/com.hzf.test.myapplication D/hzfTag1205: aLong == 3
    12-05 11:30:10.864 13737-13757/com.hzf.test.myapplication D/hzfTag1205: aLong == 4
    12-05 11:30:12.864 13737-13757/com.hzf.test.myapplication D/hzfTag1205: aLong == 5
    

    2、range
    作用:快速创建一个被观察者,并连续发送一个事件序列,可指定范围。功能与intervalRange类似,但实现的功能会相对简单一些。

    参数1:起始数据
    参数2:发送多少条数据

    //从5开始发送 发送10个数据
            Observable.range(5, 10)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            LogUtils.showLog("integer == " + integer);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    2、rangeLong
    作用:与range类似,但支持long类型

    相关文章

      网友评论

        本文标题:呕心沥血:RxJava2.x创建操作符

        本文链接:https://www.haomeiwen.com/subject/wdnmwctx.html