美文网首页工作生活
Rxjava2的操作符 一

Rxjava2的操作符 一

作者: 天地玄黄 | 来源:发表于2019-07-08 10:24 被阅读0次

    一 创建操作符

    1.1 创建被观察者

    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            e.onNext("Hello Observer");
            e.onComplete();
        }
    });
    

    表示创建一个被观察者,其中 e.onNext("Hello Observer"),表示被观察者发送 "Hello Observer"到观察者。

    1.2 创建观察者

    Observer<String> observer = new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(String s) {
            Log.d("chan","=============onNext " + s);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
            Log.d("chan","=============onComplete ");
        }
    };   
    observable.subscribe(observer);
    

    表示创建一个观察者,动过订阅操作,可以接受来自被观察者发送的数据。

    1.3 just()

    Observable.just(1, 2, 3)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "=================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "=================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "=================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=================onComplete ");
        }
    });
    

    表示创建一个被观察者,并发送事件,发送的事件不可以超过10个以上。

    1.4 From 操作符
    1.4.1 fromArray()

    Integer array[] = {1, 2, 3, 4};
    Observable.fromArray(array)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "=================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "=================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "=================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=================onComplete ");
        }
    });
    
    

    这个方法和 just() 类似,只不过 fromArray 可以传入多于10个的变量,并且可以传入一个数组。

    1.4.2 fromCallable()

    Observable.fromCallable(new Callable < Integer > () {
    
        @Override
        public Integer call() throws Exception {
            return 1;
        }
    })
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "================accept " + integer);
        }
    });
    

    这里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它会返回一个结果值,这个结果值就是发给观察者的。

    1.4.3 fromFuture()

    FutureTask < String > futureTask = new FutureTask < > (new Callable < String > () {
        @Override
        public String call() throws Exception {
            Log.d(TAG, "CallableDemo is Running");
            return "返回结果";
        }
    });
    
    Observable.fromFuture(futureTask)
        .doOnSubscribe(new Consumer < Disposable > () {
        @Override
        public void accept(Disposable disposable) throws Exception {
            futureTask.run();
        }
    })
    .subscribe(new Consumer < String > () {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, "================accept " + s);
        }
    });
    

    参数中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通过 get() 方法来获取 Callable 返回的值。

    1.4.4 fromIterable()

    List<Integer> list = new ArrayList<>();
    list.add(0);
    list.add(1);
    list.add(2);
    list.add(3);
    Observable.fromIterable(list)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "=================onSubscribe");
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "=================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "=================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=================onComplete ");
        }
    });
    

    直接发送一个 List 集合数据依次给观察者

    1.5 defer()

    // i 要定义为成员变量
    Integer i = 100;
            
    Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
        @Override
        public ObservableSource<? extends Integer> call() throws Exception {
            return Observable.just(i);
        }
    });
    
    i = 200;
    
    Observer observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "================onNext " + integer);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    };
    
    observable.subscribe(observer);
    
    i = 300;
    
    observable.subscribe(observer);
    

    这个方法的作用就是直到被观察者被订阅后才会创建被观察者。因为 defer() 只有观察者订阅的时候才会创建新的被观察者,所以每订阅一次就会打印一次,并且都是打印 i 最新的值。所以打印出来结果,第一个值是200,第二个值是300

    1.6 timer()

    Observable.timer(2, TimeUnit.SECONDS)
    .subscribe(new Observer < Long > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Long aLong) {
            Log.d(TAG, "===============onNext " + aLong);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    当到指定时间后就会发送一个 值给观察者。

    1.7 interval()

    Observable.interval(4, TimeUnit.SECONDS)
    .subscribe(new Observer < Long > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==============onSubscribe ");
        }
    
        @Override
        public void onNext(Long aLong) {
            Log.d(TAG, "==============onNext " + aLong);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    每隔一段时间就会发送一个事件,这个事件是从0开始,不断增1的数字。

    1.8 intervalRange()

    Observable.intervalRange(0, 5, 2, 1, TimeUnit.SECONDS)
    .subscribe(new Observer < Long > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==============onSubscribe ");
        }
    
        @Override
        public void onNext(Long aLong) {
            Log.d(TAG, "==============onNext " + aLong);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    这个的意思是,延迟两秒之后,从0开始,每间隔一秒发送一次,累计发送五次。

    1.9 range()

    Observable.range(2, 5)
    .subscribe(new Observer < Integer > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==============onSubscribe ");
        }
    
        @Override
        public void onNext(Integer aLong) {
            Log.d(TAG, "==============onNext " + aLong);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    同时发送一定范围的事件序列。

    1.10 rangeLong()
    作用与 range() 一样,只是数据类型为 Long

    1.11 empty() & never() & error()

    Observable.empty()
    .subscribe(new Observer < Object > () {
    
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "==================onSubscribe");
        }
    
        @Override
        public void onNext(Object o) {
            Log.d(TAG, "==================onNext");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "==================onError " + e);
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "==================onComplete");
        }
    });
    

    empty() : 直接发送 onComplete() 事件
    05-26 14:06:11.881 15798-15798/com.example.rxjavademo D/chan: ==================onSubscribe
    ==================onComplete
    never():不发送任何事件
    05-26 14:12:17.554 16805-16805/com.example.rxjavademo D/chan: ==================onSubscribe
    error():发送 onError() 事件
    05-26 14:12:58.483 17817-17817/com.example.rxjavademo D/chan: ==================onSubscribe
    ==================onError java.lang.NullPointerException

    二 转换操作符

    2.1 map()

    Observable.just(1, 2, 3)
    .map(new Function < Integer, String > () {
        @Override
        public String apply(Integer integer) throws Exception {
            return "I'm " + integer;
        }
    })
    .subscribe(new Observer < String > () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "===================onSubscribe");
        }
    
        @Override
        public void onNext(String s) {
            Log.e(TAG, "===================onNext " + s);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    map 可以将被观察者发送的数据类型转变成其他的类型,上面的代码就是将整型转换成String类型

    2.2 flatMap()
    flatMap() 其实与 map() 类似,但是 flatMap() 返回的是一个 Observerable。现在用一个例子来说明 flatMap() 的用法。
    假设一个有一个 Person 类,这个类的定义如下:

    public class Person {
    
        private String name;
        private List<Plan> planList = new ArrayList<>();
    
        public Person(String name, List<Plan> planList) {
            this.name = name;
            this.planList = planList;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public List<Plan> getPlanList() {
            return planList;
        }
    
        public void setPlanList(List<Plan> planList) {
            this.planList = planList;
        }
    
    }
    

    Person 类有一个 name 和 planList 两个变量,分别代表的是人名和计划清单。

    public class Plan {
    
        private String time;
        private String content;
        private List<String> actionList = new ArrayList<>();
    
        public Plan(String time, String content) {
            this.time = time;
            this.content = content;
        }
    
        public String getTime() {
            return time;
        }
    
        public void setTime(String time) {
            this.time = time;
        }
    
        public String getContent() {
            return content;
        }
    
        public void setContent(String content) {
            this.content = content;
        }
    
        public List<String> getActionList() {
            return actionList;
        }
    
        public void setActionList(List<String> actionList) {
            this.actionList = actionList;
        }
    }
    

    现在有一个需求就是要将 Person 集合中的每个元素中的 Plan 的 action 打印出来。 首先用 map() 来实现这个需求看看:

    Observable.fromIterable(personList)
    .map(new Function < Person, List < Plan >> () {
        @Override
        public List < Plan > apply(Person person) throws Exception {
            return person.getPlanList();
        }
    })
    .subscribe(new Observer < List < Plan >> () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(List < Plan > plans) {
            for (Plan plan: plans) {
                List < String > planActionList = plan.getActionList();
                for (String action: planActionList) {
                    Log.d(TAG, "==================action " + action);
                }
            }
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    可以看到 onNext() 用了嵌套 for 循环来实现,如果代码逻辑复杂起来的话,可能需要多重循环才可以实现。
    现在看下使用 flatMap() 实现:

    Observable.fromIterable(personList)
    .flatMap(new Function < Person, ObservableSource < Plan >> () {
        @Override
        public ObservableSource < Plan > apply(Person person) {
            return Observable.fromIterable(person.getPlanList());
        }
    })
    .flatMap(new Function < Plan, ObservableSource < String >> () {
        @Override
        public ObservableSource < String > apply(Plan plan) throws Exception {
            return Observable.fromIterable(plan.getActionList());
        }
    })
    .subscribe(new Observer < String > () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(String s) {
            Log.d(TAG, "==================action: " + s);
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    2.3 concatMap()
    concatMap() 和 flatMap() 基本上是一样的,只不过 concatMap() 转发出来的事件是有序的,而 flatMap() 是无序的。
    还是使用上面 flatMap() 的例子来讲解,首先来试下 flatMap() 来验证发送的事件是否是无序的,代码如下:

      List<Person> people = new ArrayList<>();
    
    
                    Person personA = new Person();
                    personA.setName("小明");
                    Plan planA = new Plan();
                    planA.setName("小明学习");
                    Plan planB = new Plan();
                    planB.setName("小明写作");
                    List<Plan> planAs = new ArrayList<>();
                    planAs.add(planA);
                    planAs.add(planB);
                    personA.setPlans(planAs);
    
    
    
    
    
    
                    Person personB = new Person();
                    personB.setName("小赵");
                    Plan planA1 = new Plan();
                    planA1.setName("小赵学习");
                    Plan planB1 = new Plan();
                    planB1.setName("小赵写作");
                    List<Plan> planBs = new ArrayList<>();
                    planBs.add(planA1);
                    planBs.add(planB1);
                    personB.setPlans(planBs);
    
    
                    people.add(personA);
                    people.add(personB);
    
    
                    Observable.fromIterable(people)
                            .flatMap((Function<Person, ObservableSource<Plan>>) person -> {
                                if("小明".equals(person.getName())){
                                    return Observable.fromIterable(person.getPlans()).delay(2, TimeUnit.SECONDS);
                                }
                                return Observable.fromIterable(person.getPlans());
                            }).subscribe(plan -> Log.d("plan", plan.getName()));
    
    
    
    07-05 17:14:57.063 6793-6793/com.example.learnrxjava D/plan: 小赵学习
    07-05 17:14:57.063 6793-6793/com.example.learnrxjava D/plan: 小赵写作
    07-05 17:14:59.063 6793-6815/com.example.learnrxjava D/plan: 小明学习
    07-05 17:14:59.063 6793-6815/com.example.learnrxjava D/plan: 小明写作
    
    

    因为发送小明被延迟了两秒,所以是小赵在前面,小明在后面。
    现在来验证下 concatMap() 是否是有序的,使用上面同样的代码,只是把 flatMap() 换成 concatMap(),打印结果如下:

    07-05 17:16:15.518 6793-6849/com.example.learnrxjava D/plan: 小明学习
    07-05 17:16:15.518 6793-6849/com.example.learnrxjava D/plan: 小明写作
    07-05 17:16:15.519 6793-6849/com.example.learnrxjava D/plan: 小赵学习
    07-05 17:16:15.519 6793-6849/com.example.learnrxjava D/plan: 小赵写作
    

    即便是延迟,小明依旧在前面,这就代表 concatMap() 转换后发送的事件序列是有序的了。

    2.4 buffer()
    表示从需要发送的事件当中获取一定数量的事件,并将这些事件放到缓冲区当中一并发出。
    buffer 有两个参数,一个是 count,另一个 skip。count 缓冲区元素的数量,skip 就代表缓冲区满了之后,发送下一次事件序列的时候要跳过多少元素。先看代码:

    Observable.just(1, 2, 3, 4, 5)
    .buffer(2, 1)
    .subscribe(new Observer < List < Integer >> () {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(List < Integer > integers) {
            Log.d(TAG, "================缓冲区大小: " + integers.size());
            for (Integer i: integers) {
                Log.d(TAG, "================元素: " + i);
            }
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    
    
    
    05-21 14:09:34.015 22421-22421/com.example.rxjavademo D/chan: 
    ================缓冲区大小: 2
    ================元素: 1
    ================元素: 2
    ================缓冲区大小: 2
    ================元素: 2
    ================元素: 3
    ================缓冲区大小: 2
    ================元素: 3
    ================元素: 4
    ================缓冲区大小: 2
    ================元素: 4
    ================元素: 5
    ================缓冲区大小: 1
    ================元素: 5
    

    上面的意思是,每次缓冲两个元素,然后输出,下次跳过1个元素(也就是指针往后移动一个元素),再缓冲两个元素,直到五个元素全部输出完。

    2.5 groupBy()
    将要发送的数据进行分组,call返回的是组名。

    Observable.range(0, 10).groupBy(new Func1<Integer, Integer>() {
                @Override
                public Integer call(Integer integer) {
                    return integer % 3;////分成0,1,2 三个小组
                }
            }).subscribe(new Observer<GroupedObservable<Integer, Integer>>() {
                @Override
                public void onCompleted() {
                    LogUtils.d("------>onCompleted()");
                }
     
                @Override
                public void onError(Throwable e) {
                    LogUtils.d("------>onError()" + e);
                }
     
                @Override
                public void onNext(final GroupedObservable<Integer, Integer> integerIntegerGroupedObservable) {
                    integerIntegerGroupedObservable.subscribe(new Observer<Integer>() {
                        @Override
                        public void onCompleted() {
                            LogUtils.d("------>inner onCompleted()");
                        }
     
                        @Override
                        public void onError(Throwable e) {
                            LogUtils.d("------>inner onError()" + e);
                        }
     
                        @Override
                        public void onNext(Integer integer) {
                            LogUtils.d("------>group:" + integerIntegerGroupedObservable.getKey() + "  value:" + integer);
                        }
                    });
                }
            });
    
    
    
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>group:0  value:0
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>group:1  value:1
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>group:2  value:2
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>group:0  value:3
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>group:1  value:4
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>group:2  value:5
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>group:0  value:6
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>group:1  value:7
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>group:2  value:8
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>group:0  value:9
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>inner onCompleted()
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>inner onCompleted()
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>inner onCompleted()
    02-26 18:01:36.218 9796-9796/com.rxandroid.test1 D/----->: ------>onCompleted()
    
    

    从结果来看,也就是第0组,值为0,第1组,值为1,第二组,值为2。然后又是第0组,值为3,以此类推。

    2.6 scan()
    将数据以一定的逻辑聚合起来。

    Observable.just(1, 2, 3, 4, 5)
    .scan(new BiFunction < Integer, Integer, Integer > () {
        @Override
        public Integer apply(Integer integer, Integer integer2) throws Exception {
            Log.d(TAG, "====================apply ");
            Log.d(TAG, "====================integer " + integer);
            Log.d(TAG, "====================integer2 " + integer2);
            return integer + integer2;
        }
    })
    .subscribe(new Consumer < Integer > () {
        @Override
        public void accept(Integer integer) throws Exception {
            Log.d(TAG, "====================accept " + integer);
        }
    });
    
    05-26 14:45:27.784 22519-22519/com.example.rxjavademo D/chan: ====================accept 1
    ====================apply 
    ====================integer 1
    ====================integer2 2
    ====================accept 3
    ====================apply 
    05-26 14:45:27.785 22519-22519/com.example.rxjavademo D/chan: ====================integer 3
    ====================integer2 3
    ====================accept 6
    ====================apply 
    ====================integer 6
    ====================integer2 4
    ====================accept 10
    ====================apply 
    ====================integer 10
    ====================integer2 5
    ====================accept 15
    

    上面的意思是,apply的第一个参数是1,第二个参数是2,1+2=3,然后这个3作为第二次的第一个参数,也就是3+3 = 6。以此类推。

    2.7 window()
    发送指定数量的事件时,就将这些事件分为一组。window 中的 count 的参数就是代表指定的数量,例如将 count 指定为2,那么每发2个数据就会将这2个数据分成一组。

    Observable.just(1, 2, 3, 4, 5)
    .window(2)
    .subscribe(new Observer < Observable < Integer >> () {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "=====================onSubscribe ");
        }
    
        @Override
        public void onNext(Observable < Integer > integerObservable) {
            integerObservable.subscribe(new Observer < Integer > () {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "=====================integerObservable onSubscribe ");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "=====================integerObservable onNext " + integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "=====================integerObservable onError ");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "=====================integerObservable onComplete ");
                }
            });
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "=====================onError ");
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "=====================onComplete ");
        }
    });
    
    =====================onSubscribe 
    =====================integerObservable onSubscribe 
    =====================integerObservable onNext 1
    =====================integerObservable onNext 2
    =====================integerObservable onComplete 
    =====================integerObservable onSubscribe 
    =====================integerObservable onNext 3
    =====================integerObservable onNext 4
    =====================integerObservable onComplete 
    =====================integerObservable onSubscribe 
    =====================integerObservable onNext 5
    =====================integerObservable onComplete 
    =====================onComplete 
    

    上面的意思是,发送一个1,2,然后走结束的onComplete回调。这个1,2当做一个完整的事件。然后再发送3,4.然后再走结束的回调,3,4又成一组。依次类推。

    因为RX操作符太多,这次先介绍这么多,下一篇将继续介绍操作符。

    相关文章

      网友评论

        本文标题:Rxjava2的操作符 一

        本文链接:https://www.haomeiwen.com/subject/achrhctx.html