美文网首页
Rxjava(二)之创建操作符与转换操作符

Rxjava(二)之创建操作符与转换操作符

作者: 梦星夜雨 | 来源:发表于2021-01-25 10:25 被阅读0次

    前言

    Rxjava之所以如此受欢迎,与其强大的操作符是息息相关的。它几乎能完成所有的功能需求。下面我们开始介绍常见的操作符。

    创建型操作符

    常见的创建型操作符有,create、just、fromArray、empty、range。
    对于create我们这里就不多做介绍了。

    just操作符

    Observable.just(1,2,3).subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG,"accept: "+integer);
                        }
                    });
    

    just操作符可以在内部发射任意数量相同类型的元素给观察者,通过分析源码我们可以知道其实内部调用的是fromArray操作符。

    fromArray操作符

    String[] items = {"张三","李四","王五"};
    Observable.fromArray(items).subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String str) throws Exception {
                            Log.d(TAG,"accept: "+str);
                        }
                    });
    

    fromArray操作符内部通过发射一个数集对象给观察者。

    empty操作符

    Observable.empty().subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG,"onSubscribe");
                }
    
                @Override
                public void onNext(Object o) {
                    Log.d(TAG,"onNext: "+o);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG,"onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG,"onComplete");
                }
            });
    
    onSubscribe
    onComplete
    

    empty操作符内部自己发射,下游默认是Object,无法发送有值事件,只会发送onComplete。

    range操作符

    Observable.range(30,5).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG,"accpt: "+integer);
                }
            });
    
    accpt: 30
    accpt: 31
    accpt: 32
    accpt: 33
    accpt: 34
    

    range操作符内部自己发射,从start(参数一)开始,发射count(参数二)个,每次加一。

    变换操作符

    变换操作符是对被观察者发送的事件进行一定的加工处理(转换)操作,然后再发送给被观察者。常见的变换操作符有map、flatMap、concatMap、groupBy、buffer。
    map操作符

    Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(2);
                }
            }).map(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    Log.d(TAG,"apply: "+integer);
                    return "["+integer+"}";
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String str) throws Exception {
                    Log.d(TAG,"accpt: "+str);
                }
            });
    
    apply: 2
    accpt: {2}
    

    可以看到,被观察者发送的是一个int型的数据,但我们通过map操作符,增加了自己的逻辑,然后返回了一个String类型的数据给观察者。

    flatMap操作符

    Observable.create(new ObservableOnSubscribe<String>() {
    
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    e.onNext("张三");
                    e.onNext("李四");
                    e.onNext("王五");
                }
            }).flatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String str) throws Exception {
                    List<String> list = new ArrayList<>();
                    for (int i = 0; i < 3; i++) {
                        list.add(str + " 下标:" + (1 + i));
                    }
                    return Observable.fromIterable(list).delay(5, TimeUnit.SECONDS); // 创建型操作符,创建被观察者
    
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String str) throws Exception {
                    Log.d(TAG,"accpt: "+str);
                }
            });
    
    accpt: 张三 下标:1
    accpt: 张三 下标:2
    accpt: 张三 下标:3
    accpt: 李四 下标:1
    accpt: 王五 下标:1
    accpt: 王五 下标:2
    accpt: 王五 下标:3
    accpt: 李四 下标:2
    accpt: 李四 下标:3
    

    依旧是在被观察者和订阅观察者之间添加flatMap操作符,我们可以看到flatMap操作符需要返回一个ObservableSource对象,而ObservableSource实际上是一个接口,并且Observable实现了这个接口,那么我们可以返回一个Observable对象,我们可以在apply方法中添加自己的操作。
    这里我们为什么要发送这么对消息呢,因为我们要证明flatMap操作符是不排序的,由打印日志可以得到证明。

    concatMap操作符
    用法上和flatMap一样,它们的唯一区别就是concatMap操作符是排序的,至于代码和应用我这里就不赘述了。

    groupBy操作符

    Observable.just(1, 2, 100, 200, 3000, 4000).groupBy(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    if (integer < 10) {
                        return "small";
                    } else if (integer >= 10 && integer < 1000) {
                        return "medium";
                    } else {
                        return "big";
                    }
                }
            }).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
                @Override
                public void accept(final GroupedObservable<String, Integer> groupedObservable) throws Exception {
                    groupedObservable.subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "accept: " + groupedObservable.getKey()+"---"+integer);
                        }
                    });
                }
            });
    
     accept: small---1
     accept: small---2
     accept: medium---100
     accept: medium---200
     accept: big---3000
     accept: big---4000
    

    groupBy操作符是对被观察者发送的数据进行分组,然后将分组后的数据传递给观察者。这里需要注意的是,在观察者中得到的是GroupedObservable对象,若想得到原本被观察者的值,则需要再一次进行封装。

    buffer操作符

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    for(int i = 0;i<100;i++){
                        e.onNext(i);
                    }
                }
            }).buffer(20).subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> integers) throws Exception {
                    Log.d(TAG,"accept: "+ integers);
                }
            });
    
    accept: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
    accept: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
    accept: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
    accept: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
    accept: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
    

    buffer操作符是对被观察者发送的数据进行分组,然后将分组完成后的数据封装成一个List发送给观察者。

    相关文章

      网友评论

          本文标题:Rxjava(二)之创建操作符与转换操作符

          本文链接:https://www.haomeiwen.com/subject/pxeiaktx.html