美文网首页
Android RxJava之创建操作符(二)

Android RxJava之创建操作符(二)

作者: AR7_ | 来源:发表于2018-04-23 11:35 被阅读3次

    作用

    • 用于创建被观察者(Observable)对象和发送事件。

    类型

    create()

            
            //1. 通过create()创建被观察者对象
            Observable.create(new ObservableOnSubscribe<Integer>() {
                //2. 在复写的subscribe()里定义需要发送的事件
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    
                } //3. 至此一个观察者对象(Observable)就创建完毕
            }).subscribe(new Observer<Integer>() {
                //4. 以下步骤仅为展示一个完整的demo,可以忽略。
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe连接");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "接收到了事件" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应");
    
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }
            });
    

    just()

            // 1. 创建时传入整型1、2、3、4
            //在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
            Observable.just(1, 2, 3, 4)
                    // 2. 至此,一个Observable对象创建完毕,以下步骤仅为展示一个完整demo,可以忽略。
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Integer value) {
                            Log.d(TAG, "接收到了事件" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
    
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
                    });
    

    fromArray()

            //方式一:发送事件
            
            //1. 设置需要传入的数组
            Integer[] items1 = {1, 2, 3, 4};
            //2. 创建被观察者对象(Observable)时传入数组
            //在创建后就会将该数组转换成Observable和发送该对象中的所有数据
            //3. 可发送10个以上参数
            //若直接传递一个List集合进去,会直接把List当做一个数据元素发送。
            Observable.fromArray(items1)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Integer value) {
                            Log.d(TAG, "接收到了事件" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
    
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
                    });
            //方式二:数组遍历
            
            //1. 设置需要传入的数组
            Integer[] items2 = {1, 2, 3, 4};
            //2. 创建被观察者对象(Observable)时传入数组
            //在创建后就会将该数组转换成Observable和发送该对象中的所有数据
            //3. 可发送10个以上参数
            //若直接传递一个List集合进去,会直接把List当做一个数据元素发送。
            Observable.fromArray(items2)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "数组遍历");
                        }
    
                        @Override
                        public void onNext(Integer value) {
                            Log.d(TAG, "数组中的元素" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
    
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "遍历结束");
                        }
                    });
    
    image

    fromIterable()

            
            //方式一:快速发送集合
            
            //1. 设置一个集合
            List<Integer> list1 = new ArrayList<>();
            list1.add(1);
            list1.add(2);
            list1.add(3);
            //2. 通过fromIterable()将集合中的对象/数据发送出去。
            Observable.fromIterable(list1)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Integer value) {
                            Log.d(TAG, "接收到了事件" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
    
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
                    });
    
            //方式二:集合遍历
    
            //1. 设置一个集合
            List<Integer> list2 = new ArrayList<>();
            list2.add(1);
            list2.add(2);
            list2.add(3);
            //2. 通过fromIterable()将集合中的对象/数据发送出去。
            Observable.fromIterable(list2)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "集合遍历");
                        }
    
                        @Override
                        public void onNext(Integer value) {
                            Log.d(TAG, "集合中的数据元素" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
    
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "遍历结束");
                        }
                    });
    
    image

    empty()

            //1. 一般用于测试使用
            //2. 该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成
            //3. 即观察者接收后会直接调用onCompleted()方法
            Observable observable = Observable.empty();
    

    error()

            //1. 一般用于测试使用
            //2. 该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
            //3. 即观察者接收后会直接调用onError()方法
            Observable observable = Observable.error(new RuntimeException());
    

    never()

            //1. 一般用于测试使用
            //2. 该方法创建的被观察者对象发送事件的特点:不发送任何事件
            //3. 即观察者接收后什么都不调用
            Observable observable = Observable.never();
    

    defer()

            // 1. 第1次对i赋值 
            Integer i = 10;
            // 通过defer 定义被观察者对象
            //注:此时的被观察者对象还没创建
             Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
                 @Override
                 public ObservableSource<? extends Integer> call() throws Exception {
                     return Observable.just(i);
                 }
             });
            // 2. 第2次对i赋值
            i = 15;
            
            // 3. 观察者开始订阅
            // 注,此时才会调用defer()创建被观察者对象(Observable)
            // 当观察者调订阅时,才创建Observable,并且针对每个观察者创建都是一个新的
            // Observable。以何种方式创建这个Observable对象,当满足回调条件后,就会进行
            // 相应的回调
            observable.subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe连接");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "接收到的整数是" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应");
    
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }
            });
    

    timer()

            // 该例子 = 延迟2s后,发送一个long类型数值
            // timer操作符默认运行在一个新线程上
            // 也可以自定义线程调度器(第3个参数):timer(long,TimeUnit,Scheduler)
            Observable.timer(2, TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Long value) {
                            Log.d(TAG, "接收到了事件" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
    
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
                    });
    

    interval()

            // 参数说明
            // 参数1 = 第一次延迟时间
            // 参数2 = 间隔时间数字
            // 参数3 = 时间单位
            // 注:intervalm默认在computation调度器上执行
            //也可以自定义执行线程调度器(第3个参数):interval(long,TimeUnit,Scheduler)
            
            Observable.interval(3, 1, TimeUnit.SECONDS)
                    // 该例子发送到事件序列特点:延迟3后发送,每隔1产生1个数字(从0开始递增,无限个)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
                        // 默认最先调用复写的 onSubscribe()
    
                        @Override
                        public void onNext(Long value) {
                            Log.d(TAG, "接收到了事件"+ value  );
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
                    });
    ![](https://img.haomeiwen.com/i944365/3db189b868dc2463.gif)
    
    

    intervalRange()

            /*
            * 参数1 = 事件序列起始点
            * 参数2 = 事件数量
            * 参数3 = 第一次事件延迟发送时间
            * 参数4 = 间隔时间数字
            * 参数5 = 时间单位
            * */
            Observable
                    .intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
                    /*
                    * 该例子发送到事件序列特点:
                    * 1. 从3开始,一共发送10个事件
                    * 2. 第一次延迟2s发送,之后每隔2s产生1个数字(从0开始递增1,无限个)
                    * */
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
                        // 默认最先调用复写的 onSubscribe()
    
                        @Override
                        public void onNext(Long value) {
                            Log.d(TAG, "接收到了事件"+ value  );
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
                    });
    

    range()

            /*
            * 参数1 = 事件序列起始点
            * 参数2 = 事件数量
            * 注:若设置为负数,则会怕抛出异常
            * */
            Observable
                    .range(3,10)
                    // 该例子发送的事件序列特点:从3开始发送,每次发送事件递增,一共发送10个事件
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            
                        }
    
                        @Override
                        public void onNext(Integer integer) {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    rangeLong()

    • 类似range(),区别在于该方法支持数据类型Long

    相关文章

      网友评论

          本文标题:Android RxJava之创建操作符(二)

          本文链接:https://www.haomeiwen.com/subject/zfiklftx.html