美文网首页
RxJava操作符

RxJava操作符

作者: 慕尼黑凌晨四点 | 来源:发表于2020-12-08 12:33 被阅读0次

    RxJava操作符

    官网地址:http://reactivex.io/documentation/operators.html
    一个很详细的博客:https://juejin.cn/post/6844903885245513741

    按照操作符的类型可分为:创建操作符,变换操作符,过滤操作符,组合操作符,错误操作符,辅助操作符,条件和布尔操作符,算术和聚合操作符 及 连接操作符 等。

    RxJava3 中 Action<T>被弃用了,改为Consumer<T>

    创建操作符

    • create传入自定义Onsubscribe
    Observable.create(new Observable.OnSubscribe<T>(){
        @Overrode
        public void call(Subscriber<? super T>)  sub){
            sub.onNext("0");
            sub.onNext("1");
            sub.onCompleted();
        }
    })
    
    • from 传入数组:
    String list = {"0","1","2"}
    Observable.from(list);
    
    • just
    Observable.just("0","1","2")
    
    • interval 创建一个 按固定时间间隔发射整数<Long>序列 的Observable。
    Observable<Long> observable = Observable.interval(3, TimeUnit.SECONDS, Schedulers.trampoline());
    //, Schedulers.trampoline() 指定一个线程,默认新线程(NEW_THREAD),这种情况下主线程结束,程序就结束了。
    observable.subscribe(new Consumer<Long>() {  //泛型只能是Long类型,下同
        public void accept(Long aLong) throws Throwable {
             //TODO 
        }
    });
    

    <img src="C:\Users\Administrator\AppData\Roaming\Typora\typora-user-images\image-20201207122725363.png" alt="image-20201207122725363" style="zoom:50%;" />

    • range 创建 发射指定范围的整数<Integer>序列 的Observable。
    Observable<Integer> observable = Observable.range(0,5);//左闭右开
    
    • repeat 重复多少次
    Observable<Integer> observable = Observable.range(0,5)
            .repeat(2)//重复两次
    

    变换操作符

    • map 将一个对象转变为另一个对象; ----偷梁换柱
    Observable<Integer> observable = Observable.range(0,5);
    Observable<String> map = observable.map(new Function<Integer, String>() {
        public String apply(Integer integer) throws Throwable {
            return integer.toString();
        }
    });
    
    • flatMapcast flatMap 将一个对象转为Observable集合; cast转为集合后电脑不知道你的Observable的泛型类型,需要用cast转换。 -----偷天换日
    final Observable<Integer> observable = Observable.range(0,5);
    observable.flatMap(new Function<Integer, ObservableSource<?>>() {
        public ObservableSource<?> apply(Integer integer) throws Throwable {
            return Observable.just("0","1","haha");
        }
    }).cast(String.class);
    
    • concatMap

    FlatMap类似。区别在于:flatMap不保证顺序,当上一个操作未执行时,下一个操作可能已经发射。

    concatMap保证了顺序,必须等到上一个操作发射完,才会发射下一个。

    concatMapflatMap的功能是一样的, 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据放进一个单独的Observable。只不过最后合并ObservablesflatMap采用的merge,而concatMap采用的是连接(concat)。总之一句一话,他们的区别在于:concatMap是有序的,flatMap是无序的,concatMap最终输出的顺序与原序列保持一致,而flatMap则不一定,有可能出现交错。 ---出自这里

    • **flatMapIteravle ** 把每一个元素转换成Iterable
    Observable
            .interval(1, TimeUnit.SECONDS, Schedulers.trampoline())
            .flatMapIterable( aLong -> Arrays.asList("a", "integer") )
            .subscribe(s -> System.out.println(s));
    
    • buffer 将Observable转为一个新的Observable,这个新Observable每次发射一组列表,而不是一个个发射。
    Observable
            .interval(1, TimeUnit.SECONDS, Schedulers.trampoline())
            .buffer(3)//集满3个再一并发射
    
    • groupBy 将Observable转为一个新的Observable,然后可以分组。
    Person p1 = new Person("hh",13);
    Person p2 = new Person("ff",13);
    Person p3 = new Person("gg",23);
    Person p4 = new Person("aa",13);
    Person p5 = new Person("bb",33);
    @NonNull Observable<GroupedObservable<Integer, Person>> group = Observable.just(p1, p2, p3, p4, p5)
        .groupBy(p -> p.age);//按年龄进行排序
    Observable.concat(group).subscribe(new Consumer<Person>() {
        @Override
        public void accept(Person p) throws Throwable {
            System.out.println(">>>>>>"+p.name+","+p.age);
        }
    });
    //Result ----------------------按年龄进行排序结果----
    >>>>>>hh,13
    >>>>>>ff,13
    >>>>>>aa,13
    >>>>>>gg,23
    >>>>>>bb,33
    
    

    过滤操作符

    • filter 过滤
    Observable.just(1, 2, 3, 4,7, 5)
            .filter(i -> i>2)//只保留大于2的元素
    
    • elementAt 返回指定位置的数据

    • distinct 去重

    • skip 和 take skip过滤掉前n项,take只取前n项目。

    • ignoreElements 忽略所有元素,只保留onComplete和onError通知

    • throttleFirst 定期发射这个时间段里源Observable发送的第一条数据。默认调度器computation

    Observable.interval(1000,TimeUnit.MILLISECONDS, Schedulers.trampoline())
            .throttleFirst(2000,TimeUnit.MILLISECONDS)
            .subscribe(l -> System.out.println(l));
    //-------------------结果-----------
    //  0,3,6,9, .........
    //2000改为1900则是 2,4,6,8 ...自己体会
    
    image-20201208100601354.png
    • throttleWithTimeOut

    通过时间来限流。源Observable每次发射出来一个数据后都会进行计时,如果再设定好的时间结束前Observable有新的数据发射出来,这个数据就会被丢弃,同时开始重新计时。

    组合操作

    • startWith 在源数据之前插入一系列数据;
    Observable.just(3,4,5).startWith(1,2)
    
    • merge 合并Observable,可能会数据交错。
    Observable.merge(obs1,obs2)
    
    • concat 合并Observable,严格按照顺序发射。
    Observable.concat(obs1,obs2)
    
    • zip 合并数据。
    @NonNull Observable<Long> obs1 = Observable.just(1L,2L,3L);
    @NonNull Observable<String> obs2 = Observable.just("1L","2L","3L");
    Observable.zip(obs1,obs2,(l1,l2) -> l1+","+l2).subscribe(l -> System.out.println(l));
    ----------------------
    1,1L
    2,2L
    3,3L
    
    • **combineLasetst ** 用前者最后一条发射的数据进行合并。
    @NonNull Observable<Long> obs1 = Observable.just(1L,2L,3L);
    @NonNull Observable<String> obs2 = Observable.just("1L","2L","3L");
    Observable.combineLatest(obs1,obs2,(l1,l2) -> l1+","+l2).subscribe(l -> System.out.println(l));
    ------------------------
    3,1L
    3,2L
    3,3L
    

    辅助操作符

    • delay 延迟操作

    • Do doXXX,do后面加上某个生命周期,即对这个生命周期加了个回调。

    • subscribeOnobserveOn 指定线程;在哪个线程发布,在哪个线程观察

    • timeOut 指定一段时间,超时则onError;

    错误处理操作符

    • catch onErrorReturnonErrorResumeonExceptionResumeNext
    RxJavaPlugins.setErrorHandler(e -> { });
    Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
        for (int i = 0; i < 20; i++) {
            if (i>2{
                emitter.onError(new Throwable("ERROR"));
            }
            emitter.onNext(i);
        }
        emitter.onComplete();
    })onErrorReturn(t -> 100)//错误的时候直接返回数据100
    0
    1
    2
    100
    complete
    
    • retry 出错后重试的次数。

    布尔操作符

    • all 对源Observable发射的所有数据进行判断,全满足条件则返回true,否则返回false
    Observable.just(1,2,3,4,5)
            .all(integer -> {
                System.out.println(integer);
                return integer<3;
            })
            .subscribe(aBoolean -> System.out.println(aBoolean));
    //-----------------结果------
    1
    2
    3
    false
    
    • contains 是否包含这个数据;isEmpty 是否为空;

    条件操作符

    • amb 多个Observable中,只发射第一个发射数据的的Observable。
    • defaultEmpty 没发射数据则返回一个默认数据。

    转换操作符

    将Observable转换成另一个对象或数据结构。比map更简介。

    • toList
    Observable.just(1,2,3,4,5)
            .toList()
            .subscribe(list -> {System.out.println(list.size())});//{1,2,3.4,5}
    
    • toSortedListtoList一样,不过多了个排序功能,所以发射对象要Comparable。
    Observable.just(1,2,3,4,5)
            .toMap(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Throwable {
                    return integer.toString();
                }
            })
            .subscribe(map -> System.out.println(map.toString()));
    //-------------------------result---------------
    //.subscribe(map -> System.out.println(map.toString()));
    //返回的是 {("1",1),("2",2) ...("5",5)}这个map
    

    未完待续。。。

    ps: 其实还有很多没写,太多了,学不动了😵

    相关文章

      网友评论

          本文标题:RxJava操作符

          本文链接:https://www.haomeiwen.com/subject/hzmfgktx.html