美文网首页
RxJava2主题v5

RxJava2主题v5

作者: keyboard3 | 来源:发表于2017-07-17 21:12 被阅读50次

    RxJava和java8的Stream类似 都是对数据流进行操作
    基于rxJava版本rxjava2-2.0.1

    Rxjava常用操作符

    创建

    create

    通过创建回调函数自定义subscribe发送

            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                }
            }).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    System.out.print(integer + ",");
                }
            });
    //结果
    1,2,3,
    

    just

    将单个item作为输入,然后单项发送给subscribe

            Observable.just(1, 2, 3).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    System.out.print(integer + ",");
                }
            });
    //结果
    1,2,3,
    

    from

    将数组作为输入,然后依次单个发送给subscribe

            Observable.from(Arrays.asList(1, 2, 3)).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    System.out.print(integer + ",");
                }
            });
    //结果
    1,2,3,
    

    Empty/Never/Throw

    Empty:创建一个不发射任何数据但是正常终止的Observable(调用complete)
    Never:创建一个不发射数据也不终止的Observable(不调用complete)
    Throw:创建一个不发射数据以一个错误终止的Observable(调用error)

            System.out.println("\nEmpty:");
            Observable.empty().subscribe(new Subscriber<Object>() {
                @Override
                public void onCompleted() {
                    System.out.print("onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.print("onError");
                }
    
                @Override
                public void onNext(Object o) {
                    System.out.print("onNext");
                }
            });
            System.out.println("\nNever:");
            Observable.never().subscribe(new Subscriber<Object>() {
                @Override
                public void onCompleted() {
                    System.out.print("onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.print("onError");
                }
    
                @Override
                public void onNext(Object o) {
                    System.out.print("onNext");
                }
            });
    //结果
    Empty:
    onCompleted
    Never:
    

    Interval

    创建一个按固定时间间隔发射整数序列的Observable(异步的)
    Timer效果与其类似,不建议使用

           Observable.interval(1, TimeUnit.MILLISECONDS).subscribe(new Subscriber<Object>() {
                @Override
                public void onCompleted() {
                    System.out.print("onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.print("onError");
                }
    
                @Override
                public void onNext(Object o) {
                    System.out.print("onNext");
                }
            });
    //结果 每隔一秒生成一个next
    

    Range

    Range操作符发射一个范围内的有序整数序列,你可以指定范围的起始和长度。

            Observable.range(7, 3).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    System.out.print(integer + ",");
                }
            });
    //结果
    7,8,9,
    

    repeat

    重复数据发送n遍
    repeatWhen指定条件才重复发送

            Observable.just(1, 2)
                    .repeat(2)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print(integer + " ");
                        }
                    });
    //结果
    1 2 1 2 
    

    转换

    Buffer

    定期收集Observable的数据放进一个数据包裹,然后发射这些数据集合。(感觉应用场景:将数组按照拆分成大小相同的多个子数组)

            Observable.range(2, 6).buffer(3)
                    .subscribe(new Subscriber<List<Integer>>() {
                @Override
                public void onCompleted() {
                    System.out.printf("onCompleted\n");
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(List<Integer> integers) {
                    System.out.printf(integers.toString());
                }
            });
    //结果
    [2, 3, 4][5, 6, 7]onCompleted
    

    window

    定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据

            Observable.range(2, 6).window(3)
                    .subscribe(new Subscriber<Observable<Integer>>() {
                @Override
                public void onCompleted() {
                    System.out.printf("w-Completed\n");
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(Observable<Integer> integerObservable) {
                    integerObservable.subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onCompleted() {
                            System.out.printf("inner-Completed\n");
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            System.out.printf(integer + ",");
                        }
                    });
                }
            });
    //结果
    2,3,4,inner-Completed
    5,6,7,inner-Completed
    w-Completed
    

    cast

    在Observable发射数据前将所有数据类型进行强转为指定类型

            Observable.just(2.5F, 3, 5F, 0.6F).cast(Object.class).subscribe(new Action1<Object>() {
                @Override
                public void call(Object s) {
                    System.out.print(s + ",");
                }
            });
    //结果
    2.5,3,5.0,0.6,
    

    Map

    仅仅是将数据进行转换

            Observable.range(2, 5).map(new Func1<Integer, String>() {
                @Override
                public String call(Integer integer) {
                    return integer + "call";
                }
            }).subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.printf(s + ",");
                }
            });
    //结果
    2call,3call,4call,5call,6call,
    

    FlatMap

    将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并。(就是将数据转化为Observable,实际应用场景:将数组数据项通过from转成Observable,然后通过Merge将Observables中的数据项都平展开到新的Observable中,成为了将多维数组降维的目的)

            Observable.create(new Observable.OnSubscribe<List<Integer>>() {
                @Override
                public void call(Subscriber<? super List<Integer>> subscriber) {
                    subscriber.onNext(Arrays.asList(1, 2, 3, 4, 5));
                }
            }).flatMap(new Func1<List<Integer>, Observable<Integer>>() {
                @Override
                public Observable<Integer> call(List<Integer> integers) {
                    return Observable.from(integers);
                }
            }).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    System.out.printf(integer + ",");
                }
            });
    //结果
    1,2,3,4,5,
    

    GroupBy

    将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据。(其实就是将数据项对应生成Key处理后转为一个Observable)

            Observable.range(0, 10).groupBy(new Func1<Integer, Integer>() {
                @Override
                public Integer call(Integer integer) {
                    return integer % 3;
                }
            }).subscribe(new Action1<GroupedObservable<Integer, Integer>>() {
                @Override
                public void call(final GroupedObservable<Integer, Integer> item) {
                    item.subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onCompleted() {
                            System.out.print(item.getKey() + ":" + "onCompleted");
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            System.out.printf(item.getKey() + ":" + integer + ",");
                        }
                    });
                }
            });
    //结果
    0:0,1:1,2:2,0:3,1:4,2:5,0:6,1:7,2:8,0:9,0:onCompleted1:onCompleted2:onCompleted
    

    Scan

    对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值。

            Observable.range(0, 5).scan(new Func2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer sum, Integer item) {
                    return sum + item;
                }
            }).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    System.out.printf(integer + ",");
                }
            });
    //结果
    0,1,3,6,10,
    

    过滤

    Debounce

    过滤特定时间内最近的数据发送,其他数据不发送。

            Observable.just(1, 2, 3)
                    .debounce(400, TimeUnit.MILLISECONDS)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print("integer=" + integer);
                        }
                    });
    //结果:过滤400毫秒后最近的一个
    integer=3
    

    Sample

    周期性发送最近的数据

    Observable.interval(300, TimeUnit.MILLISECONDS)
                    .sample(500, TimeUnit.MILLISECONDS)
                    .subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long aLong) {
                            System.out.print("s=" + aLong + ",");
                        }
                    });
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    //结果
    s=0,s=2,s=4,
    

    Distinct

    过滤排除重复的数据

            Observable.just(1, 2, 4, 1, 2, 1)
                    .distinct()
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print("integer=" + integer + ",");
                        }
                    });
    //结果
    integer=1,integer=2,integer=4,
    

    ElementAt

    只过滤发送Observable中指定位置的数据

            Observable.just(1, 2, 3, 4, 5)
                    .elementAt(3)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print("integer=" + integer + ",");
                        }
                    });
    //结果
    ElementAt
    integer=4,
    

    Filter

    定义过滤函数来过滤发送的数据

            Observable.just(1, 2, 3, 4)
                    .filter(new Func1<Integer, Boolean>() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer == 3||integer==4;
                        }
                    })
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print("integer=" + integer + ",");
                        }
                    });
    //结果
    integer=3,integer=4,
    

    First

    过滤第一个数据或者符合过滤条件的第一个数据

            Observable.just(1, 2, 3, 4)
                    .first(new Func1<Integer, Boolean>() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer % 2 == 0;
                        }
                    })
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print("integer=" + integer + ",");
                        }
                    });
    //结果
    integer=2,
    

    Last

    过滤最后一个数据或者符合过滤条件的最后一个数据

            Observable.just(1, 2, 3, 4)
                    .last(new Func1<Integer, Boolean>() {
                        @Override
                        public Boolean call(Integer integer) {
                            return integer % 2 == 0;
                        }
                    })
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print("integer=" + integer + ",");
                        }
                    });
    //结果
    integer=4,
    

    IgnoreElements

    忽略不让原Observable所有数据发送,但允许发送终止通知。如onError和onComplete

            Observable
                    .create(new Observable.OnSubscribe<Integer>() {
                        @Override
                        public void call(Subscriber<? super Integer> subscriber) {
                            subscriber.onNext(1);
                            subscriber.onCompleted();
                            subscriber.onNext(2);
                            subscriber.onNext(3);
                        }
                    })
                    .ignoreElements()
                    .subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onCompleted() {
                            System.out.print("onCompleted");
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            System.out.print("integer=" + integer + ",");
                        }
                    });
    //结果
    onCompleted
    

    Skip

    跳过头到第n个数据发送

            Observable.range(0, 10)
                    .skip(5)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print("integer=" + integer + ",");
                        }
                    });
    //结果
    integer=5,integer=6,integer=7,integer=8,integer=9,
    

    SkipLast

    跳过从尾数到第n个数据发送

            Observable.range(0, 10)
                    .skipLast(5)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print("integer=" + integer + ",");
                        }
                    });
    //结果
    integer=0,integer=1,integer=2,integer=3,integer=4,
    

    Take

    从头开始过滤出n个数据发送

            Observable.range(0, 10)
                    .take(5)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print("integer=" + integer + ",");
                        }
                    });
    //结果
    integer=0,integer=1,integer=2,integer=3,integer=4,
    

    TakeLast

    从尾开始过滤出n个数据发送

            Observable.range(0, 10)
                    .takeLast(5)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            System.out.print("integer=" + integer + ",");
                        }
                    });
    //结果
    integer=5,integer=6,integer=7,integer=8,integer=9,
    

    组合

    merge

    合并两个Observable的数据结果,顺序不定(猜测是异步发送)

            Observable<Long> interval1 = Observable.interval(100, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong - 5L;
                        }
                    }).take(5);//抑制定时整数序列数量5个
            Observable<Long> interval2 = Observable.interval(100, TimeUnit.MILLISECONDS).take(5);//抑制定时整数序列数量5个
            Observable.merge(interval1, interval2).subscribe(new Action1<Number>() {
                @Override
                public void call(Number number) {
                    System.out.print(number + " ");
                }
            });
    //结果
    0 -5 -4 1 -3 2 -2 3 -1 4 
    

    Concat

    合并两个Observable的数据结果,顺序前后确定(同步发送,第一个发送完毕才会发送第二个)

            Observable<Long> interval1 = Observable.interval(100, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong - 5L;
                        }
                    }).take(5);//抑制定时整数序列数量5个
            Observable<Long> interval2 = Observable.interval(100, TimeUnit.MILLISECONDS).take(5);//抑制定时整数序列数量5个
            Observable.concat(interval1, interval2).subscribe(new Action1<Number>() {
                @Override
                public void call(Number number) {
                    System.out.print(number + " ");
                }
            });
    //结果
    -5 -4 -3 -2 -1 0 1 2 3 4 
    

    startWith

    在observable的数据源前面插入数据

            Observable<Integer> range = Observable.range(0, 10);
            range.startWith(-2, -1).subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    System.out.print(integer + " ");
                }
            });
    //结果
    -2 -1 0 1 2 3 4 5 6 7 8 9 
    

    zip

    合并Observables的数据,使用发射的顺序作为合并的标记
    Javadoc: zip(Iterable,FuncN)

    Javadoc: zip(Observable,FuncN)

    Javadoc: zip(Observable,Observable,Func2) (最多可以有九个Observables参数)

            Observable<Long> interval5 = Observable.interval(100, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong - 5L;
                        }
                    }).take(6);//抑制定时整数序列数量5个
            Observable<Long> interval6 = Observable.interval(100, TimeUnit.MILLISECONDS).take(5);//抑制定时整数序列数量5个
            Observable.zip(interval5, interval6, new Func2<Long, Long, String>() {
                @Override
                public String call(Long aLong, Long aLong2) {
                    return aLong + "~" + aLong2;
                }
            }).subscribe(new Action1<String>() {
                @Override
                public void call(String item) {
                    System.out.print(item + " ");
                }
            });
    //结果
    -5~0 -4~1 -3~2 -2~3 -1~4 
    

    CombineLatest

    根据每个Observable的发送时间作为合并标记,任何一个Observable数据发送之后就和其他的Observable最近发送的数据进行合并。

            Observable<Long> interval7 = Observable.interval(100, TimeUnit.MILLISECONDS)
                    .map(new Func1<Long, Long>() {
                        @Override
                        public Long call(Long aLong) {
                            return aLong - 5L;
                        }
                    }).take(6);//抑制定时整数序列数量5个
            Observable<Long> interval8 = Observable.interval(200, TimeUnit.MILLISECONDS).take(5);//抑制定时整数序列数量5个
            Observable.combineLatest(interval7, interval8, new Func2<Long, Long, String>() {
                @Override
                public String call(Long aLong, Long aLong2) {
                    return aLong + "~" + aLong2;
                }
            }).subscribe(new Action1<String>() {
                @Override
                public void call(String item) {
                    System.out.print(item + " ");
                }
            });
    //结果
    -4~0 -3~0 -3~1 -2~1 -1~1 -1~2 0~2 0~3 0~4 
    

    join

    合并的两个Observable的数据,合并标记基于对Observable数据定义的生命周期,如果数据过了生命周期则就不会和其他Observable的数据进行合并,否则就排列组合形式合并。(如果不懂建议点击链接直接看图)

            Observable<Long> observable1 = Observable.interval(300, TimeUnit.MILLISECONDS).delay(400, TimeUnit.MILLISECONDS).take(5);//延迟1个多身位
            Observable<Long> observable2 = Observable.interval(300, TimeUnit.MILLISECONDS).take(5);
            observable1.join(observable2, new Func1<Long, Observable<Long>>() {
                @Override
                public Observable<Long> call(Long aLong) {
                    return Observable.just(aLong).delay(100, TimeUnit.MILLISECONDS);//每个数据100毫秒生命
                }
            }, new Func1<Long, Observable<Long>>() {
                @Override
                public Observable<Long> call(Long aLong) {
                    return Observable.just(aLong).delay(150, TimeUnit.MILLISECONDS);//每个数据150毫秒生命
                }
            }, new Func2<Long, Long, String>() {
                @Override
                public String call(Long aLong, Long aLong2) {
                    return "A-" + aLong + ":B-" + aLong2;
                }
            }).subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.print(s+" ");
                }
            });
    //结果
    A-0:B-1 A-1:B-2 A-2:B-3 A-3:B-4 
    

    错误处理

    catch

    拦截原始Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。

    • onErrorReturn
      让Observable遇到错误时发射一个特殊的项并且正常终止。
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onError(new IllegalAccessException("no access"));
                }
            }).onErrorReturn(new Func1<Throwable, Integer>() {
                @Override
                public Integer call(Throwable throwable) {
                    return 10001;
                }
            }).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.print("onError");
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.print("error_code:" + integer);
                }
            });
    //结果
    error_code:10001
    
    • onErrorResumeNext
      让Observable在遇到错误时开始发射指定的第二个Observable的数据序列。
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onError(new IllegalAccessException("no access"));
                }
            }).onErrorResumeNext(new Func1<Throwable, Observable<? extends Integer>>() {
                @Override
                public Observable<? extends Integer> call(Throwable throwable) {
                    return Observable.just(1, 2);
                }
            }).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.print("onError");
                }
    
                @Override
                public void onNext(Integer integer) {
                    System.out.print(",value:" + integer);
                }
            });
    //结果
    ,value:1,value:2
    
    • onExceptionResumeNext
      和onErrorResumeNext类似,onExceptionResumeNext方法返回一个备用Observable,不同的是,如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onError(null);
                }
            }).onExceptionResumeNext(Observable.just(1, 2, 3, 4))
                    .subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onCompleted() {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            System.out.print("onError:");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            System.out.print(",value:" + integer);
                        }
                    });
    //结果
    onError:
    

    重试

    如果原始Observable遇到错误,重新订阅它期望它能正常终止,数据会有部分重复。

            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onError(new IllegalAccessException("IllegalAccessException"));
                }
            }).retry(new Func2<Integer, Throwable, Boolean>() {
                @Override
                public Boolean call(Integer integer, Throwable throwable) {
                    System.out.println("num:" + integer + " throwable:" + throwable.getMessage());
                    return !(throwable instanceof IllegalAccessException);
                }
            }).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
                    System.out.println("onError:" + e.getMessage());
                }
    
                @Override
                public void onNext(Integer integer) {
    
                }
            });
    //结果
    num:1 throwable:IllegalAccessException
    onError:IllegalAccessException
    
    • retrywhen
      不太理解...(主要作用可以和retorfit一起用)
            Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    System.out.println("subscribing");
                    subscriber.onError(new RuntimeException("always fails"));
                }
            }).retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                @Override
                public Observable<?> call(Observable<? extends Throwable> observable) {
                    return observable.zipWith(Observable.range(1, 3), new Func2<Throwable, Integer, Integer>() {
                        @Override
                        public Integer call(Throwable throwable, Integer integer) {
                            return integer;
                        }
                    }).flatMap(new Func1<Integer, Observable<?>>() {
                        @Override
                        public Observable<?> call(Integer integer) {
                            System.out.println("delay retry by " + integer + " second(s)");
                            return Observable.timer(integer, TimeUnit.SECONDS);
                        }
                    });
                }
            }).toBlocking().forEach(new Action1<Object>() {
                @Override
                public void call(Object o) {
                    System.out.println("foreach:"+o);
                }
            });
    //结果
    subscribing
    delay retry by 1 second(s)
    subscribing
    delay retry by 2 second(s)
    subscribing
    delay retry by 3 second(s)
    subscribing
    

    除了这些还有辅助、条件和布尔、算术和聚合、连接、转换、操作符决策树操作符
    其他操作符太零散了,直接参考下面
    中文翻译
    官网-操作符介绍
    操作符doc文档

    相关文章

      网友评论

          本文标题:RxJava2主题v5

          本文链接:https://www.haomeiwen.com/subject/mgoxkxtx.html