美文网首页
RxJava基础-创建操作符

RxJava基础-创建操作符

作者: 六_六 | 来源:发表于2018-04-27 16:22 被阅读0次

    前言:上篇文章我们讲解了RxJava最基本的基础知识原理,这篇呢我打算讲解下怎么来创建一个observable被观察者,各个api的使用情况。

    一.创建操作符

    1.基本创建:create()

    RxJava中最基本的创建被观察者(Observable)的操作符

    • 具体使用

           /**
           * 通过Observable.create()来创建一个被观察者
           */
          Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
              /**
               * 在复写的subscribe方法中定义要发送的事件
               * @param e
               * @throws Exception
               */
              @Override
              public void subscribe(ObservableEmitter<String> e) throws Exception {
                  //在这个方法里面开始向外发射事件
                  e.onNext("hello");
                  e.onNext("RxJava");
                  e.onComplete();
              }
          });        
          
          这样一个被观察者(ObServable)就被创建出来了,接下来我们用完整的链式将这个Observable订阅起来使用
          
           /**
           * 通过Observable.create()来创建一个被观察者
           */
          Observable.create(new ObservableOnSubscribe<String>() {
              /**
               * 在复写的subscribe方法中定义要发送的事件
               * @param e
               * @throws Exception
               */
              @Override
              public void subscribe(ObservableEmitter<String> e) throws Exception {
                  //在这个方法里面开始向外发射事件
                  e.onNext("hello");
                  e.onNext("RxJava");
                  e.onComplete();
              }
          }).subscribe(new Observer<String>() {
              @Override
              public void onSubscribe(Disposable d) {
                  Log.d(TAG, "开始采用subscribe连接");
              }
      
              @Override
              public void onNext(String s) {
                  Log.d(TAG,"接收结果:"+s);
              }
      
              @Override
              public void onError(Throwable e) {
                  Log.d(TAG, "对Error事件作出响应");
              }
      
              @Override
              public void onComplete() {
                  Log.d(TAG, "对Complete事件作出响应");
              }
          });
      
          打印结果如下:
          04-26 19:58:53.100 11189-11189/? D/MainTwoActivity: 开始采用subscribe连接
          04-26 19:58:53.100 11189-11189/? D/MainTwoActivity: 接收结果:hello
          04-26 19:58:53.100 11189-11189/? D/MainTwoActivity: 接收结果:RxJava
          04-26 19:58:53.100 11189-11189/? D/MainTwoActivity: 对Complete事件作出响应
      

    2.快速创建&发送事件

    2.1 just():
    • 快速创建一个被观察者对象(observable)

    • 发送事件的特点:直接发送 传入的事件

    • 最多只能发送10个参数

    • 具体使用:

           // 1. 创建时传入整型1、2、3、4
          // 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
          Observable.just(1, 2, 3,4)   
              // 至此,一个Observable对象创建完毕,以下步骤仅为展示一个完整demo,可以忽略
              // 2. 通过通过订阅(subscribe)连接观察者和被观察者
              // 3. 创建观察者 & 定义响应事件的行为
           .subscribe(new Observer<Integer>() {
              
              @Override
              public void onSubscribe(Disposable d) {
                  Log.d(TAG, "开始采用subscribe连接");
              }
              // 默认最先调用复写的 onSubscribe()
      
              @Override
              public void onNext(Integer value) {
                  Log.d(TAG, "接收到了事件"+ value  );
              }
      
              @Override
              public void onError(Throwable e) {
                  Log.d(TAG, "对Error事件作出响应");
              }
      
              @Override
              public void onComplete() {
                  Log.d(TAG, "对Complete事件作出响应");
              }
      
          });
      }
      
      打印结果:
      04-27 00:42:48.403 13505-13505/? D/MainTwoActivity: 开始采用subscribe连接
      04-27 00:42:48.403 13505-13505/? D/MainTwoActivity: 接收到了事件1
      04-27 00:42:48.403 13505-13505/? D/MainTwoActivity: 接收到了事件2
      04-27 00:42:48.403 13505-13505/? D/MainTwoActivity: 接收到了事件3
      04-27 00:42:48.403 13505-13505/? D/MainTwoActivity: 接收到了事件4
      04-27 00:42:48.403 13505-13505/? D/MainTwoActivity: 对Complete事件作出响应
      
    2.2 fromArray():
    • 快速创建1个被观察者对象(Observable)

    • 发送事件的特点:直接发送 传入的数组数据 (会将数组中的数据转换为Observable对象)

    • 没有限制个数

    • 具体demo使用如下:

          // 1. 创建时传入整型1、2、3、4
          // 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
          Observable.fromArray(1, 2, 3, 4,5,6,7,8,9,10,11,12,13)
                  // 至此,一个Observable对象创建完毕,以下步骤仅为展示一个完整demo,可以忽略
                  // 2. 通过通过订阅(subscribe)连接观察者和被观察者
                  // 3. 创建观察者 & 定义响应事件的行为
                  .subscribe(new Observer<Integer>() {
      
                      @Override
                      public void onSubscribe(Disposable d) {
                          Log.d(TAG, "开始采用subscribe连接");
                      }
                      // 默认最先调用复写的 onSubscribe()
      
                      @Override
                      public void onNext(Integer value) {
                          Log.d(TAG, "接收到了事件" + value);
                      }
      
                      @Override
                      public void onError(Throwable e) {
                          Log.d(TAG, "对Error事件作出响应");
                      }
      
                      @Override
                      public void onComplete() {
                          Log.d(TAG, "对Complete事件作出响应");
                      }
      
                  });
      
      打印结果如下:
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 开始采用subscribe连接
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件1
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件2
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件3
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件4
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件5
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件6
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件7
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件8
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件9
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件10
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件11
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件12
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 接收到了事件13
      04-27 00:48:59.468 13653-13653/? D/MainTwoActivity: 对Complete事件作出响应
      
    2.3 formIterable():
    • 快速创建1个被观察者对象(Observable)

    • 发送事件的特点:直接发送 传入的集合List数据

    • 没有限制个数

    • 具体demo使用如下:

         // 1. 设置一个集合
         List<Integer> list = new ArrayList<>();
         list.add(1);
         list.add(2);
         list.add(3);
         // 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
         Observable.fromIterable(list)
                 // 至此,一个Observable对象创建完毕,以下步骤仅为展示一个完整demo,可以忽略
                 // 2. 通过通过订阅(subscribe)连接观察者和被观察者
                 // 3. 创建观察者 & 定义响应事件的行为
                 .subscribe(new Observer<Integer>() {
      
                     @Override
                     public void onSubscribe(Disposable d) {
                         Log.d(TAG, "开始采用subscribe连接");
                     }
                     // 默认最先调用复写的 onSubscribe()
      
                     @Override
                     public void onNext(Integer value) {
                         Log.d(TAG, "接收到了事件" + value);
                     }
      
                     @Override
                     public void onError(Throwable e) {
                         Log.d(TAG, "对Error事件作出响应");
                     }
      
                     @Override
                     public void onComplete() {
                         Log.d(TAG, "对Complete事件作出响应");
                     }
      
                 });
      
       打印结果:
      04-27 00:53:14.695 13773-13773/? D/MainTwoActivity: 开始采用subscribe连接
      04-27 00:53:14.695 13773-13773/? D/MainTwoActivity: 接收到了事件1
      04-27 00:53:14.695 13773-13773/? D/MainTwoActivity: 接收到了事件2
      04-27 00:53:14.695 13773-13773/? D/MainTwoActivity: 接收到了事件3
      04-27 00:53:14.695 13773-13773/? D/MainTwoActivity: 对Complete事件作出响应
      

    3.延迟创建

    3.1 defer():
    • 直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件

    • 通过 Observable工厂方法创建被观察者对象(Observable)

    • 每次订阅后,都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的

    • 具体demo使用:

      <-- 1. 第1次对i赋值 ->>
          Integer i = 10;
      
          // 2. 通过defer 定义被观察者对象
          // 注:此时被观察者对象还没创建
          Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
              @Override
              public ObservableSource<? extends Integer> call() throws Exception {
                  return Observable.just(i);
              }
          });
      
          <-- 2. 第2次对i赋值 ->>
          i = 15;
      
          <-- 3. 观察者开始订阅 ->>
          // 注:此时,才会调用defer()创建被观察者对象(Observable)
          observable.subscribe(new Observer<Integer>() {
      
              @Override
              public void onSubscribe(Disposable d) {
                  Log.d(TAG, "开始采用subscribe连接");
              }
      
              @Override
              public void onNext(Integer value) {
                  Log.d(TAG, "接收到的整数是"+ value  );
              }
      
              @Override
              public void onError(Throwable e) {
                  Log.d(TAG, "对Error事件作出响应");
              }
      
              @Override
              public void onComplete() {
                  Log.d(TAG, "对Complete事件作出响应");
              }
          });
      
      打印结果:最后打印的值15:因为每次订阅后,都会得到一个刚创建的最新的Observable对象,这可以确保Observable对象里的数据是最新的,所以i值会取第2次的赋值
      
      04-27 01:00:55.319 13898-13898/lingan.test.com.myapplication D/MainTwoActivity: 开始采用subscribe连接
      04-27 01:00:55.319 13898-13898/lingan.test.com.myapplication D/MainTwoActivity: 接收到的整数是15
      04-27 01:00:55.319 13898-13898/lingan.test.com.myapplication D/MainTwoActivity: 对Complete事件作出响应
      
      
    3.2 timer():
    • 快速创建1个被观察者对象(Observable)

    • 发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)

    • 本质 = 延迟指定时间后,调用一次 onNext(0) 延迟指定事件,发送一个0,一般用于检测

    • 具体demo使用:

      Observable.timer(10, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
              @Override
              public void onSubscribe(Disposable d) {
                  Log.d(TAG,"onSubscribe: ");
              }
      
              @Override
              public void onNext(Long aLong) {
                  Log.d(TAG,"onNext: "+aLong);
              }
      
              @Override
              public void onError(Throwable e) {
                  Log.d(TAG,"onError: ");
              }
      
              @Override
              public void onComplete() {
                  Log.d(TAG,"onComplete: ");
              }
          });
      
      打印结果:.645 19748-19763/lingan.test.com.myapplication D/MainActivity: onNext: 0
      

    3.3 interval():

    • 快速创建1个被观察者对象(Observable)

    • 发送事件的特点:每隔指定时间 就发送 事件

    • 发送的事件序列 = 从0开始、无限递增1的的整数序列

    • 具体demo使用测试:

      Observable.interval(10,5, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
              @Override
              public void onSubscribe(Disposable d) {
                  Log.d(TAG,"onSubscribe: ");
              }
      
              @Override
              public void onNext(Long aLong) {
                  Log.d(TAG,"onNext: "+aLong);
              }
      
              @Override
              public void onError(Throwable e) {
                  Log.d(TAG,"onError: ");
              }
      
              @Override
              public void onComplete() {
                  Log.d(TAG,"onComplete: ");
              }
          });
      
      打印结果:第一次间隔10秒开始发送一次事件,第一次事件值为0,然后每次间隔5秒发送一次事件,每次事件值增加1
      

    3.4 intervalRange():

    • 快速创建1个被观察者对象(Observable)

    • 发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量

    • 发送的事件序列 = 从指定一个数开始、无限递增1的的整数序列,到指定的第二个参数大小停止

    • 作用类似于interval(),但可指定发送的数据的数量

    • 具体demo使用

      Observable.intervalRange(1,10,10,5, TimeUnit.SECONDS).subscribe(new Observer<Long>() {
              @Override
              public void onSubscribe(Disposable d) {
                  Log.d(TAG,"onSubscribe: ");
              }
      
              @Override
              public void onNext(Long aLong) {
                  Log.d(TAG,"onNext: "+aLong);
              }
      
              @Override
              public void onError(Throwable e) {
                  Log.d(TAG,"onError: ");
              }
      
              @Override
              public void onComplete() {
                  Log.d(TAG,"onComplete: ");
              }
          });
      
      打印结果:发送事件值1开始叠加,最大发送10次,第一次间隔10秒开始发送,第二次是每次间隔5秒开始
      04-14 02:48:30.422 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 1
      04-14 02:48:35.421 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 2
      04-14 02:48:40.421 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 3
      04-14 02:48:45.421 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 4
      04-14 02:48:50.422 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 5
      04-14 02:48:55.422 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 6
      04-14 02:49:00.421 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 7
      04-14 02:49:05.422 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 8
      04-14 02:49:10.421 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 9
      04-14 02:49:15.421 20204-20219/lingan.test.com.myapplication D/MainActivity: onNext: 10
      04-14 02:49:15.422 20204-20219/lingan.test.com.myapplication D/MainActivity: onComplete:
      

    3.5 range():

    • 快速创建1个被观察者对象(Observable)

    • 发送事件的特点:连续发送 1个事件序列,可指定范围

    • 发送的事件序列 = 从指定一个数开始、无限递增1的的整数序列,到指定的第二个参数大小停止

    • 作用类似于intervalRange(),但区别在于:无延迟发送事件

    • 具体demo使用

      Observable.range(1,10).subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
                  Log.d(TAG,"onSubscribe: ");
              }
      
              @Override
              public void onNext(Integer aLong) {
                  Log.d(TAG,"onNext: "+aLong);
              }
      
              @Override
              public void onError(Throwable e) {
                  Log.d(TAG,"onError: ");
              }
      
              @Override
              public void onComplete() {
                  Log.d(TAG,"onComplete: ");
              }
          }); 
          
      
      打印结果:没有延迟,快速打印出1到10的值。
      04-14 02:52:32.563 20305-20305/? D/MainActivity: onSubscribe: 
      04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 1
      04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 2
      04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 3
      04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 4
      04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 5
      04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 6
      04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 7
      04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 8
      04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 9
      04-14 02:52:32.564 20305-20305/? D/MainActivity: onNext: 10
      04-14 02:52:32.564 20305-20305/? D/MainActivity: onComplete:
      

    3.6 rangeLong():

    • 作用:类似于range(),区别在于该方法支持数据类型 = Long
    • 与range()类似,此处不作过多描述至此.

    总结:基本常用的创建符就是这几种,非常简单,下篇文章会讲解下,RxJava的转换操作符。

    相关文章

      网友评论

          本文标题:RxJava基础-创建操作符

          本文链接:https://www.haomeiwen.com/subject/qffilftx.html