RxJava(三)操作符的使用

作者: 泅渡者 | 来源:发表于2017-03-07 15:52 被阅读73次
一:创建操作符
1.create:建议你在传递给create方法的函数中检查观察者的isUnsubscribed状态,以便在没有观察者的时候,让你的Observable停止发射数据或者做昂贵的运算。
       Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()){
                  e.onNext("1");
                  e.onNext("2");
                  e.onNext("3");
                }
                e.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                Log.d(TAG,s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
2.Defer :操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。

在某些情况下,等待直到最后一分钟(就是知道订阅发生时)才生成Observable可以确保Observable包含最新的数据。


public class CourseThreeActivity extends AppCompatActivity {
    private final static  String TAG = "CourseThreeActivity";
    private int num = 10;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_course_three);
        Observable<Integer> observable = Observable.just(num);
        num =20;
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG,integer+"");
            }
        };
        observable.subscribe(subscriber);
    }
}

运行结果如下

20244-20244/com.pse.rxandroid E/CourseThreeActivity: 10

可见当我们调用Just 时该Observerable 的数据已经固定。即使在下面做出更改也不会造成影响。
那我们再来看看defer

public class CourseThreeActivity extends AppCompatActivity {
    private final static  String TAG = "CourseThreeActivity";
    private int num = 10;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_course_three);
        Observable<Integer> observable = Observable.defer(new Func0<Observable<Integer>>() {
            @Override
            public Observable<Integer> call() {
                return Observable.just(num);
            }
        });
        num =20;
        Subscriber<Integer> subscriber = new Subscriber<Integer>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG,integer+"");
            }
        };
        observable.subscribe(subscriber);
    }
}

运行结果如下

21658-21658/com.pse.rxandroid E/CourseThreeActivity: 20

现在想必大家能够理解defer 的作用了吧(等待订阅,获取最新值)。

3.From :将其它种类的对象和数据类型转换为Observable

使用方法如下:

   Integer[] items = { 0, 1, 2, 3, 4, 5 };
        Observable myObservable = Observable.from(items);

        myObservable.subscribe(
                new Action1<Integer>() {
                    @Override
                    public void call(Integer item) {
                        System.out.println(item);
                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable error) {
                        System.out.println("Error encountered: " + error.getMessage());
                    }
                },
                new Action0() {
                    @Override
                    public void call() {
                        System.out.println("Sequence complete");
                    }
                }
        );

结果大家可想而知,不过在这里有一个新的操作符Action0,其实看过源码就会知道Action可以有多个参数,方便我们来重写,处理自己的需求,这里我们只是模拟Observer创建了 onNext(),onError(),onComplete().

4.Range:创建一个发射特定整数序列的Observable

示例代码如下

  Observable.range(5,10).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e(TAG,integer+"");
            }
        });

这里我们只是为了测试,所以只处理onNext()事件。
运行结果如下

03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 5
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 6
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 7
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 8
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 9
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 10
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 11
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 12
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 13
03-07 14:01:34.029 7672-7672/com.pse.rxandroid E/CourseThreeActivity: 14
5.Repeat :创建一个发射特定数据重复多次的Observable
  Observable<String> observable = Observable.just("hell Word");
        observable.repeat(4).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e(TAG,s);
            }
        });

运行结果如下,会连续发射4四次Observerable(同一个)

03-07 14:13:11.723 17500-17500/com.pse.rxandroid E/CourseThreeActivity: hell Word
03-07 14:13:11.725 17500-17500/com.pse.rxandroid E/CourseThreeActivity: hell Word
03-07 14:13:11.725 17500-17500/com.pse.rxandroid E/CourseThreeActivity: hell Word
03-07 14:13:11.725 17500-17500/com.pse.rxandroid E/CourseThreeActivity: hell Word
6.Timer:创建一个Observerable,并在一定的延时后发射一个特定的值“0”

示例如下

        Observable<Long> observable = Observable.timer(2, TimeUnit.SECONDS);
        observable.subscribe(new Action1<Long>() {
            @Override
            public void call(Long aLong) {
                Log.e(TAG,aLong+"");
            }
        });

