美文网首页Android拾萃RxJava系列教程半栈工程师
Android拾萃 - RxJava2之创建操作符及其demo

Android拾萃 - RxJava2之创建操作符及其demo

作者: 三也视界 | 来源:发表于2017-10-19 02:07 被阅读142次

    根据下图,再一次回顾一下前面几章的内容,经典的三步法等内容。
    Android拾萃 - RxJava操作符和响应类型(二) 的图片。

    RxJava 2.x的5种响应类型

    一、创建操作符列表

    名称 解析
    just() 将一个或多个对象转换成发射这个或这些对象的一个Observable
    fromArray() 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
    repeat() 创建一个重复发射指定数据或数据序列的Observable
    repeatWhen() 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据
    create() 使用一个函数从头创建一个Observable
    defer() 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable
    range() 创建一个发射指定范围的整数序列的Observable
    interval() 创建一个按照给定的时间间隔发射整数序列的Observable
    timer() 创建一个在给定的延时之后发射单个数据的Observable
    empty() 创建一个什么都不做直接通知完成的Observable
    error() 创建一个什么都不做直接通知错误的Observable
    never() 创建一个入不发射任何数据的Observable

    二、创建操作符例子

    Observable + 操作符 + subscribe + observer,就构成了一个完整的例子。
    由于只是做展示,observer基本可以公用,这里写成一个静态变量,详细见代码。而对于操作符,我们可以简单理解,就是对onexst onerror oncomplete 还有一些传入参数和方法的封装,起到简化代码的作用。

    just.png

    just 可以发射一个或者多个对象,会依次回调onNext,并最后自动调用onComplete(可以理解为对上述多个onNext和一个onComplete操作的封装)

     private static  Observer observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
    
            @Override
            public void onNext(@NonNull String s) {
                Log.i(TAG, s);
            }
    
            @Override
            public void onError(@NonNull Throwable e) {
                Log.i(TAG, "onError");
            }
    
            @Override
            public void onComplete() {
                Log.i(TAG, "onCompleted");
            }
        };
    
    
     //just 发射了三个对象,会依次回调onNext,并最后自动调用onComplete
            Observable.just("A","B","C").subscribe(observer);
    

    执行程序,console打印信息如下:

    10-19 01:02:20.758 20699-20699/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: A
    10-19 01:02:20.758 20699-20699/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: B
    10-19 01:02:20.758 20699-20699/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: C
    10-19 01:02:20.758 20699-20699/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: onCompleted
    
    defer.png

    defer 创建的Observable将推迟到subscribe订阅的时候(这个时候会调用call()方法进行发射数据),但是这个时候监听到的数据可能和创建的时候的数据不一致了,如图上的两种颜色,代表两种不同数据。

    //final 数组只是引用地址不变,值还是可以改变的
            final String[] str={"hi, rxjava"};
            Observable observable =Observable.defer(new Callable<ObservableSource<?>>() {
                @Override
                public ObservableSource<?> call() throws Exception {
                    return Observable.just(str[0]);
                }
            });
    
            str[0]="hi, i have changed";
    
            observable.subscribe(observer);
    

    我们看到打印信息里面 str[0]确实是改变了的

    10-19 01:11:57.428 29288-29288/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: hi, i have changed
    10-19 01:11:57.428 29288-29288/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: onCompleted
    

    empty()/error()/never()

     //直接调用onComplete
            Observable.empty().subscribe(observer);
            //直接回调onError
            Observable.error(new Throwable("some error")).subscribe(observer);
            //啥事不干
            Observable.never().subscribe(observer);
    
    10-19 01:13:56.228 31014-31014/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: onCompleted
    10-19 01:13:56.228 31014-31014/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: onError
    
    fromArray.png

    rxjava1 是from 而rxjava2 应该是fromArray,图上是不对的。fromArray 发射的是一个数组对象,但是却可以依次接收到数组的各个元素,其实just最终调用的也是fromArray

            Observable.fromArray(new String[]{"A","B","C"}).subscribe(observer);
    
    10-19 01:17:20.618 1368-1368/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: A
    10-19 01:17:20.618 1368-1368/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: B
    10-19 01:17:20.618 1368-1368/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: C
    10-19 01:17:20.628 1368-1368/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: onCompleted
    

    repeat() 默认是Long.MAX_VALUE,即无限重复,repeat(5)表示重复5次。
    repeat 和 retry 的操作符都有相对应的操作符,但是两类操作符是有区别的。
    .repeat()是接收到.onCompleted()事件后触发重订阅。而.retry()接收到.onError()事件后触发重订阅。

            Observable.just("A").repeat().subscribe(observer);
            Observable.just("A","B","C").repeat(5).subscribe(observer);
    
    10-19 01:20:21.148 4994-4994/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: A
    10-19 01:20:21.148 4994-4994/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: B
    10-19 01:20:21.148 4994-4994/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: C
    10-19 01:20:21.148 4994-4994/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: A
    10-19 01:20:21.148 4994-4994/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: B
    10-19 01:20:21.148 4994-4994/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: C
    10-19 01:20:21.148 4994-4994/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: A
    10-19 01:20:21.148 4994-4994/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: B
    10-19 01:20:21.148 4994-4994/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: C
    10-19 01:20:21.
    
    repeatWhen.f.png

    range(1, 5) 从1开始发射到5
    repeatWhen 需要传入一个方法或者条件,包装方法我们可以使用flatMap

    我们就来实现一个定时器例子
    使用timer(5秒执行一次),即5秒repeat一次

    timer.p.png

    怎么写出以下代码,基本我们知道需求和使用什么操作符,按照代码提示就可以写了,多敲敲代码才是王道~

            Observable.range(1, 5).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(@NonNull Observable<Object> objectObservable) throws Exception {
    
                    return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(@NonNull Object o) {
    
                            return Observable.timer(5, TimeUnit.SECONDS);
                        }
                    });
                }
            }).subscribe(observer);
    //range发射的是int值的,前面observer是String泛型的,所以这里也需要更改
     private static  Observer observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
    
            @Override
            public void onNext(@NonNull Integer s) {
                Log.i(TAG, s+"");
            }
    
            @Override
            public void onError(@NonNull Throwable e) {
                Log.i(TAG, "onError");
            }
    
            @Override
            public void onComplete() {
                Log.i(TAG, "onCompleted");
            }
        };
    
    10-19 01:24:30.928 9114-9114/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 1
    10-19 01:24:30.928 9114-9114/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 2
    10-19 01:24:30.928 9114-9114/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 3
    10-19 01:24:30.928 9114-9114/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 4
    10-19 01:24:30.928 9114-9114/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 5
    
    10-19 01:24:35.948 9114-9332/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 1
    10-19 01:24:35.948 9114-9332/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 2
    10-19 01:24:35.948 9114-9332/com.example.philos.rxjavademo I/Cre
    
    ateOperateExampleActiivty: 3
    10-19 01:24:35.948 9114-9332/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 4
    10-19 01:24:35.948 9114-9332/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 5
    
    interval.png

    interval 每秒发射一次,并且递增.这里发射的是long类型的数据,所以observer也需要重写

        private static  Observer observer = new Observer<Long>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }
    
            @Override
            public void onNext(@NonNull Long s) {
                Log.i(TAG, s+"");
            }
    
            @Override
            public void onError(@NonNull Throwable e) {
                Log.i(TAG, "onError");
            }
    
            @Override
            public void onComplete() {
                Log.i(TAG, "onCompleted");
            }
        };
    
            Observable.interval(1, TimeUnit.SECONDS).subscribe(observer);
    
    10-19 01:29:57.838 11742-12139/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 0
    10-19 01:29:58.838 11742-12139/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 1
    10-19 01:29:59.838 11742-12139/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 2
    10-19 01:30:00.838 11742-12139/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 3
    10-19 01:30:01.838 11742-12139/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 4                 
     beginning of /dev/log/system
    10-19 01:30:02.838 11742-12139/com.example.philos.rxjavademo I/CreateOperateExampleActiivty: 5
    
    

    相关文章

      网友评论

        本文标题:Android拾萃 - RxJava2之创建操作符及其demo

        本文链接:https://www.haomeiwen.com/subject/tvmyuxtx.html