美文网首页RxJava
RxJava<第九篇>:创建操作符

RxJava<第九篇>:创建操作符

作者: NoBugException | 来源:发表于2019-03-14 15:48 被阅读21次

(1)create

    Observable.create();
    Flowable.create();
    Single.create();
    Completable.create();
    Maybe.create();

用来创建被观察者, 5种被观察者都可以使用create操作符创建。

(2)just

    Observable.just("1", "2", "3");
    Flowable.just("1", "2", "3");
    Single.just("1");
    Maybe.just("1");

只有四种被观察者可以使用just,一旦使用just就必须发射一条数据。

  • Observable 可以发射0个或多个数据,标准写法如下

    Observable.just("1");
    Observable.just("1", "2");
    Observable.just("1", "2", "3");
    
  • Flowable可以发射0个或多个数据,标准写法如下

    Flowable.just("1");
    Flowable.just("1", "2");
    Flowable.just("1", "2", "3");
    
  • Single只能发射一条数据,标准写法如下

      Single.just("1");
    
  • Completable不能发射数据,由于just至少要有一条数据,所以Completable没有just操作符;

  • Maybe只能发射0条或1条数据,由于just至少要有一条数据,所以标准写法如下

    Maybe.just("1");
    

另外,需要注意的是:

just操作符最多可以设置10个参数。

(3)from

from只要是将其他类种的对象和数据类型转成Observable

  • fromPublisher 将Publisher转成Observable

      Observable.fromPublisher(new Publisher<String>() {
    
          @Override
          public void subscribe(Subscriber<? super String> s) {
              s.onNext("A");
          }
      }).subscribe(new Consumer<String>() {
    
          @Override
          public void accept(String o) throws Exception {
              System.out.println(o);
          }
      });
    

Publisher将在后续介绍压背的时候重点提出。

  • fromArray 将数组转成Observable

      Observable.fromArray("A", "B", "C");
    

看一下源码

图片.png

在这里被我圈出来的可变长度参数,也就是说参数的个数是可变的。

  • fromIterable 将集合转成Observable

      List<String> list = new ArrayList<>();
      Observable.fromIterable(list);
    

fromIterable的参数是Iterable类型, Collection是Iterable的子接口,所以只要是最终实现Collection接口的集合都可以作为参数,以下的java集合框架图可以作为参考:

图片.png
  • fromCallable 将Callable转成Observable

      ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    
      Callable callable = new Callable<String>() {
    
          @Override
          public String call() throws Exception {
              return "A";
          }
      };
    
      singleThreadExecutor.submit(callable);
    
      Observable.fromCallable(callable).subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
              System.out.println(s);
          }
      });
    
  • fromFuture 将Future转成Observable

      ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    
      Future future = singleThreadExecutor.submit(new Runnable() {
          @Override
          public void run() {
    
          }
      }, "A");
    
      Observable.fromFuture(future).subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
              System.out.println(s);
          }
      });
    

注意:其他被观察者也可以使用from操作符

图片.png 图片.png 图片.png 图片.png

(4)range

发射0~9之间的整数,左闭右开[0,9)

    Observable.range(0, 9).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println(String.valueOf(integer));
        }
    });

其他方式

    Observable.rangeLong();
    Flowable.range();
    Flowable.rangeLong();

(5) defer

defer只有在订阅的时候才会创建被观察者,以保证每次发射的数据是最新的。

    Observable.defer(new Callable<ObservableSource<String>>() {
        @Override
        public ObservableSource<String> call() throws Exception {
            return Observable.just("A", "B");
        }
    }).subscribe(new Consumer<String>() {

        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });

其他方式

    Flowable.defer();
    Single.defer();
    Completable.defer();
    Maybe.defer();

(6)interval

创建一个按固定时间间隔发射整数序列的Observable

图片.png
    //延迟initialDelay秒后,按period秒定时时间间隔发射整数序列,调度器为computation
    Observable.interval(1000, 1000, TimeUnit.MILLISECONDS, Schedulers.computation());

    //延迟period秒后,按period秒定时时间间隔发射整数序列,调度器为computation
    Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            System.out.println(String.valueOf(aLong));
        }
    });

