RxJava入门(2):基础操作符

作者: tmyzh | 来源:发表于2018-01-30 19:52 被阅读14次

介绍一下RxJava创建时候的一些基础操作符。主要以代码和打印结果的方式介绍,简单明了,均在模拟器上实现过。

create()

最基本的操作符,创建一个被观察者对象

Observable<String> observable =Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
                e.onNext("b");
                e.onNext("c");
                e.onComplete();
            }
        });
        Observer<String> observer =new Observer<String>() {


            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh","onsubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.e("yzh","onNext--"+s);
            }

            @Override
            public void onError(Throwable t) {
                Log.e("yzh","onError--"+t.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e("yzh","onComplete");
            }
        };

        observable.subscribe(observer);

打印结果

01-30 06:26:45.095 2479-2479/? E/yzh: onsubscribe
01-30 06:26:45.095 2479-2479/? E/yzh: onNext--a
01-30 06:26:45.095 2479-2479/? E/yzh: onNext--b
01-30 06:26:45.096 2479-2479/? E/yzh: onNext--c
01-30 06:26:45.096 2479-2479/? E/yzh: onComplete
just()

快速创建一个被观察者对象,可以替代多个onNext()发送事件,最多只能发送10个参数

//等同于create()方式创建时onNext(1),onNext(2),onNext(3),onNext(4)
Observable.just(1,2,3,4)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("yzh","onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("yzh","onNext--"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("yzh","onError-"+e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e("yzh","onComplete");
                    }
                });

打印结果

01-30 06:43:11.911 4395-4395/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 06:43:11.911 4395-4395/com.stone.appuser.rxtest E/yzh: onNext--1
01-30 06:43:11.911 4395-4395/com.stone.appuser.rxtest E/yzh: onNext--2
01-30 06:43:11.911 4395-4395/com.stone.appuser.rxtest E/yzh: onNext--3
01-30 06:43:11.911 4395-4395/com.stone.appuser.rxtest E/yzh: onNext--4
01-30 06:43:11.911 4395-4395/com.stone.appuser.rxtest E/yzh: onComplete
fromArray()

快速创建1个被观察者对象,直接发送 传入的数组数据,可以发送超过10个数据

 Integer[] items ={0,1,2,3};
        Observable.fromArray(items)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("yzh","onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("yzh","onNext--"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("yzh",e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e("yzh","onComplete");
                    }
                });

打印结果

01-30 06:55:28.353 4879-4879/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 06:55:28.354 4879-4879/com.stone.appuser.rxtest E/yzh: onNext--0
01-30 06:55:28.354 4879-4879/com.stone.appuser.rxtest E/yzh: onNext--1
01-30 06:55:28.354 4879-4879/com.stone.appuser.rxtest E/yzh: onNext--2
01-30 06:55:28.354 4879-4879/com.stone.appuser.rxtest E/yzh: onNext--3
01-30 06:55:28.354 4879-4879/com.stone.appuser.rxtest E/yzh: onComplete
fromIterable

快速创建1个被观察者对象,直接发送 传入的集合List数据

 List<String> list =new ArrayList<>();
        list.add("A");
        list.add("B");
        list.add("C");
        Observable.fromIterable(list)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("yzh","onSubscribe");
                    }

                    @Override
                    public void onNext(String s) {
                        Log.e("yzh","onNext--"+s);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("yzh",e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.e("yzh","onComplete");
                    }
                });

打印结果

01-30 07:11:57.670 5425-5425/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 07:11:57.670 5425-5425/com.stone.appuser.rxtest E/yzh: onNext--A
01-30 07:11:57.670 5425-5425/com.stone.appuser.rxtest E/yzh: onNext--B
01-30 07:11:57.670 5425-5425/com.stone.appuser.rxtest E/yzh: onNext--C
01-30 07:11:57.670 5425-5425/com.stone.appuser.rxtest E/yzh: onComplete
被观察者的几个特殊创建
//         除onSubscribe不会触发观察者的任何事件
//        Observable observable=Observable.never();
//          除onSubscribe仅触发观察者的onComplete
//        Observable observable=Observable.empty();
//          除onSubscribe仅触发观察者的onError
        Observable observable=Observable.error(new RuntimeException());
        Observer observer =new Observer() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh","onSubscribe");
            }

            @Override
            public void onNext(Object o) {
                Log.e("yzh","onNext");
            }

            @Override
            public void onError(Throwable e) {

                Log.e("yzh","onError--"+e.toString());
            }

            @Override
            public void onComplete() {
                Log.e("yzh","onComplete");
            }
        };
