美文网首页RxJava
RxJava<第九篇>:创建操作符

RxJava<第九篇>:创建操作符

作者: NoBugException | 来源:发表于2019-03-14 15:48 被阅读21次

    (1)create

        Observable.create();
        Flowable.create();
        Single.create();
        Completable.create();
        Maybe.create();
    

    用来创建被观察者, 5种被观察者都可以使用create操作符创建。

    (2)just

        Observable.just("1", "2", "3");
        Flowable.just("1", "2", "3");
        Single.just("1");
        Maybe.just("1");
    

    只有四种被观察者可以使用just,一旦使用just就必须发射一条数据。

    • Observable 可以发射0个或多个数据,标准写法如下

      Observable.just("1");
      Observable.just("1", "2");
      Observable.just("1", "2", "3");
      
    • Flowable可以发射0个或多个数据,标准写法如下

      Flowable.just("1");
      Flowable.just("1", "2");
      Flowable.just("1", "2", "3");
      
    • Single只能发射一条数据,标准写法如下

        Single.just("1");
      
    • Completable不能发射数据,由于just至少要有一条数据,所以Completable没有just操作符;

    • Maybe只能发射0条或1条数据,由于just至少要有一条数据,所以标准写法如下

      Maybe.just("1");
      

    另外,需要注意的是:

    just操作符最多可以设置10个参数。

    (3)from

    from只要是将其他类种的对象和数据类型转成Observable

    • fromPublisher 将Publisher转成Observable

        Observable.fromPublisher(new Publisher<String>() {
      
            @Override
            public void subscribe(Subscriber<? super String> s) {
                s.onNext("A");
            }
        }).subscribe(new Consumer<String>() {
      
            @Override
            public void accept(String o) throws Exception {
                System.out.println(o);
            }
        });
      

    Publisher将在后续介绍压背的时候重点提出。

    • fromArray 将数组转成Observable

        Observable.fromArray("A", "B", "C");
      

    看一下源码

    图片.png

    在这里被我圈出来的可变长度参数,也就是说参数的个数是可变的。

    • fromIterable 将集合转成Observable

        List<String> list = new ArrayList<>();
        Observable.fromIterable(list);
      

    fromIterable的参数是Iterable类型, Collection是Iterable的子接口,所以只要是最终实现Collection接口的集合都可以作为参数,以下的java集合框架图可以作为参考:

    图片.png
    • fromCallable 将Callable转成Observable

        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
      
        Callable callable = new Callable<String>() {
      
            @Override
            public String call() throws Exception {
                return "A";
            }
        };
      
        singleThreadExecutor.submit(callable);
      
        Observable.fromCallable(callable).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
      
    • fromFuture 将Future转成Observable

        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
      
        Future future = singleThreadExecutor.submit(new Runnable() {
            @Override
            public void run() {
      
            }
        }, "A");
      
        Observable.fromFuture(future).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
      

    注意:其他被观察者也可以使用from操作符

    图片.png 图片.png 图片.png 图片.png

    (4)range

    发射0~9之间的整数,左闭右开[0,9)

        Observable.range(0, 9).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(String.valueOf(integer));
            }
        });
    

    其他方式

        Observable.rangeLong();
        Flowable.range();
        Flowable.rangeLong();
    

    (5) defer

    defer只有在订阅的时候才会创建被观察者,以保证每次发射的数据是最新的。

        Observable.defer(new Callable<ObservableSource<String>>() {
            @Override
            public ObservableSource<String> call() throws Exception {
                return Observable.just("A", "B");
            }
        }).subscribe(new Consumer<String>() {
    
            @Override
            public void accept(String s) throws Exception {
                System.out.println(s);
            }
        });
    

    其他方式

        Flowable.defer();
        Single.defer();
        Completable.defer();
        Maybe.defer();
    

    (6)interval

    创建一个按固定时间间隔发射整数序列的Observable

    图片.png
        //延迟initialDelay秒后,按period秒定时时间间隔发射整数序列,调度器为computation
        Observable.interval(1000, 1000, TimeUnit.MILLISECONDS, Schedulers.computation());
    
        //延迟period秒后,按period秒定时时间间隔发射整数序列,调度器为computation
        Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println(String.valueOf(aLong));
            }
        });
    

    有关Schedulers的讲解:

    • Schedulers.computation() :
      用于cpu密集型计算任务,即不会被被I/O等操作限制性能的耗时操作,例如XML,JSON文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU核数,不可以用于IO操作,因为IO操作的等待时间会浪费cpu

    • Schedulers.from(@NonNull Excutor excutor):指定一个线程调度器,由此调度器来控制任务的执行策略。

    • Schedulers.io():用于IO密集型的操作,例如写磁盘操作,查询数据库,网络访问,具有线程缓存机制,在此调度器接收到任务之后,先检查线程缓存池中是否有空闲的线程可用,如果有,复用,如果没有则 创建新的线程,并将其加入到线程池中,如果每次都没有空闲的线程使用,可以无上限的创建线程。

    • Schedulers.newThread(): 在每执行一个任务时创建一个新的线程,不具有线程缓存机制,由于创建一个线程比起复用一个线程更加耗时耗力,虽然使用Schedulers.io()的地方都可以使用Schedulers.newThread(),但是总体上的Schedulers.newThread()的效率没有Schedulers.io()的高。

    • Schedulers.trampoline():在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停下来,等插入进来的任务执行完成之后,再将未完成的任务接着执行。

    • Schedulers.single():拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务在执行的时候其他任务将按照队列先进先出的顺序依次执行。

    • AndroidSchedulers.mainThread():在Andriod UI线程中执行任务,属于Android的专属定制。

    注意:在Rxjava 2.x版本中,废弃了1.x版本Schedulers.immediate(),在1.x中,Schedulers.immediate的作用是在当前线程中立即执行任务,功能等同于Rxjava中的2.x版本中的Schedulers.trampoline(),而在Schedulers.trampoline()在1.x版本的时候,作用是:当其他排队的任务执行完成之后,在当前线程排队开始执行接收到的任务,有点像2.x版本的Schedulers.single(),但是也不完全相同,因为Schedulers.single()不是在当前线程而是在一个线程单例中排队执行任务。

    其他方式

        Flowable.interval();
    

    (7)timer

    实现延迟执行

        Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println(String.valueOf(aLong));
            }
        });
    
        Observable.timer(1000, TimeUnit.MILLISECONDS, Schedulers.computation());
    

    其他方式

        Flowable.timer();
        Single.timer();
        Completable.timer();
        Maybe.timer();
    

    (8)empty

    使直接完成发射数据,也就是直接执行了onComplete。

        Observable.empty().subscribe(new Observer<Object>() {
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("11111111111111");
            }
    
            @Override
            public void onNext(Object o) {
                System.out.println("2222222222222");
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println("33333333333333");
            }
    
            @Override
            public void onComplete() {
                System.out.println("4444444444444444");
            }
        });
    

    其他方式

        Flowable.empty();
        Maybe.empty();
    

    (9)error

    使直接发生异常结束发射数据,也就是直接执行了onError。

        Observable.error(new Throwable("nullpoint exception")).subscribe(new Observer<Object>() {
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("11111111111111");
            }
    
            @Override
            public void onNext(Object o) {
                System.out.println("2222222222222");
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }
    
            @Override
            public void onComplete() {
                System.out.println("4444444444444444");
            }
        });
    

    其他方式

        Flowable.error();
        Single.error();
        Completable.error();
        Maybe.error();
    

    (10)never

    不发射数据,也不结束发射。

        Observable.never().subscribe(new Observer<Object>() {
    
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("11111111111111");
            }
    
            @Override
            public void onNext(Object o) {
                System.out.println("2222222222222");
            }
    
            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }
    
            @Override
            public void onComplete() {
                System.out.println("4444444444444444");
            }
        });
    

    其他方式

        Flowable.never();
        Single.never();
        Completable.never();
        Maybe.never();
    

    相关文章

      网友评论

        本文标题:RxJava<第九篇>:创建操作符

        本文链接:https://www.haomeiwen.com/subject/pagfmqtx.html