RxJava操作符(二)

作者: 小白要超神 | 来源:发表于2017-09-30 13:54 被阅读155次

    注:只包含标准包中的操作符,用于个人学习及备忘
    参考博客:http://blog.csdn.net/maplejaw_/article/details/52396175

    本篇将介绍rxjava中的错误处理/重试机制、连接操作、阻塞操作以及工具集的使用,只针对用法不涉及原理,对RxJava不熟悉的可参考:http://gank.io/post/560e15be2dca930e00da1083

    错误处理 / 重试机制

    • onErrorResumeNext:当原始的Observable在遇到错误时,使用备用的Observable

        Observable.just(1, 2, "3")
                .cast(Integer.class)    //强制转换成Integer类型,强转String类型数据时异常
                .onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
                    @Override
                    public Observable<? extends Integer> call(Throwable throwable) {
                        return Observable.just(4, 5, 6);
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,4,5,6
                    }
                });
      
        Observable.just(1, 2, "3")
                .cast(Integer.class)    //强制转换成Integer类型,强转String类型数据时异常
                .onErrorResumeNext(Observable.just(4, 5, 6))
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,4,5,6
                    }
                });
      
    • onExceptionResumeNext:当原始Observable在遇到异常时,使用备用的Observable。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误,onExceptionResumeNext只能处理异常。

        Observable.just(1, 2, "3")
                .cast(Integer.class)    //强制转换成Integer类型,强转String类型数据时异常
                .onExceptionResumeNext(Observable.just(4, 5, 6))
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,4,5,6
                    }
                });
      
    • onErrorReturn:当原始Observable在遇到错误时发射一个指定的数据

        Observable.just(1, 2, "3", 4)
                .cast(Integer.class)
                .onErrorReturn(new Func1<Throwable, Integer>() {
                    @Override
                    public Integer call(Throwable throwable) {
                        return 5;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,5
                    }
                });
      
    • retry:当原始的Observable在遇到错误时进行重试

        Observable.just(1, 2, "3", 4)
                .cast(Integer.class)
                .retry()    //默认无限重试次数
                .observeOn(Schedulers.newThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,1,2,1,2 ...
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.d("debug", "onError");
                    }
                });
      
        Observable.just(1, 2, "3", 4)
                .cast(Integer.class)
                .retry(2)    //设置最多重试2次
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,1,2,1,2,onError
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.d("debug", "onError");
                    }
                });
      
        Observable.just(1, 2, "3", 4)
                .cast(Integer.class)
                .retry(new Func2<Integer, Throwable, Boolean>() {
                    @Override
                    public Boolean call(Integer count, Throwable throwable) {
                        Log.d("debug", "count:" + count); //count表示是第几次判断是否重试
                        return count != 2;    //返回true表示重试,返回false表示终止
                    }
                })    
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,count:1,1,2,count:2,onError
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.d("debug", "onError");
                    }
                });
      
    • retryWhen:当原始Observable在遇到错误,将错误传递给另一个Observable来决定是否要重新订阅这个Observable,如果发送的是onCompleted或者onError事件,将不会触发重订阅。相对的,如果它发送onNext事件,则触发重订阅(不管onNext实际上是什么事件),内部调用的是retry
      参考博客:http://www.cnblogs.com/resentment/p/5988241.html

        Observable.just(1, 2, "3", 4)
                .cast(Integer.class)
                //官方例子:第一次等待1秒重试,第二次等待2秒,第三次等待3秒,第四次停止重试
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<Long>>() {
                    @Override
                    public Observable<Long> call(Observable<? extends Throwable> observable) {
                        return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
                            @Override
                            public Integer call(Throwable throwable, Integer integer) {
                                return integer;
                            }
                        }).flatMap(new Func1<Integer, Observable<Long>>() {
                            @Override
                            public Observable<Long> call(Integer integer) {
                                return Observable.timer(integer,TimeUnit.SECONDS);
                            }
                        });
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString()); //打印1,2,1,2,1,2,1,2
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.d("debug", "onError");
                    }
                });
        /**
         * 注:输入的Observable必须作为输出Observable的源,必须对Observable<Throwable>做出反应,然后基于它发送事件。
         */
        Observable.just(1, 2, "3", 4)
                .subscribeOn(Schedulers.newThread())
                .cast(Integer.class)
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {    //相当于retry(),即无限重试
                    @Override
                    public Observable<?> call(Observable<? extends Throwable> observable) {
                        return observable;
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString());
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.d("debug", "onError");
                    }
                });
      
        Observable.just(1, 2, "3", 4)
                .cast(Integer.class)
                //重试3次,每次间隔1秒
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {    
                    @Override
                    public Observable<?> call(Observable<? extends Throwable> observable) {
                        return observable.zipWith(Observable.interval(1, TimeUnit.SECONDS).take(3), new Func2<Throwable, Long, Long>() {
                            @Override
                            public Long call(Throwable throwable, Long aLong) {
                                return aLong;
                            }
                        });
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString());  //打印1,2,1,2,1,2,1,2
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.d("debug", "onError");
                    }
                });
      

    连接操作

    • ConnectableObservable:可连接的Observable,一种特殊的Observable对象,不会在订阅时就开始发射数据,而是调用connect操作符时才开始发射数据,可以用来灵活控制发射数据的时机,需要注意的是,如果实在开始发射数据后再进行订阅,就只能接收到订阅以后发射的数据

    • ConnectableObservable.connect():指示一个可连接的Observable开始发射数据,没有订阅时也可发射数据

    • Observable.publish():将一个Observable转换为可连接的Observable

    • ConnectableObservable.refCount():将一个可连接的Observable转换成普通的Obervable

        /**
         * ConnectableObservable,ConnectableObservable转换成的普通Observable,以及普通的Observable之间的比较
         */
      
        //第一种:ConnectableObservable
        new Thread(new Runnable() {
            @Override
            public void run() {
                ConnectableObservable<Long> co = Observable.interval(1, TimeUnit.SECONDS)
                        .take(3).publish();//转换成可连接的Observable
      
                co.subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString()); //没有接收到数据
                    }
                });
      
                co.connect();   //开始发射数据,第一个订阅者接收到数据,每秒打印一次,分别打印 0,1,2
      
                //延迟1.5秒后开始订阅
                SystemClock.sleep(1500);
                co.subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong + 3 + ""); //只能接收到订阅以后发射的数据,打印4,5
                    }
                });
      
                //再延迟2.5秒后再次订阅,此时数据已经发送完毕
                SystemClock.sleep(2500);
                co.subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong + 6 + ""); //没有接收到数据
                    }
                });
            }
        }).start();
      
        //第二种:ConnectableObservable转换成的普通Observable
        new Thread(new Runnable() {
            @Override
            public void run() {
                ConnectableObservable<Long> co = Observable.interval(1, TimeUnit.SECONDS)
                        .take(3).publish();//转换成可连接的Observable
      
                co.subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString()); //没有接收到数据
                    }
                });
      
                co.connect();   //开始发射数据,第一个订阅者接收到数据,每秒打印一次,分别打印 0,1,2
      
                //延迟1.5秒后开始订阅
                SystemClock.sleep(1500);
                co.refCount()   //转换成普通Observable
                        .subscribe(new Action1<Long>() {
                            @Override
                            public void call(Long aLong) {
                                Log.d("debug", aLong + 3 + ""); //只能接收到订阅以后发射的数据,打印4,5
                            }
                        });
      
                //再延迟2.5后再次订阅,此时数据已经发送完毕
                SystemClock.sleep(2500);
                co.refCount()   //转换成普通Observable
                        .subscribe(new Action1<Long>() {
                            @Override
                            public void call(Long aLong) {
                                Log.d("debug", aLong + 6 + ""); //Observable重新发送数据,打印6,7,8
                            }
                        });
            }
        }).start();
      
        //第三种:普通的Observable
        new Thread(new Runnable() {
            @Override
            public void run() {
                Observable<Long> ob = Observable.interval(1, TimeUnit.SECONDS).take(3);
      
                ob.subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString()); //打印0,1,2
                    }
                });
      
                //延迟1.5秒后开始订阅
                SystemClock.sleep(1500);
                ob.subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong + 3 + ""); //Observable重新发射数据,打印3,4,5
                    }
                });
      
                //再延迟2.5后再次订阅,此时数据已经发送完毕
                SystemClock.sleep(2500);
                ob.subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong + 6 + ""); //Observable重新发射数据,打印6,7,8
                    }
                });
            }
        }).start();
      
    • Observable.replay():返回一个可连接的Observable,并且可以缓存其发射过的数据,这样即使有订阅者在其发射数据之后进行订阅也能收到其之前发射过的数据,使用时最好设置其缓存大小,避免占用太多内存
      参考博客:http://blog.csdn.net/lizubing1992/article/details/51321422

    //创建缓存2个数据的可连接的Observable
    private ConnectableObservable<Long> relayCountObserver() {
        return Observable.interval(1, TimeUnit.SECONDS)
                .observeOn(Schedulers.newThread())
                .take(6)
                .replay(2);
    }
    
    //创建缓存3秒前的数据的可连接的Observable
    private ConnectableObservable<Long> relayTimeObserver() {
        return Observable.interval(1, TimeUnit.SECONDS)
                .observeOn(Schedulers.newThread())
                .take(6)
                .replay(3, TimeUnit.SECONDS);
    }
    
    //创建缓存3秒前的最后2个数据的可连接的Observable
    private ConnectableObservable<Long> relayTimeAndCountObserver() {
        return Observable.interval(1, TimeUnit.SECONDS)
                .observeOn(Schedulers.newThread())
                .take(6)
                .replay(2, 3, TimeUnit.SECONDS);
    }
    
    //创建两个观察者对可连接的Obserable进行订阅,第一个观察者直接订阅,
    // 第二个观察者在Observable发射4个数据后再进行订阅
    private void subscribe(ConnectableObservable cb) {
        Action1<Long> action2 = new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.d("debug", "action2:" + aLong);
            }
        };
    
        Action1<Long> action1 = new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.d("debug", "action1:" + aLong);
                if (aLong == 3) {   //发射第4个数据时(即过了4秒),action2开始订阅
                    cb.subscribe(action2);
                }
            }
        };
    
        cb.subscribe(action1);
        cb.connect();
    }
    
        //创建不同的可连接的Obserable进行订阅,并观察打印结果
        //订阅缓存两个数据的可连接的Observable
        subscribe(relayCountObserver());
        /**
         * action1:打印0,1,2,3,4,5
         * action2:打印2,3,4,5(action2订阅前已经发射了4个数据,即0,1,2,3,
         * 其中2,3被可连接的Observable缓存,因此可获取到)
         */
    
        //订阅缓存3秒的可连接的Observable
        subscribe(relayTimeObserver());
        /**
         * action1:打印0,1,2,3,4,5
         * action2:打印1,2,3,4,5(action2订阅前已经发射了4秒,可连接的Observable缓存了3秒内的数据,
         * 即1,2,3,因此可获取到
         */
    
        //订阅缓存3秒前最后2个数据的可连接的Observable
        subscribe(relayTimeAndCountObserver());
        /**
         * action1:打印0,1,2,3,4,5
         * action2:打印2,3,4,5(action2订阅前已经发射了4秒,3秒内发射的数据为1,2,3,最后2位数据被
         * Observable缓存,即2,3,因此可获取到
         */
    

    阻塞操作

    BlockingObservable:一个阻塞的Observable,普通的Observable可使用Observable.toBlocking( )方法或者BlockingObservable.from( )方法转换成BlockingObservable

    • forEach:对BlockingObservable发射的每一项数据调用一个方法,会阻塞直到Observable完成

        Observable.interval(500, TimeUnit.MILLISECONDS,Schedulers.newThread())
                .take(3)
                .toBlocking()
                .forEach(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString() + " " + Thread.currentThread().getName());
                    }
                });
        
        Log.d("debug", Thread.currentThread().getName());  //打印当前线程名
        
        /**
         * 日志输出:阻塞的Observable会在Observable完成前阻塞当前线程
         * 0 RxNewThreadScheduler-1
         * 1 RxNewThreadScheduler-1
         * 2 RxNewThreadScheduler-1
         * main
         */
      

    *first / firstOrDefault / last / lastOrDefault:这几个操作符的用法与在普通的Observable中用法相同,用于阻塞操作时会直接返回发射的数据(使用first或last时,找不到指定元素会报异常)

        Integer first = just(1, 2, 3).toBlocking().first();
        Log.d("debug", first.toString());   //打印1
    
        Integer first1 = just(1, 2, 3).toBlocking().first(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer > 2;
            }
        });
        Log.d("debug", first1.toString());  //打印3
    
        Integer first3 = Observable.just(1, 2, 3).toBlocking().firstOrDefault(4, new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer > 3;
            }
        });
                
        Log.d("debug", first3.toString());  //打印4
    
        //last与lastOrDefault也是一样,不重复了
    
    • single / singleOrDefault:如果Observable终止时只发射了一个值或者满足条件的值只有一个,返回那个值,否则抛出异常或者发射默认值

        Integer single1 = Observable.just(1).toBlocking().single();
        Log.d("debug", single1.toString());     //打印1
      
        Integer single2 = Observable.just(1, 2, 3).toBlocking().single();
        Log.d("debug", single2.toString());     //报异常,发射的数据不止1个
      
        Integer single3 = Observable.from(new ArrayList<Integer>()).toBlocking().single();
        Log.d("debug", single3.toString());  //报异常,没有发射数据
      
        Integer single4 = Observable.just(1, 2, 3).toBlocking().single(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer > 2;
            }
        });
        Log.d("debug", single4.toString());  //打印3
      
        Integer single5 = Observable.just(1, 2, 3).toBlocking().single(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer > 1;
            }
        });
        Log.d("debug", single5.toString());  //报异常,此时满足条件的有2,3
      
        Integer singleOrDefault1 = Observable.just(1, 2, 3).toBlocking().singleOrDefault(5);
        Log.d("debug", singleOrDefault1.toString()); //报异常,发射多个数据时报异常
      
        Integer singleOrDefault2 = Observable.from(new ArrayList<Integer>()).toBlocking().singleOrDefault(5);
        Log.d("debug", singleOrDefault2.toString());  //打印5,没有发射数据时发射默认值
      
    • mostRecent:返回一个Iterable,Iterable返回的值总为Observable最后发射的数据,Observable未发射数据前总返回一个指定的默认值,Observable结束后不再返回

        new Thread(new Runnable() {
            @Override
            public void run() {
                BlockingObservable<Integer> blockingObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) {
                        SystemClock.sleep(1000);
                        subscriber.onNext(1);
                        SystemClock.sleep(1000);
                        subscriber.onNext(2);
                        SystemClock.sleep(1000);
                        subscriber.onNext(3);
                        SystemClock.sleep(1000);
                        subscriber.onCompleted();
                    }
                }).subscribeOn(Schedulers.newThread()).toBlocking();
                Iterable<Integer> integers = blockingObservable.mostRecent(4);
                for (Integer integer : integers) {
                    Log.d("debug", integer.toString());
                    SystemClock.sleep(500);     //打印4,4,1,1,2,2,3,3
                }
            }
        }).start();
      
    • next:返回一个Iterable,每当Iterable准备返回一个值时,都会阻塞直到Observable发射下一个数据,然后返回这个值

        new Thread(new Runnable() {
            @Override
            public void run() {
                BlockingObservable<Integer> blockingObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) {
                        SystemClock.sleep(1000);
                        subscriber.onNext(1);
                        SystemClock.sleep(1000);
                        subscriber.onNext(2);
                        SystemClock.sleep(1000);
                        subscriber.onNext(3);
                        SystemClock.sleep(1000);
                        subscriber.onCompleted();
                    }
                }).subscribeOn(Schedulers.newThread()).toBlocking();
                Iterable<Integer> integers = blockingObservable.next();
                Iterator<Integer> iterator = integers.iterator();
      
                long startTime = System.currentTimeMillis();  //开始时间
      
                Integer next1 = iterator.next();    //阻塞了1秒
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next1);   //打印1003:1
      
                Integer next2 = iterator.next();
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next2);   //打印2004:2
      
                Integer next3 = iterator.next();
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next3);   //打印3004:3
            }
        }).start();
      
    • latest:返回一个Iterable,与next()操作符类似,但是它不会阻塞等待下一个值,而是立即返回最近发射的数据项,只在Observable未发出时阻塞

        new Thread(new Runnable() {
            @Override
            public void run() {
                Iterable<Integer> latest = Observable.create(new Observable.OnSubscribe<Integer>() {
                    @Override
                    public void call(Subscriber<? super Integer> subscriber) {
                        SystemClock.sleep(1000);
                        subscriber.onNext(1);
                        SystemClock.sleep(1000);
                        subscriber.onNext(2);
                        SystemClock.sleep(1000);
                        subscriber.onNext(3);
                        SystemClock.sleep(1000);
                        subscriber.onCompleted();
                    }
                }).subscribeOn(Schedulers.newThread()).toBlocking().latest();
      
                Iterator<Integer> iterator = latest.iterator();
                long startTime = System.currentTimeMillis();
      
                Integer next1 = iterator.next();    //阻塞了1秒
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next1);   //打印1003:1
      
                Integer next2 = iterator.next();
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next2);   //打印2004:2
      
                Integer next3 = iterator.next();
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next3);   //打印3004:3
            }
        }).start();
      
    • toIterable:将一个发射数据序列的Observable转换为一个Iterable,通过这个Iterable可获取到Observable发射的所有数据,每当Iterable准备返回一个值时,如果Observable未发射该项数据,则阻塞直到Observable发射并返回该值,如果Observable已发射该项数据,则直接返回(与next, latest之间的差异见下面代码)

        BlockingObservable<Integer> blockingObservable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                SystemClock.sleep(1000);
                subscriber.onNext(1);
                SystemClock.sleep(1000);
                subscriber.onNext(2);
                SystemClock.sleep(1000);
                subscriber.onNext(3);
                SystemClock.sleep(1000);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.newThread()).toBlocking();
      
        new Thread(new Runnable() {
            @Override
            public void run() {
                Iterable<Integer> iterable  = blockingObservable.toIterable();
      
                Iterator<Integer> iterator = iterable.iterator();
                long startTime = System.currentTimeMillis();
      
                Integer next1 = iterator.next();    //阻塞了1秒
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next1);   //打印1003:1
      
                SystemClock.sleep(3000);    //等待3秒,此时Observable的数据已经全部发射完成
      
                Integer next2 = iterator.next();    //直接返回
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next2);   //打印4002:2
      
                Integer next3 = iterator.next();    //直接返回
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next3);   //打印4004:3
            }
        }).start();
      
        new Thread(new Runnable() {
            @Override
            public void run() {
                Iterable<Integer> integers = blockingObservable.next();
                Iterator<Integer> iterator = integers.iterator();
      
                long startTime = System.currentTimeMillis();  //开始时间
      
                Integer next1 = iterator.next();    //阻塞了1秒
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next1);   //打印1003:1
      
                SystemClock.sleep(1500);    //阻塞1.2秒,此时数据2发射完成,数据3未发射
      
                Integer next2 = iterator.next();
                //(next返回的是Observable最近发射数据的下一个数据,下个数据发射完成前进行阻塞)
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next2);   //打印3004:3
            }
        }).start();
      
        new Thread(new Runnable() {
            @Override
            public void run() {
                Iterable<Integer> integers = blockingObservable.latest();
                Iterator<Integer> iterator = integers.iterator();
      
                long startTime = System.currentTimeMillis();  //开始时间
      
                Integer next1 = iterator.next();    //阻塞了1秒
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next1);   //打印1001:1
      
                SystemClock.sleep(2500);    //阻塞2.5秒,此时数据2和3发射完成
      
                Integer next2 = iterator.next();
                //(latest返回的是Observable最近发射的数据,数据未发射时进行阻塞)
                Log.d("debug", System.currentTimeMillis() - startTime + ":" + next2);   //打印3501:3
            }
        }).start();
      
    • getIterator:与toIterable类似,返回的是一个Iterator,不重复了

    • toFuture:将Observable转换为一个返回单个数据项的Future,如果原始Observable发射多个数据项,Future会收到一个IllegalArgumentException;如果原始Observable没有发射任何数据,Future会收到一个NoSuchElementException

        Future<Integer> future = Observable.just(1).toBlocking().toFuture();
        try {
            Integer integer = future.get();
            Log.d("debug", integer.toString()); //打印1
        } catch (Exception e) {
            e.printStackTrace();
        }
      

    工具集

    • materialize: 将Observable转换成一个通知列表

        Observable.just(1,2,3)
                .materialize()
                .subscribe(new Action1<Notification<Integer>>() {
                    @Override
                    public void call(Notification<Integer> integerNotification) {
                        Log.d("debug", integerNotification.getKind() + " " + integerNotification.getValue());
                        /**
                         * 打印
                         * OnNext 1
                         * OnNext 2
                         * OnNext 3
                         * OnCompleted null
                         */
                    }
                });
      
    • dematerialize:与materialize作用相反,将通知逆转成一个Observable

        Observable.just(1, 2, 3)
                .materialize()      //转换成通知列表
                .dematerialize()    //逆转回Observable
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        Log.d("debug", o.toString());   //打印1,2,3
                    }
                });
      
    • timestamp:给Observable发射的每个数据项添加一个时间戳

        Observable.just(1, 2, 3)
                .timestamp()
                .subscribe(new Action1<Timestamped<Integer>>() {
                    @Override
                    public void call(Timestamped<Integer> integerTimestamped) {
                        Log.d("debug", integerTimestamped.getTimestampMillis() + " " + integerTimestamped.getValue());
                        /**
                         * 打印
                         * 1506740662707 1
                         * 1506740662707 2
                         * 1506740662708 3
                         */
                    }
                });
      
    • serialize:强制Observable按次序发射数据并且要求功能是完好的

    • cache:缓存Observable发射的数据序列并发射相同的数据序列给后续的订阅者

        new Thread(new Runnable() {
            @Override
            public void run() {
                Observable<Long> cache = Observable.interval(500, TimeUnit.MILLISECONDS)
                        .take(4)
                        .cache();
      
                cache.subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString());   //打印0,1,2,3
                    }
                });
      
                SystemClock.sleep(1200);
                cache.subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("debug", aLong.toString());   //打印0,1,2,3
                    }
                });
            }
        }).start();
      
    • observeOn:指定观察者观察Observable的调度器

    • subscribeOn:指定Observable执行任务的调度器

    • doOnEach:注册一个动作,对Observable发射的每个数据项使用

        Observable.just(1, 2, 3)
                .doOnEach(new Observer<Integer>() {
                    @Override
                    public void onCompleted() {
                        Log.d("debug", "doOnEach: onCompleted");
                    }
      
                    @Override
                    public void onError(Throwable e) {
                        Log.d("debug", "doOnEach: onError");
                    }
      
                    @Override
                    public void onNext(Integer integer) {
                        Log.d("debug", "doOnEach: onNext " + integer);
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString());
                    }
                });
        /**
         * 打印
         * doOnEach: onNext 1
         * 1
         * doOnEach: onNext 2
         * 2
         * doOnEach: onNext 3
         * 3
         * doOnEach: onCompleted
         */
      
        Observable.just(1, 2, 3)
                .doOnEach(new Action1<Notification<? super Integer>>() {
                    @Override
                    public void call(Notification<? super Integer> notification) {
                        Log.d("debug", "doOnEach: " + notification.getKind() + " " + notification.getValue());
                    }
                })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("debug", integer.toString());
                    }
                });
        /**
         * 打印
         * doOnEach: onNext 1
         * 1
         * doOnEach: onNext 2
         * 2
         * doOnEach: onNext 3
         * 3
         * doOnEach: onCompleted null
         */
      
    • doOnCompleted:注册一个动作,对正常完成的Observable使用(即在OnCompleted)

    • doOnError:注册一个动作,对发生错误的Observable使用(即在OnError)

    • doOnTerminate:注册一个动作,对完成的Observable使用,无论是否发生错误

    • doOnSubscribe:注册一个动作,在观察者订阅时使用(可通过subscribeOn指定发生的线程)

    • doOnUnsubscribe: 注册一个动作,在观察者取消订阅时使用

    • finallyDo / doAfterTerminate: 注册一个动作,在Observable完成时使用

    • delay:延时发射Observable的结果。即让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量(除了onError,它会即时通知)

    • delaySubscription:延时处理订阅请求

    • using:创建一个只在Observable生命周期存在的资源,当Observable终止时这个资源会被自动释放

        Observable.using(new Func0<File>() {//资源工厂
            @Override
            public File call() {  //创建资源
      
                File file = new File(getCacheDir(), "a.txt");
                if (!file.exists()) {
                    try {
                        Log.d("debug", "--create--");
                        file.createNewFile();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                return file;
            }
        }, new Func1<File, Observable<String>>() { //返回生成的Observable
            @Override
            public Observable<String> call(File file) {
                return Observable.just(file.exists() ? "exist" : "no exist");
            }
        }, new Action1<File>() {//释放资源动作
            @Override
            public void call(File file) {
                if (file != null && file.exists()) {
                    Log.d("debug", "--delete--");
                    file.delete();
                }
            }
        })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.d("debug", s);
                    }
                });
      
        /**
         * 打印
         * --create--
         * exist
         * --delete--
         */
      
    • single / singleOrDefault:强制返回单个数据,与阻塞操作中的single基本相同,Observable发射多个数据时抛出异常,没有发射数据时,single抛出异常,singleOrDefault则返回默认数据

    最后

    Rxjava标准库的操作符已经介绍完毕

    相关文章

      网友评论

        本文标题:RxJava操作符(二)

        本文链接:https://www.haomeiwen.com/subject/frfdextx.html