美文网首页
Observable的创建操作符

Observable的创建操作符

作者: 小城哇哇 | 来源:发表于2022-11-16 14:33 被阅读0次

    1、just

    ● Observable.just(1, 2, 3, 4):最多10个参数
    ● 相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)、onComplete()

            // 1. 创建时传入整型1、2、3、4
            // 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)、onComplete()
            Observable.just(1, 2, 3, 4)
                    .subscribe(new Observer<Integer>() {
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
                        // 默认最先调用复写的 onSubscribe()
    
                        @Override
                        public void onNext(Integer value) {
                            Log.d(TAG, "接收到了事件" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
    
                    });
    

    2、fromArray

    ● Observable.fromArray(items)
    ● 相当于执行了onNext(0)、onNext(1)、onNext(2)、onNext(3)、onComplete()

            Integer[] items = {0, 1, 2, 3};
            Observable.fromArray(items)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Integer value) {
                            Log.d(TAG, "接收到了事件" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
    
                    });
    

    3、fromIterable

    ● 相当于执行了onNext(1)、onNext(2)、onNext(3)、onComplete()

            List<Integer> list = new ArrayList<>();
            list.add(1);
            list.add(2);
            list.add(3);
    
            // 2. 通过fromIterable()将集合中的对象 / 数据发送出去
            Observable.fromIterable(list)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "集合遍历");
                        }
    
                        @Override
                        public void onNext(Integer value) {
                            Log.d(TAG, "集合中的数据元素 = " + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "遍历结束");
                        }
                    });
    

    4、range

    ● 连续发送 1个事件序列,可指定范围

            // 参数说明:
            // 参数1 = 事件序列起始点;
            // 参数2 = 事件数量;
            // 注:若设置为负数,则会抛出异常
            Observable.range(3, 10)
                    // 该例子发送的事件序列特点:从3开始发送,每次发送事件递增1,一共发送10个事件
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
                        // 默认最先调用复写的 onSubscribe()
    
                        @Override
                        public void onNext(Integer value) {
                            Log.d(TAG, "接收到了事件" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
    
                    });
    

    5、rangeLong

    ● 类似range,不过事件类型是Long

            // 参数说明:
            // 参数1 = 事件序列起始点;
            // 参数2 = 事件数量;
            // 注:若设置为负数,则会抛出异常
            Observable.rangeLong(3, 10)
                    // 该例子发送的事件序列特点:从3开始发送,每次发送事件递增1,一共发送10个事件
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
                        // 默认最先调用复写的 onSubscribe()
    
                        @Override
                        public void onNext(Long value) {
                            Log.d(TAG, "接收到了事件" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
    
                    });
    

    6、interval

    ● 每隔指定时间 就发送事件
    ● interval默认在computation调度器上执行
    ● 也可自定义指定线程调度器(第3个参数):interval(long,TimeUnit,Scheduler)

            // 参数说明:
            // 参数1 = 第1次延迟时间;
            // 参数2 = 间隔时间数字;
            // 参数3 = 时间单位;
            // 该例子发送的事件序列特点:延迟3s后发送事件,每隔1秒产生1个数字(从0开始递增1,无限个)
            Observable.interval(3,1, TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
                        // 默认最先调用复写的 onSubscribe()
    
                        @Override
                        public void onNext(Long value) {
                            Log.d(TAG, "接收到了事件"+ value  );
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
                    });
    

    7、intervalRange

    ● 每隔指定时间就发送 事件,可指定发送的数据的数量

            // 参数说明:
            // 参数1 = 事件序列起始点;
            // 参数2 = 事件数量;
            // 参数3 = 第1次事件延迟发送时间;
            // 参数4 = 间隔时间数字;
            // 参数5 = 时间单位
            // 该例子发送的事件序列特点:
            // 1. 从3开始,一共发送10个事件;
            // 2. 第1次延迟2s发送,之后每隔2秒产生1个数字(从3开始递增,3 4 5 6 7 8 9 10 11 12)
            Observable.intervalRange(3, 10, 2, 1, TimeUnit.SECONDS)
    
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
                        @Override
                        public void onNext(Long value) {
                            Log.d(TAG, "接收到了事件" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
                    });
    

    8、timer

    ● 延迟指定时间后,发送1个数值0(Long类型)

            Observable.timer(2, TimeUnit.SECONDS)
                    .subscribe(new Observer<Long>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "开始采用subscribe连接");
                        }
    
                        @Override
                        public void onNext(Long value) {
                            Log.d(TAG, "接收到了事件" + value);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "对Error事件作出响应");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "对Complete事件作出响应");
                        }
    
                    });
    

    timer操作符默认运行在一个新线程上,也可自定义线程调度器(第3个参数)timer(long,TimeUnit,Scheduler)

    9、empty、error、never

    ● empty():仅发送Complete事件,直接通知完成

            // 仅发送Complete事件,直接通知完成
            Observable.empty().subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe连接");
                }
    
                @Override
                public void onNext(Object value) {
                    Log.d(TAG, "接收到了事件" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }
            });
    

    error:仅发送Error事件,直接通知异常

            // 仅发送onError事件,直接通知异常
            Observable.error(new RuntimeException()).subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe连接");
                }
    
                @Override
                public void onNext(Object value) {
                    Log.d(TAG, "接收到了事件" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应"+e.toString());
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }
            });
    

    never:不发送任何事件

            Observable.never().subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe连接");
                }
    
                @Override
                public void onNext(Object value) {
                    Log.d(TAG, "接收到了事件" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应"+e.toString());
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }
            });
    

    10、defer

    ● 直到有观察者(Observer)订阅时,才动态创建被观察者对象(Observable)+发送事件

    // 第1次赋值
            final Bundle bundle = new Bundle();
            bundle.putInt("key",1);
    
            // 2. 通过defer 定义被观察者对象 注:此时被观察者对象还没创建
            Observable<Integer> observable = Observable.defer(new Supplier<ObservableSource<? extends Integer>>() {
                @Override
                public ObservableSource<? extends Integer> get() throws Throwable {
                    return Observable.just(bundle.getInt("key"));
                }
            });
    
            // 2. 第2次赋值
            bundle.putInt("key",2);
    
            // 3. 观察者开始订阅,注:此时,才会调用defer()创建被观察者对象(Observable)
            observable.subscribe(new Observer<Integer>() {
    
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "开始采用subscribe连接");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "接收到的整数是" + value);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "对Complete事件作出响应");
                }
            });
    

    运行结果(订阅时才创建,此时的值是2,不是1)

    D/MainActivity: 开始采用subscribe连接
    D/MainActivity: 接收到的整数是2
    D/MainActivity: 对Complete事件作出响应
    

    https://www.yuque.com/xiaomaolv-pb4aw/rtx9u3/ggb8hs

    相关文章

      网友评论

          本文标题:Observable的创建操作符

          本文链接:https://www.haomeiwen.com/subject/pqbyxdtx.html