运行结果如下

26279-26302/com.pse.rxandroid E/CourseThreeActivity: 0

在日常中我们经常做倒计时操作,之前使用的是Handler 现在你可以尝试改用RX实现了。

7.MAP():最重要的变换操作符,对Observable发射的每一项数据应用一个函数,执行变换操作

大家可以试着理解下这个图:



上面这张图我可以举个例子,比如幼儿园举行活动,我们要给每个孩子穿上礼服,那么这个map()就是我们换衣服这个方法,所有的孩子经过map()后都会穿上华丽的礼服。这样的方式我们就会很快理解上图的意义。
示例如下

     Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                    Log.e(TAG,1+"");
                    subscriber.onNext(1);
                    subscriber.onCompleted();
            }
        }).map(new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return integer+"||";
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e(TAG,s);
            }
        });
03-07 14:38:08.358 7900-7900/? E/CourseThreeActivity: 1
03-07 14:38:08.358 7900-7900/? E/CourseThreeActivity: 1||

从上面看出来,我们发射的数据是整数类型,但经过Map()后就变为String类型,这个操作符默认不在任何特定的调度器上执行。(有序)

8.FlatMap():将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable,听着有点绕口,那么我们看下示例

我们先在页面建立以下代码

  Observable.just(1, 2, 3, 4, 5, 6, 7)
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(Integer integer) {
                        return getObserverable(integer);
                    }
                }).subscribe(
                new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e(TAG,s);
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        Log.e(TAG,throwable.getMessage());
                    }
                }
        );

    private Observable<String> getObserverable(final int id) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("This is the" + id);
                subscriber.onCompleted();
            }
        });
    }

结果如下

om.pse.rxandroid E/CourseThreeActivity: This is the1 
om.pse.rxandroid E/CourseThreeActivity: This is the2 
om.pse.rxandroid E/CourseThreeActivity: This is the3 
om.pse.rxandroid E/CourseThreeActivity: This is the4 
om.pse.rxandroid E/CourseThreeActivity: This is the5 
om.pse.rxandroid E/CourseThreeActivity: This is the6 
om.pse.rxandroid E/CourseThreeActivity: This is the7 
9.Merge合并多个Observables的发射物

示例代码如下


Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);

Observable.merge(odds, evens)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

运行结果如下

Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.

以上是我们日常开发要用到的一些操作符,但是RxJava 可不止这些,如有需要可以去这里学习。
RxJava中文学习文档

相关文章

  • RxJava进阶一(创建类操作符)

    RxJava进阶一(创建类操作符)RxJava进阶二(转换类操作符)RxJava进阶三(过滤类操作符)RxJava...

  • RxJava进阶二(转换类操作符)

    RxJava进阶一(创建类操作符)RxJava进阶二(转换类操作符)RxJava进阶三(过滤类操作符)RxJava...

  • RxJava进阶三(过滤类操作符)

    RxJava进阶一(创建类操作符)RxJava进阶二(转换类操作符)RxJava进阶三(过滤类操作符)RxJava...

  • RxJava进阶四(组合类操作符)

    RxJava进阶一(创建类操作符)RxJava进阶二(转换类操作符)RxJava进阶三(过滤类操作符)RxJava...

  • RxJava

    其它文章 RxJava操作符大全 1、RxJava之一——一次性学会使用RxJava RxJava简单的使用和使用...

  • RxJava 操作符第二波

    RxJava操作符第二波啦,上篇RxJava 操作符第一波和本篇都只是简单介绍rxjava操作符的使用,有哪里写的...

  • Rxjava 操作符 2019-01-25

    title: Rxjava 操作符 参考:这是一份全面 & 详细 的RxJava操作符 使用攻略https://m...

  • RxJava操作符系列四

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二RxJava操...

  • RxJava操作符系列五

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二RxJava操...

  • RxJava操作符系列六

    RxJava操作符系列传送门 RxJava操作符源码RxJava操作符系列一RxJava操作符系列二RxJava操...

网友评论

    本文标题:RxJava(三)操作符的使用

    本文链接:https://www.haomeiwen.com/subject/scgygttx.html