美文网首页框架【库】
Android--RxJava操作符

Android--RxJava操作符

作者: 特大碗牛肉面 | 来源:发表于2018-09-18 11:32 被阅读126次

    Func1 和 Action1 非常相似,也是 RxJava 的一个接口,用于包装含有一个参数的方法。 Func1 和 Action 的区别在于 Func1 包装的是有返回值的方法。另外,和 ActionX 一样, FuncX 也有多个,用于不同参数个数的方法。FuncX 和 ActionX 的区别在 FuncX 包装的是有返回值的方法。

    1.flatMapIterable: (将数据转换后再发送)

    private Observable<String> flatMapIterableObserver(){
            return Observable.just(1,2,3)
                    .flatMapIterable(new Func1<Integer, Iterable<? extends String>>() {
                        @Override
                        public Iterable<? extends String> call(Integer integer) {
                            ArrayList<String> strings = new ArrayList<>();
                            for (int i=0;i<3;i++){
                                strings.add("flatMapIterableObserver  "+integer);
                            }
                            return strings;
                        }
                    });
        }
    
    09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  1
    09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  1
    09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  1
    09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  2
    09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  2
    09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  2
    09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  3
    09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  3
    09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  3
    

    2.map:(将数据源Observable发送给每个数据进行指定函数转换,再将转换后的数据发送出去)

    private Observable<Integer> mapObservable(){
            return Observable.just(1,2,3)
                    .map(new Func1<Integer, Integer>() {
                        @Override
                        public Integer call(Integer integer) {
                            return integer*10;
                        }
                    });
        }
    
    09-13 19:30:11.802 32095-32095/com.example.administrator.rxjavademo E/call: map: 10
    09-13 19:30:11.803 32095-32095/com.example.administrator.rxjavademo E/call: map: 20
    09-13 19:30:11.803 32095-32095/com.example.administrator.rxjavademo E/call: map: 30
    
    • flatMap() 和 map() 有一个相同点:它也是把传入的参数转化之后返回另一个对象。
    • flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中,map返回的是结果集。
    • flatMap() 的原理是这样的:1. 使用传入的事件对象创建一个 Observable 对象;2. 并不发送这个 Observable, 而是将它激活,于是它开始发送事件;3. 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。而这个『铺平』就是 flatMap() 所谓的 flat
    • map适用于一对一转换,flatmap适用于一对多,多对多的场景

    3.groupBy:(将数据源转换为groupBy筛选后的Observable对象)

    private Observable<GroupedObservable<Integer,String>> groupedByStringObservable(){
            return Observable.just(1,2,3,4,5,6,7,8,9)
                    .groupBy(new Func1<Integer, Integer>() {
                        @Override
                        public Integer call(Integer integer) {
                            return integer % 2;
                        }
                    }, new Func1<Integer, String>() {
                        @Override
                        public String call(Integer integer) {
                            return "groupBy: "+integer;
                        }
                    });
        }
    
    groupedByStringObservable().subscribe(new Action1<GroupedObservable<Integer, String>>() {
                        @Override
                        public void call(GroupedObservable<Integer, String> integerStringGroupedObservable) {
                            if (integerStringGroupedObservable.getKey()==0){
                                integerStringGroupedObservable.subscribe(new Action1<String>() {
                                    @Override
                                    public void call(String s) {
                                        Log.e("call", s);
                                    }
                                });
                            }
                        }
                    });
    
    09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 2
    09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 4
    09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 6
    09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 8
    

    4.cast:(将Observable发送的数据强转成另外一种类型)

    5.scan:(做一次计算,有条件、有筛选的输出最终结果)

    6.throttleWithTimeout:(时间限流, 低于指定时间的都将被过滤)

    private Observable<Integer> createObserver(){
            return Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    for (int i=0;i<10;i++){
                        if (!subscriber.isUnsubscribed()){
                            subscriber.onNext(i);
                        }
                        int sleep = 100;
                        if (i%3==0){
                            sleep = 300;
                        }
                        try{
                            Thread.sleep(sleep);
                        } catch (Exception e){
                            e.printStackTrace();
                        }
                    }
                    subscriber.onCompleted();
                }
            }).subscribeOn(Schedulers.computation());
        }
    
    private Observable<Integer> throttleWithThimoutObserver(){
        return createObserver().throttleWithTimeout(200,TimeUnit.MILLISECONDS);
    }
    
    对小于3的倍数的数据延迟100毫秒后发送新数据, 100毫秒小于过滤的时间,数据被过滤掉;
    输出日志如下:
    
    09-14 10:06:19.673 2916-2964/com.example.administrator.rxjavademo E/call: integer: 0
    09-14 10:06:20.174 2916-2964/com.example.administrator.rxjavademo E/call: integer: 3
    09-14 10:06:20.674 2916-2964/com.example.administrator.rxjavademo E/call: integer: 6
    09-14 10:06:21.175 2916-2964/com.example.administrator.rxjavademo E/call: integer: 9
    

    7.distinct:(去重,不能List元素出重, 可以list对象出重)

     private Observable<Integer> distinctObserver(){
            return Observable.just(1,2,4,5,6,8,4,3,2,1).distinct();
        }
    
    09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 1
    09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 2
    09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 4
    09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 5
    09-14 10:33:51.125 3710-3710/com.example.administrator.rxjavademo E/call: integer: 6
    09-14 10:33:51.125 3710-3710/com.example.administrator.rxjavademo E/call: integer: 8
    09-14 10:33:51.125 3710-3710/com.example.administrator.rxjavademo E/call: integer: 3
    

    8.elementAt:(下标顺序过滤数据源)

    private Observable<Integer> elementAtObserver(){
            return Observable.just(1,3,8,10,9).elementAt(3);
        }
    
    09-14 11:05:48.591 5006-5006/com.example.administrator.rxjavademo E/call: integer: 10
    

    9.filter:(根据函数进行过滤操作,返回true就往下执行,否则过滤掉 , 和last类似)

     private Observable<Integer> filterObserver(){
            return Observable.just(0,1,2,3,4,5,6,7,8,9)
                    .filter(new Func1<Integer, Boolean>() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer<3;
                        }
                    });
        }
    
    09-14 11:39:34.319 5856-5856/com.example.administrator.rxjavademo E/call: integer: 0
    09-14 11:39:34.319 5856-5856/com.example.administrator.rxjavademo E/call: integer: 1
    09-14 11:39:34.319 5856-5856/com.example.administrator.rxjavademo E/call: integer: 2
    

    10.first:(只会返回第一条或者满足条件的第一条数据)

     Observable.just(0,1,2,3,4,5).first(new Func1<Integer, Boolean>() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer>3;
                        }
                    }).subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.e("call","integer: "+integer);
                        }
                    });
    
    
    //BlockingObservable不会做任何处理,只会阻塞;
     int result = Observable.just(0,1,2,3,4,5)
                            .toBlocking()
                            .first(new Func1<Integer, Boolean>() {
                                @Override
                                public Boolean call(Integer integer) {
                                    return integer>3;
                                }
                            });
    
    09-14 11:56:10.987 6328-6328/com.example.administrator.rxjavademo E/call: integer: 4
    

    11.skip和take: (skip操作符过滤掉前面的数据,take只取前面数据 ,将后面的数据全部过滤掉)

    12.sample和throttleFrist:(sample一次性发送间隔的几个数据,throttleFrist间隔时间后发送一个数据)

    13.join:(将两个Observable在有效的时间内拼接)

    private Observable<String> getLeftObservable(){
            return Observable.just("a","b","c");
    }
    
    private Observable<Long> getRightObservable(){
          return Observable.just(1L,2L,3L);
    }
    
    private Observable<String> joinObserver(){
        return getLeftObservable().join(getRightObservable(), new Func1<String, Observable<Long>>() {
            @Override
            public Observable<Long> call(String s) {
                return Observable.timer(1000, TimeUnit.MILLISECONDS);
            }
        }, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.timer(1000, TimeUnit.MILLISECONDS);
            }
        }, new Func2<String, Long, String>() {
            @Override
            public String call(String s, Long aLong) {
                return s+" : "+aLong;
            }
        });
    }
    
    
    09-14 15:08:57.223 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: b : 1
    09-14 15:08:57.223 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: a : 1
    09-14 15:08:57.223 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: c : 1
    09-14 15:08:57.224 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: b : 2
    09-14 15:08:57.224 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: a : 2
    09-14 15:08:57.224 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: c : 2
    09-14 15:08:57.225 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: b : 3
    09-14 15:08:57.225 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: a : 3
    09-14 15:08:57.225 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: c : 3
    

    14.merge:(将多个Observable发送的数据整合在一起后发送,但发送的数据可能是错乱的,如果不想错乱可以使用concat)

    private Observable<Integer> mergeObserver(){
        return Observable.merge(Observable.just(1,2,3),Observable.just(4,5,6));
    }
    
    09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 1
    09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 2
    09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 3
    09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 4
    09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 5
    09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 6
    

    15.mergeDelayError:(类似merge ,但是遇到异常后会继续组合操作,等所有数据发送完成后才将这个异常抛出)

    private Observable<Integer> mergeDelayErrorObserver(){
        return Observable.mergeDelayError(Observable
        .create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i=0;i<5;i++){
                    if (i==3){
                        subscriber.onError(new Throwable("onError"));
                    }
                    subscriber.onNext(i);
                }
            }
        }),Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i=0;i<5;i++){
                    if (i==3){
                        subscriber.onNext(i+5);
                    }
                    subscriber.onCompleted();
                }
            }
        }));
    }
    
    09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 0
    09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 1
    09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 2
    09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 3
    09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 4
    09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 8
    09-14 15:44:31.090 10353-10353/com.example.administrator.rxjavademo E/AndroidRuntime: FATAL EXCEPTION: main
        Process: com.example.administrator.rxjavademo, PID: 10353
        rx.exceptions.OnErrorNotImplementedException: onError
    

    16.startWith:(需要发送的数据源前面插入数据)

    private Observable<Integer> startWithObserver(){
            return Observable.just(1,2,3).startWith(-1,4);
        }
    
    09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: -1
    09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 4
    09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 1
    09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 2
    09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 3
    

    17.onErrorReturn:(捕获异常并返回了指定的字符串给订阅者)

    private Observable<String> createObserver(){
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i=1;i<=6;i++){
                    if (i<3){
                        subscriber.onNext("onNext: "+i);
                    } else {
                        subscriber.onError(new Throwable("onError"));
                    }
                }
            }
        });
    }
    private Observable<String> onErrorReturnObserver() {
        return createObserver().onErrorReturn(new Func1<Throwable, String>() {
            @Override
            public String call(Throwable throwable) {
                return "onErrorReturn";
            }
        });
    }
    
    09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onNext: onNext: 1
    09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onNext: onNext: 2
    09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onNext: onErrorReturn
    09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onCompleted
    

    18.onErrorResumeNext:(发生异常的时候, 创建新的Observable来继续发送数据)

    19.onExceptionResumeNext:(类似onErrorResumeNext,不同之处是对异常数据做判断 ,如果是Exception就会使用另一个Observable代替原来的继续发数据,否则将错误分发给Subscriber)

    20.retry:(发生错误的时候会重新订阅,而且可以重复多次,但是这样也就有可能造成死循环,建议指定最大重复次数)

    private Observable<Integer> createObserver(){
            return Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    Log.e("createObserver","subscriber");
                    for (int i=0;i<3;i++){
                        if (i==2){
                            subscriber.onError(new Exception("Exception"));
                        } else {
                            subscriber.onNext(i);
                        }
                    }
                }
            });
        }
    
    createObserver().retry(2).subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onCompleted() {
                            Log.e("retry","onCompleted");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("retry","onError: "+e.getMessage());
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.e("retry","call: "+integer);
                        }
                    });
    09-17 17:34:02.069 18668-18668/com.example.administrator.rxjavademo E/createObserver: subscriber
    09-17 17:34:02.069 18668-18668/com.example.administrator.rxjavademo E/retry: call: 0
    09-17 17:34:02.069 18668-18668/com.example.administrator.rxjavademo E/retry: call: 1
    09-17 17:34:02.070 18668-18668/com.example.administrator.rxjavademo E/createObserver: subscriber
    09-17 17:34:02.070 18668-18668/com.example.administrator.rxjavademo E/retry: call: 0
    09-17 17:34:02.070 18668-18668/com.example.administrator.rxjavademo E/retry: call: 1
    09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/createObserver: subscriber
    09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/retry: call: 0
    09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/retry: call: 1
    09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/retry: onError: Exception
    

    21.delay:(延迟发送)

    22.do:(给Observable的各个阶段加上监听,执行到的时候就触发)

    • doOnEach() :Observable每次发送一个数据的时候就会触发这个回调,无论Observable调用的是onNext,onError还是onCompleted.
    • doOnNext: 只有Observable调用onNext 发送数据的时候才会调用;
    • doOnError: 只有Observable通过onError 分发错误的时候才会触发回调,并且调用Throwble对象作为参数传递到回调函数去;
    • doOnComplete:只有Observable调用doOnComplete 发送结束事件的时候才会触发回调;
    • doOnSubscribe和doOnUnsubscribe: 会在Subscrible进行订阅和反订阅的时候才会触发回调;
    • doOnTerminate:会在Observable结束前触发回调,无论是正常结束还是异常结束;
    • finallyDo: 会在Observable结束后触发回调,无论是正常结束还是异常结束;

    23.subscribeOn和observeOn:(subscribeOn是在哪个线程上订阅,也就是用subscribeOn指定要工作的线程;observeOn是在哪个线程上观察,也就是结果被使用的线程)

     private Observable<Integer> observableOnserver(){
            return createObserver()
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(Schedulers.newThread());
        }
    
        private Observable<Integer> createObserver(){
            return Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    Log.e("createObserver","call: "+Thread.currentThread().getName());
                    subscriber.onNext(1);
                    subscriber.onCompleted();
                }
            });
        }
    
        private Observable<Integer> subscribeOnObserver(){
            return createObserver()
                    .subscribeOn(Schedulers.computation())
                    .observeOn(Schedulers.immediate());
        }
    
    09-18 10:34:52.832 3048-3092/com.example.administrator.rxjavademo E/createObserver: call: RxComputationThreadPool-1
    09-18 10:34:52.833 3048-3092/com.example.administrator.rxjavademo E/computation: call: 1   RxComputationThreadPool-1
    09-18 10:34:52.843 3048-3091/com.example.administrator.rxjavademo E/createObserver: call: RxNewThreadScheduler-1
    09-18 10:34:52.844 3048-3048/com.example.administrator.rxjavademo E/mainThread: call: 1   main
    

    24.using:(创建一个在Observable生命周期内存活的资源,但是Observable终止的时候,资源也会被销毁)

    25.all:(对Observable发送的所有数据进行判断,如果全部满足就返回true,否则返回false)

    26.amb:(最多将9个Observable结合起来,看哪个先发送(包括onError和onComplete),后发送的将被丢弃)

    27.contains:(判断发送的所有数据有没有包含某个数据,如果包含就返回true,Observable没发送完所有数据前不会返回数据)

    28.isEmpty:(判断Observable是否发送过数据,如果发送过了就返回false;如果Observavble已经结束了都还没发送这个数据,则返回true)

    29.concat:(将发送的数据组合起来,类似startWith和merge)

    30.from:(接收一个对象作为参数来创建Observable,参数对象可以是Iterable,Callable,Future和数组)

    31.just:(接收对象作为输入,然后创建一个发送该对象的Observable,对象可以是数字,字符串,数组,Iterate对象等)

    • from()创建方式和just()操作符类似,但是just操作符创建的Observable会将整个参数对象作为数据一下子发送出去,例如参数是个含有10个数字的数组,使用from创建Observable就会发送10次,而just创建的Observable会一次将整个数组发送出去;
    • 一般如果用from转换多个数据,比如 ArrayList等包含多个数据的数组或者列表, just用于处理单个的数据。
    • from 遇到只包含一个数据的时候,会使用just创建Observable; just 遇到多于一个的情况,会使用from 创建 Observable

    32.自定义操作符:
    A. 可以多次调用Subscriber的onNext方法,但是同个数据只能调用一次;
    B. 可以调用Subscriber的onComplete或者onError方法,但是这两个方法是互斥的,调用了其中一个就不能调用另一个,并且一旦调用了两者中的任何一个方法就不能调用onNext方法;
    C. 如果无法保证无法保证上面两条原则,可以对自定义操作符加上serialize操作符,这个操作符会强制性发送正确的数据;
    D. 自定义操作内部不能阻塞;
    E.如果有异常的时候,不能继续发送正常的数据,要立刻调用Subscriber的onError() 来将异常抛出;
    F.null也属于一种数据, 可以正常发送,和完全不发送是两回事;
    G.如果通过组合多个Rxjava原生操作符就能达到目的, 就不要使用自定义操作符实现;例如:

    • first(操作符是通过take(1).single()来实现的;
    • ignoreElements()是通过filter(alwaysFalse())来实现的;
    • reduce(a)是通过scan(a).last()来实现的;

    参考:

    相关文章

      网友评论

      本文标题:Android--RxJava操作符

      本文链接:https://www.haomeiwen.com/subject/bxlggftx.html