有关Schedulers的讲解:

  • Schedulers.computation() :
    用于cpu密集型计算任务,即不会被被I/O等操作限制性能的耗时操作,例如XML,JSON文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU核数,不可以用于IO操作,因为IO操作的等待时间会浪费cpu

  • Schedulers.from(@NonNull Excutor excutor):指定一个线程调度器,由此调度器来控制任务的执行策略。

  • Schedulers.io():用于IO密集型的操作,例如写磁盘操作,查询数据库,网络访问,具有线程缓存机制,在此调度器接收到任务之后,先检查线程缓存池中是否有空闲的线程可用,如果有,复用,如果没有则 创建新的线程,并将其加入到线程池中,如果每次都没有空闲的线程使用,可以无上限的创建线程。

  • Schedulers.newThread(): 在每执行一个任务时创建一个新的线程,不具有线程缓存机制,由于创建一个线程比起复用一个线程更加耗时耗力,虽然使用Schedulers.io()的地方都可以使用Schedulers.newThread(),但是总体上的Schedulers.newThread()的效率没有Schedulers.io()的高。

  • Schedulers.trampoline():在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停下来,等插入进来的任务执行完成之后,再将未完成的任务接着执行。

  • Schedulers.single():拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务在执行的时候其他任务将按照队列先进先出的顺序依次执行。

  • AndroidSchedulers.mainThread():在Andriod UI线程中执行任务,属于Android的专属定制。

注意:在Rxjava 2.x版本中,废弃了1.x版本Schedulers.immediate(),在1.x中,Schedulers.immediate的作用是在当前线程中立即执行任务,功能等同于Rxjava中的2.x版本中的Schedulers.trampoline(),而在Schedulers.trampoline()在1.x版本的时候,作用是:当其他排队的任务执行完成之后,在当前线程排队开始执行接收到的任务,有点像2.x版本的Schedulers.single(),但是也不完全相同,因为Schedulers.single()不是在当前线程而是在一个线程单例中排队执行任务。

其他方式

    Flowable.interval();

(7)timer

实现延迟执行

    Observable.timer(1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
        @Override
        public void accept(Long aLong) throws Exception {
            System.out.println(String.valueOf(aLong));
        }
    });

    Observable.timer(1000, TimeUnit.MILLISECONDS, Schedulers.computation());

其他方式

    Flowable.timer();
    Single.timer();
    Completable.timer();
    Maybe.timer();

(8)empty

使直接完成发射数据,也就是直接执行了onComplete。

    Observable.empty().subscribe(new Observer<Object>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("11111111111111");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("2222222222222");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("33333333333333");
        }

        @Override
        public void onComplete() {
            System.out.println("4444444444444444");
        }
    });

其他方式

    Flowable.empty();
    Maybe.empty();

(9)error

使直接发生异常结束发射数据,也就是直接执行了onError。

    Observable.error(new Throwable("nullpoint exception")).subscribe(new Observer<Object>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("11111111111111");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("2222222222222");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("4444444444444444");
        }
    });

其他方式

    Flowable.error();
    Single.error();
    Completable.error();
    Maybe.error();

(10)never

不发射数据,也不结束发射。

    Observable.never().subscribe(new Observer<Object>() {

        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("11111111111111");
        }

        @Override
        public void onNext(Object o) {
            System.out.println("2222222222222");
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(e.getMessage());
        }

        @Override
        public void onComplete() {
            System.out.println("4444444444444444");
        }
    });

其他方式

    Flowable.never();
    Single.never();
    Completable.never();
    Maybe.never();

相关文章

  • RxJava<第九篇>:创建操作符

    (1)create 用来创建被观察者, 5种被观察者都可以使用create操作符创建。 (2)just 只有四种被...

  • MongoDB 条件操作符

    MongoDB中条件操作符有: (>) 大于 - $gt (<) 小于 - $lt (>=) 大于等于 - $gt...

  • RxJava操作符-->创建

    引言 该篇文章主要是关于RxJava的创建操作符使用的代码讲解。关于创建被观察者操作符共有以下几种: create...

  • shell-判断

    文件测试-操作符:-gt 大于 | -lt 小于 | -eq 等于 | -ne 不等于 | ...

  • MongoDB 条件操作符

    1.比较操作符: 基本语法对象的键值对为:{key:num}而使用条件操作符则为:{key:{$gt:num}} ...

  • 创建新分支

    git checkout -b gt210-pool gt210创建gt210-pool 基于 gt210分支 g...

  • Rxjava2-二、操作符

    Rxjava记录总结操作符:创建操作符、转换操作符、合并操作符、过滤操作符、其他操作符、条件操作符. 创建操作符 ...

  • MongoDB query notes

    基础 无条件查询: (多)条件查询 条件操作符 $lt $lte $gt $gte $ne $in 包含 $...

  • RxJava<第三篇>:do操作符

    基本代码如下: 执行效果如下: 基本说明 (1)doOnSubscribe:在被观察者和观察者产生关联的时候被调用...

  • RxJava<第十一篇>:变换操作符

    (1)map 从发射数据到接收数据之间的数据变换。 以上代码的意思是,发射的数据是Integer类型的, 将Int...

网友评论

    本文标题:RxJava<第九篇>:创建操作符

    本文链接:https://www.haomeiwen.com/subject/pagfmqtx.html