美文网首页
RxJava学习笔记操作符学习(2)

RxJava学习笔记操作符学习(2)

作者: 西红柿的眼泪 | 来源:发表于2018-06-04 15:29 被阅读0次
    创建操作符
    • 作用

    创建 被观察者( Observable) 对象 & 发送事件

    • 常见类型
    常见类型.png
    • 应用场景 & 对应操作符介绍
    1. create()
      作用
      完整创建1个被观察者对象(Observable),RxJava 中创建被观察者对象最基本的操作符.在上次已经使用过了。
    2. just()
      作用
      快速创建1个被观察者对象(Observable)最多只能发送10个参数,在上次也已经使用过了。
      发送事件的特点:直接发送 传入的事件。
    3. fromArray()
      作用
      快速创建1个被观察者对象(Observable),会将数组中的数据转换为Observable对象,可以发送10个以上的参数。
      发送事件的特点:直接发送 传入的数组数据,数组元素遍历.
      实例
    public void fromArrayOperator(View view){
            Integer[] items = { 0, 1, 2, 3, 4 };
            Observable.fromArray(items).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("sss", "开始采用subscribe连接");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e("sss", "接收到了事件"+ integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("sss", "对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.e("sss", "对Complete事件作出响应");
                }
            });
      }
    
    1. fromIterable()
      作用
      快速创建1个被观察者对象(Observable)会将集合中的数据转换为Observable对象,发送10个以上事件(集合形式)
      发送事件的特点:直接发送 传入的集合List数据,集合元素遍历.
      实例
    public void fromIterableOperator(View view){
            List<String> list=new ArrayList<>();
            list.add("A");
            list.add("B");
            list.add("C");
    
            Observable.fromIterable(list).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("sss", "开始采用subscribe连接");
                }
    
                @Override
                public void onNext(String s) {
                    Log.e("sss", "接收到了事件"+ s);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("sss", "对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.e("sss", "对Complete事件作出响应");
                }
            });
        }
    

    4.快速创建额外的操作符

    // 下列方法一般用于测试使用
    
    <-- empty()  -->
    // 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
    // 即观察者接收后会直接调用onCompleted()
     public void emptyOperator(View view){
            Observable.empty()
                    .subscribe(new Observer<Object>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e("sss", "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Object o) {
                            Log.e("sss", "接收到了事件"+ o);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("sss", "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("sss", "对Complete事件作出响应");
                        }
                    });
        }
    <-- error()  -->
    // 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
    // 可自定义异常
    Observable observable2=Observable.error(new RuntimeException())
    // 即观察者接收后会直接调用onError()
    
    <-- never()  -->
    // 该方法创建的被观察者对象发送事件的特点:不发送任何事件
    Observable observable3=Observable.never();
    // 即观察者接收后什么都不调用
    

    5.defer()
    作用
    直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
    通过 Observable工厂方法创建被观察者对象(Observable),每次订阅后,都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的
    实例

        Integer i=6;
        public void deferOperator(View view){
    
            Observable<Integer> observable=Observable.defer(new Callable<ObservableSource<Integer>>() {
                @Override
                public ObservableSource<Integer> call() throws Exception {
                    return Observable.just(i);
                }
            });
            i=8;
            observable.subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("sss", "开始采用subscribe连接");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e("sss", "接收到了事件"+ integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("sss", "对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.e("sss", "对Complete事件作出响应");
                }
            });
        }
    

    6.timer()
    作用
    快速创建1个被观察者对象(Observable),发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)。
    本质 = 延迟指定时间后,调用一次 onNext(0),一般用于检测。
    实例

    public void timerOperator(View view){
            Observable.timer(2, TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e("sss", "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            Log.e("sss", "接收到了事件"+ aLong);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("sss", "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("sss", "对Complete事件作出响应");
                        }
                    });
        }
    

    7.interval()
    作用
    快速创建1个被观察者对象(Observable),发送的事件序列 = 从0开始、无限递增1的的整数序列
    发送事件的特点:每隔指定时间 就发送 事件
    实例

     public void intervalOperator(View view){
            Observable.interval(5,2,TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e("sss", "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            Log.e("sss", "接收到了事件"+ aLong);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("sss", "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("sss", "对Complete事件作出响应");
                        }
                    });
    
        }
    

    8.intervalRange()
    作用
    快速创建1个被观察者对象(Observable),发送的事件序列 = 从0开始、无限递增1的的整数序列,作用类似于interval(),但可指定发送的数据和数量.
    发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量
    实例

    public void intervalRangeOperator(View view){
            // 参数1 = 事件序列起始点;
            // 参数2 = 事件数量;
            // 参数3 = 第1次事件延迟发送时间;
            // 参数4 = 每个间隔时间;
            // 参数5 = 时间单位
            Observable.intervalRange(6,10,5,2,TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e("sss", "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            Log.e("sss", "接收到了事件"+ aLong);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("sss", "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.e("sss", "对Complete事件作出响应");
                        }
                    });
        }
    

    9.range()
    作用
    快速创建1个被观察者对象(Observable)发送的事件序列 = 从0开始、无限递增1的的整数序列,作用类似于intervalRange(),但区别在于:无延迟发送事件
    发送事件的特点:连续发送 1个事件序列,可指定范围
    实例

     public void range(View view){
            // 参数1 事件序列起始点
            // 参数2 事件数量
            Observable.range(6,10).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("sss", "开始采用subscribe连接");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e("sss", "接收到了事件"+ integer);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("sss", "对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.e("sss", "对Complete事件作出响应");
                }
            });
    
        }
    

    10.rangeLong()
    作用:类似于range(),区别在于该方法支持数据类型 = Long
    实例

     public void rangeLong(View view){
            // 参数1 事件序列起始点
            // 参数2 事件数量
            Observable.rangeLong(6,10).subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("sss", "开始采用subscribe连接");
                }
    
                @Override
                public void onNext(Long along) {
                    Log.e("sss", "接收到了事件"+ along);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("sss", "对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.e("sss", "对Complete事件作出响应");
                }
            });
    
        }
    
    • 实际开发中的应用

    1.不断轮询配合Retrofit查询网络数据
    客户端不断的轮询查询网络数据

    public void requestPolling(View view){
    
            Observable.interval(2,1,TimeUnit.SECONDS)
                    .doOnNext(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            Log.e("sss","第"+aLong+"次轮询");
                            Retrofit retrofit=new Retrofit.Builder()
                                    .baseUrl("http://fy.iciba.com/")
                                    .addConverterFactory(GsonConverterFactory.create())
                                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                                    .build();
                            GetRequest_Interface request=retrofit.create(GetRequest_Interface.class);
    
                            Observable<Translation> observable=request.getCall();
    
                            observable.subscribeOn(Schedulers.io())
                                    .observeOn(AndroidSchedulers.mainThread())
                                    .subscribe(new Observer<Translation>() {
                                        @Override
                                        public void onSubscribe(Disposable d) {
    
                                        }
                                        @Override
                                        public void onNext(Translation translation) {
                                            Log.e("sss", translation.getContent().getOut());
                                        }
    
                                        @Override
                                        public void onError(Throwable e) {
                                            Log.d("sss", "请求失败");
                                        }
    
                                        @Override
                                        public void onComplete() {
    
                                        }
                                    });
                        }
                    }).subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Long aLong) {
    //                Log.e("sss", "接受到事件"+aLong);
    
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("sss", "对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.e("sss", "对Complete事件作出响应");
                }
            });
        }
    

    相关文章

      网友评论

          本文标题:RxJava学习笔记操作符学习(2)

          本文链接:https://www.haomeiwen.com/subject/ocpxxftx.html