Rxjava Obserable的创建操作符

作者: 666swb | 来源:发表于2017-03-16 10:53 被阅读475次

    创建操作符目录

    creating_observables.png

    由ReactiveX: http://reactivex.io/documentation/operators.html 支持。

    1: Create

    Create操作符: 使用OnSubscribe从头创建一个Observable。OnSubscribe会在订阅发生时,调用内部的call方法,实现调用onnext等方法。

    Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("hello Rxjava");
                }
            }).subscribe(new Subscriber<String>() {
                @Override
                public void onCompleted() {
                    Log.d(TAG, "onCompleted: ");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "onError: ");
                }
    
                @Override
                public void onNext(String s) {
                    Log.d(TAG, "onNext: "+s);
                }
            });
    

    结果:

    03-15 16:35:18.191 27050-27050/com.nnews D/RxOperatorsTest: onNext: hello Rxjava
    
    

    Observable.create中传入OnSubscribe作为对象,当Observable.subscribe()订阅时,会调用OnSubscribe的回调方法Call, 然后可以在Call方法中通过参数subscriber观察者的onNext(), onError(), onCompleted()等方法执行,就传递到订阅的观察者回调方法中。

    看图:

    create.c.png

    create() 方法是 RxJava 最基本的创造事件序列的方法,

    2: Just

    just操作符:将指定类型的object创建为Observable对象,按照顺序发射给Observer,调用onNext(),onNext()...,最后onComplete()

    Observable.just("111","222").subscribe(new Action1<String>() {
                @Override
                public void call(String string) {
                    Log.d(TAG, "call: "+string+",Thead="+Thread.currentThread().getName());
                }
            });
    

    结果:

    03-15 16:27:47.644 18868-18868/com.nnews D/RxOperatorsTest: call: 111,Thead=main
    03-15 16:27:47.644 18868-18868/com.nnews D/RxOperatorsTest: call: 222,Thead=main
    
    just.c.png

    源码:

    public static <T> Observable<T> just(T t1, T t2) {
            return from((T[])new Object[] { t1, t2 });
        }
    

    可以看出,调用了from的方法,下面介绍from

    3:From

    From操作符:将传入的数组或 Iterable 拆分成具体对象Observable后,依次发射出来,调用onNext(), onNext() ..., 最后onComplete()

            String[] mArray = {"111","222"};
            Observable.from(mArray)
                    .subscribe(new Action1<String>() {
                        @Override
                        public void call(String s) {
                            Log.d(TAG, "call: "+s+",Thead="+Thread.currentThread().getName());
                        }
                    });
    

    结果:

    03-15 16:48:28.389 10123-10123/com.nnews D/RxOperatorsTest: call: 111,Thead=main
    03-15 16:48:28.389 10123-10123/com.nnews D/RxOperatorsTest: call: 222,Thead=main
    

    源码:

    public static <T> Observable<T> from(T[] array) {
            int n = array.length;
            if (n == 0) {
                return empty();
            } else
            if (n == 1) {
                return just(array[0]);
            }
            return create(new OnSubscribeFromArray<T>(array));
        }
    

    可以看出,最后调用了create方法,到这里明白了,just,from操作符最终都是调用create操作符实现的!

    from.c.png

    4: Interval

    Interval操作符:创建一个按照一定时间间隔, 发射出来的Observable.

    场景:定时去获取推送接口的消息,提通知用户

    Observable.interval(2, TimeUnit.SECONDS)
                    .subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long aLong) {
                            Log.d(TAG, "call: "+aLong+",Thead="+Thread.currentThread().getName());
                        }
                    });
    

    结果:

    03-15 17:05:51.383 28561-28621/com.nnews D/RxOperatorsTest: call: 0,Thead=RxComputationScheduler-1
    03-15 17:05:53.383 28561-28621/com.nnews D/RxOperatorsTest: call: 1,Thead=RxComputationScheduler-1
    03-15 17:05:55.383 28561-28621/com.nnews D/RxOperatorsTest: call: 2,Thead=RxComputationScheduler-1
    03-15 17:05:57.383 28561-28621/com.nnews D/RxOperatorsTest: call: 3,Thead=RxComputationScheduler-1
    03-15 17:05:59.383 28561-28621/com.nnews D/RxOperatorsTest: call: 4,Thead=RxComputationScheduler-1
    03-15 17:06:01.383 28561-28621/com.nnews D/RxOperatorsTest: call: 5,Thead=RxComputationScheduler-1
    

    可以看到,每隔2秒,打印一次,注意到不是工作在主线程了哦!

    interval相当于定时器,定时创建Obserable,然后发射, 调用onNext()...

    interval.c.png

    5:Range

    range操作符: 根据初始值n和数目m, 发射n,n+1,n+2...,n+m-1,范围的数值

    Android场景:获取数据的前20条展示等

    Sample:

    Observable.range(16,3)
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            Log.d(TAG, "call: "+integer+",Thead="+Thread.currentThread().getName());
                        }
                    });
    

    结果:

    03-15 17:20:38.929 13010-13010/com.nnews D/RxOperatorsTest: call: 16,Thead=main
    03-15 17:20:38.929 13010-13010/com.nnews D/RxOperatorsTest: call: 17,Thead=main
    03-15 17:20:38.929 13010-13010/com.nnews D/RxOperatorsTest: call: 18,Thead=main
    
    range.c.png

    6: Repeat

    repeat操作符:创建一个observable,然后重复发射n次

    Observable.just("1")
                    .repeat(5)
                    .subscribe(new Subscriber<String>() {
                        @Override
                        public void onCompleted() {
                            Log.d(TAG, "onCompleted: ");
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: ");
                        }
    
                        @Override
                        public void onNext(String s) {
                            Log.d(TAG, "onNext: "+s);
                        }
                    });
    

    结果:

    03-15 17:26:32.157 19799-19799/com.nnews D/RxOperatorsTest: onNext: 1
    03-15 17:26:32.158 19799-19799/com.nnews D/RxOperatorsTest: onNext: 1
    03-15 17:26:32.158 19799-19799/com.nnews D/RxOperatorsTest: onNext: 1
    03-15 17:26:32.159 19799-19799/com.nnews D/RxOperatorsTest: onNext: 1
    03-15 17:26:32.159 19799-19799/com.nnews D/RxOperatorsTest: onNext: 1
    03-15 17:26:32.159 19799-19799/com.nnews D/RxOperatorsTest: onCompleted: 
    
    repeat.c.png

    7: Timer

    timer操作符既可以延迟执行一段逻辑,也可以间隔执行一段逻辑。

    场景: Android 实际中,app开启时,延时加载初始化数据,然后跳到新页面

    **Sample: **

    /***
             * 延时5秒,app启动时,加载图片,然后开启新页面
             */
            Observable.timer(5, TimeUnit.SECONDS)
                    .subscribe(new Action1<Long>() {
                        @Override
                        public void call(Long aLong) {
                            mActivity.startActivity(new Intent(mActivity, newActivity));
                        }
                    });
    

    第二个方法,还可以调度线程


    Paste_Image.png Paste_Image.png

    8.empty/Never/Throw

    Empty:创建一个不发射任何数据但是正常终止的Observable, 只调用onComplete方法

    Paste_Image.png

    Never:创建一个不发射数据也不终止的Observable, 不调用observer的方法

    Paste_Image.png

    Throw :创建一个不发射数据以一个错误终止的Observable,只调用onError方法

    Paste_Image.png

    总结:上述的创建操作符看源码会发现,最终都是创建Observale然后发射给observer,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

    到这里常用的 “创建操作符” 就好了,有时看了还是人家的,多敲敲代码,看看Api,成为自己的,用到项目中,实现它的价值!

    相关文章

      网友评论

        本文标题:Rxjava Obserable的创建操作符

        本文链接:https://www.haomeiwen.com/subject/ogylnttx.html