美文网首页
RxJava2.x 常用操作符(1)——Cretae 创建操作

RxJava2.x 常用操作符(1)——Cretae 创建操作

作者: DoubleThunder | 来源:发表于2017-05-05 14:49 被阅读300次

    1.Create 创建操作

    create( ) — 使用一个函数从头创建一个 Observable。
    defer( ) — 只有当订阅者订阅才创建 Observable;为每个订阅创建一个新的 Observable。
    empty( ) — 创建一个什么都不做直接通知完成的 Observable。
    error( ) — 创建一个什么都不做直接通知错误的 Observable。
    from( ) — 将一个 Iterable, 一个 Future, 或者一个数组转换成一个 Observable。
    interval( ) — 创建一个按照给定的时间间隔发射整数序列的 Observable。
    just( ) — 将一个或多个对象转换成发射这个或这些对象的一个 Observable。
    range( ) — 创建一个发射指定范围的整数序列的 Observable。
    repeat( ) — 创建一个重复发射指定数据或数据序列的 Observable。
    repeatWhen( ) — 创建一个重复发射指定数据或数据序列的 Observable,它依赖于另一个 Observable 发射的数据。
    never( ) — 创建一个不发射任何数据的 Observable。
    timer( ) — 创建一个在给定的延时之后发射单个数据的 Observable。

    1.1 create

    使用一个函数从头创建一个 Observable。


    rxjava_createrxjava_create

    示例代码:

     Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            for (int i = 1; i < 5; i++) {
                emitter.onNext(i+"");
            }
            emitter.onComplete();
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
    
        }
    
        @Override
        public void onNext(@NonNull String s) {
            System.out.println("Next: " + s);
        }
    
        @Override
        public void onError(@NonNull Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("Sequence complete.");
        }
    

    输出:

    Next: 1
    Next: 2
    Next: 3
    Next: 4
    Sequence complete.
    

    其他

    ObservableOnSubscribe, ObservableEmitter, Cancellable
    

    1.2 defer

    只有当订阅者订阅才创建 Observable;才会为每个订阅创建一个新的 Observable。


    deferdefer

    示例代码:

    Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> call() throws Exception {
                return Observable.just("String");
            }
        });
        observable.subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                System.out.println(s);
            }
        });
    

    输出:

    String
    

    1.3 empty

    创建一个什么都不做直接通知完成的 Observable。


    emptyempty

    示例代码:

     Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
            for (int i = 1; i < 5; i++) {
                emitter.onNext(i+"");
            }
            emitter.onComplete();
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
    
        }
    
        @Override
        public void onNext(@NonNull String s) {
            System.out.println("Next: " + s);
        }
    
        @Override
        public void onError(@NonNull Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("Sequence complete.");
        }
    });
    

    输出结果:

    onComplete
    

    1.3 error

    创建一个什么都不做直接通知错误的 Observable。


    throwthrow

    示例代码:

     Observable<String> observable = Observable.error(new Callable<Throwable>() {
        @Override
        public Throwable call() throws Exception {
            return new NullPointerException();
        }
    });
    observable.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
    
        }
    
        @Override
        public void onNext(@NonNull String s) {
            System.out.println("Next: " + s);
        }
    
        @Override
        public void onError(@NonNull Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
    

    输出结果:

    Error: null
    

    1.4 from()

    将一个 Iterable, 一个 Future, 或者一个数组转换成一个 Observable。


    fromfrom

    示例代码1:

    //1.遍历集合
    List<String> items = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        items.add(i + "");
    }
    Observable<String> observable = Observable.fromIterable(items);
    //Observable<String> observable = Observable.fromArray(new String[]{"Hello", "world"});
    observable.subscribe(new Consumer<String>() {
        @Override
        public void accept(@NonNull String s) throws Exception {
             System.out.println("result:" + s);
        }
    });
    

    输出结果:

    result:0
    result:1
    result:2
    

    1.5 interval()

    创建一个按照给定的时间间隔发射整数序列的 Observable。


    intervalinterval

    示例代码:

    final CompositeDisposable disposable = new CompositeDisposable();
    disposable.add(Observable.interval(1, TimeUnit.SECONDS).subscribeWith(new DisposableObserver<Long>() {
        @Override
        public void onNext(@NonNull Long aLong) {
            System.out.println("Next: " + aLong);
        }
    
        @Override
        public void onError(@NonNull Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    }));
    //5秒后取消订阅
    try {
        Thread.sleep(4000);
        //取消订阅
        disposable.dispose();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    

    输出结果:

    Next: 0
    Next: 1
    Next: 2
    Next: 3
    

    1.6 just()

    将一个或多个对象转换成发射这个或这些对象的一个 Observable。


    justjust

    示例代码:

    Observable.just(1, 2, 3).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
    
        }
    
        @Override
        public void onNext(@NonNull Integer s) {
            System.out.println("Next: " + s);
        }
    
        @Override
        public void onError(@NonNull Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
    

    输出结果:

    Next: 1
    Next: 2
    Next: 3
    onComplete
    

    1.7 range()

    创建一个发射指定范围的整数序列的 Observable。
    RxJava将这个操作符实现为 range 函数,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为 0,将导致 Observable 不发射任何数据(如果设置为负数,会抛异常)


    rangerange

    示例代码:

    // 依次发射 10、11、12
    Observable.range(10, 2).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
    
        }
    
        @Override
        public void onNext(@NonNull Integer s) {
            System.out.println("Next: " + s);
        }
    
        @Override
        public void onError(@NonNull Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
    

    输出结果:

    Next: 10
    Next: 11
    Next: 12
    onComplete
    

    1.8 repeat()

    创建一个重复发射指定数据或数据序列的 Observable。


    repeatrepeat

    示例代码:

    //重复三次,repeat()就是无限次
    Observable.just("hello", "world").repeat(3).subscribe(new Observer<Object>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
    
        }
    
        @Override
        public void onNext(@NonNull Object o) {
            System.out.println("onNext:" + o.toString());
        }
    
        @Override
        public void onError(@NonNull Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }
    
        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });
    

    输出结果:

    onNext:hello
    onNext:world
    onNext:hello
    onNext:world
    onNext:hello
    onNext:world
    onComplete
    

    1.9 repeatWhen()

    创建一个重复发射指定数据或数据序列的 Observable,它依赖于另一个 Observable 发射的数据。

    1.10 never()

    创建一个不发射任何数据的 Observable。


    nevernever

    (ps:不太懂有何意义)

    1.11 timer()

    创建一个在给定的延时之后发射单个数据的 Observable。
    在 RxJava 1.0.0 及其之后的版本,官方已不再提倡使用.timer() 操作符,因为.interval() 具有同样的功能。


    timertimer

    示例代码:

    Observable.timer(1, TimeUnit.SECONDS)
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(@NonNull Long aLong) throws Exception {
                System.out.println("result:" + aLong);
            }
        });
    

    输出结果:

    result:0
    

    相关文章

      网友评论

          本文标题: RxJava2.x 常用操作符(1)——Cretae 创建操作

          本文链接:https://www.haomeiwen.com/subject/xpxstxtx.html