defer()

相比较于just这些方式,defer创建会在订阅的时候创建,可以确保被观察者中的数据是最新的

   i=10;
//        Observable<Integer> observable =Observable.just(i);
//    此时被观察者对象还没创建
          Observable<Integer> observable=Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> call() throws Exception {
                return Observable.just(i);
            }
        });
        Observer<Integer> observer=new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("yzh","onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("yzh","onNext--"+integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("yzh","onError--"+e.toString());
            }

            @Override
            public void onComplete() {
                Log.e("yzh","onComlete");
            }
        };
        i=20;
    //订阅才会调用defer()创建被观察者对象(Observable)
        observable.subscribe(observer);

打印结果

01-30 07:44:05.985 7165-7165/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 07:44:05.985 7165-7165/com.stone.appuser.rxtest E/yzh: onNext--20
01-30 07:44:05.985 7165-7165/com.stone.appuser.rxtest E/yzh: onComlete

使用屏蔽的代码,即使用just方式时, 打印结果会变成onNext--10;、

timer()

快速创建一个被观察者,但是会延迟事件的发送,实验带一个just方法传入事件,但是观察者收不到,只能收到0,实际操作中可以在这里触发方法但是无法带入值

Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("yzh","onSubscribe");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        //接受到的值是0,可以放一个action()做一个操作无法带参数
                        Log.e("yzh","onNext--"+aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("yzh","onError--"+e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.e("yzh","onComplete");
                    }
                });

打印结果

01-30 07:54:32.650 7675-7675/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 07:54:34.650 7675-7695/com.stone.appuser.rxtest E/yzh: onNext--0
01-30 07:54:34.651 7675-7695/com.stone.appuser.rxtest E/yzh: onComplete
interval()

快速创建一个被观察者,指定事件发送事件并且间隔时间发送
从0开始、无限递增1的发送整数

//参数一开始发送事件时间
//参数二连续发送事件的时间间隔
//参数三时间单位
Observable.interval(3,1,TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("yzh","onSubscribe");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e("yzh","onNext--"+aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("yzh","onError--"+e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.e("yzh","onComplete");
                    }
                });

打印结果,可以观察一下时间信息

01-30 08:28:25.580 8710-8710/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 08:28:28.580 8710-8735/com.stone.appuser.rxtest E/yzh: onNext--0
01-30 08:28:29.580 8710-8735/com.stone.appuser.rxtest E/yzh: onNext--1
01-30 08:28:30.581 8710-8735/com.stone.appuser.rxtest E/yzh: onNext--2
01-30 08:28:31.580 8710-8735/com.stone.appuser.rxtest E/yzh: onNext--3

会按一秒一条一直打印

intervalRange()

快速创建一个被观察,与interval的区别在与可以设置发送事件的起始值和发送事件次数

//参数一 发送事件的起始值
//参数二 发送事件的次数
 Observable.intervalRange(3,10,2,1,TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("yzh","onSubscribe");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e("yzh","onNext--"+aLong);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("yzh","onError--"+e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.e("yzh","onComplete");
                    }
                });

打印结果

01-30 08:47:51.658 9331-9331/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 08:47:53.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--3
01-30 08:47:54.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--4
01-30 08:47:55.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--5
01-30 08:47:56.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--6
01-30 08:47:57.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--7
01-30 08:47:58.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--8
01-30 08:47:59.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--9
01-30 08:48:00.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--10
01-30 08:48:01.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--11
01-30 08:48:02.659 9331-9370/com.stone.appuser.rxtest E/yzh: onNext--12
01-30 08:48:02.659 9331-9370/com.stone.appuser.rxtest E/yzh: onComplete
range()

快速创建一个被观察者对象,连续发送一个区间类的事件

Observable.range(3,10)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e("yzh","onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("yzh","onNext--"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("yzh","onError--"+e.toString());
                    }

                    @Override
                    public void onComplete() {
                        Log.e("yzh","onComplete");
                    }
                });

打印结果

01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onSubscribe
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--3
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--4
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--5
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--6
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--7
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--8
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--9
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--10
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--11
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onNext--12
01-30 09:06:36.813 10204-10204/com.stone.appuser.rxtest E/yzh: onComplete

相关文章

网友评论

本文标题:RxJava入门(2):基础操作符

本文链接:https://www.haomeiwen.com/subject/oukczxtx.html