美文网首页
RxJava入门基础

RxJava入门基础

作者: Android流浪者 | 来源:发表于2019-08-22 23:39 被阅读0次

    RecctiveX与RxJava
     在讲到RxJava之前我们首先要了解什么是ReactiveX,因为RxJava是ReactiveX的一种java实现。ReactiveX是Reactive Extensions的缩写,一般简写为Rx。微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序。开发者可以用Observables表示异步数据流,用LINQ操作符查询异部数据流,用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx=Observables+LINQ+Schedulers。
     说到异步操作,我们会想到Android的AsyncTask和Handler。但是随着请求数量越来越多,代码逻辑将会变得将越来越复杂而RxJava却依旧能保持清晰的逻辑。RxJava的原理就是创建一个Observable对象来干活,然后使用各种操作符建立起来的链式操作,就如同流水线一样,把你想要处理的数据一步步的加工成你想要的成品,然后发射给Subscriber处理。
     RxJava的异步操作是通过扩展观察者模式来实现的,RxJava有4个角色Observable、Observer、Subscriber和Suject,这4个角色后面会具体讲解。Observable和Observer通过subscribe方法实现订阅关系,Observable就可以在需要的时候通知Observer,官方文档

    RxJava基本实现

     在使用RxJava前请先在Android Studio中配置gradle:

    dependencies{
      //最新版本
       //implementation "io.reactivex.rxjava3:rxjava:3.x.y"
       implementation 'io.reactivex.rxjava:1.2.0'
       implementation 'io.reactivex.rxandroid:1.2.1'
    }
    

     其中RxAndroid是RxJava在Android平台的扩展。它包含了一些能够简化Android开发的工具,比如特殊的调度器。
     RxJava的基本用法为3个步骤。

     1.创建Observer(观察者)

     它决定实践触发的时候将有怎样的行为。代码如下:

    Subscriber subscriber = new  Subscriber<String>(){
          @Overrride
          public void onCompleted(){
             Log.d(TAG,"onCompleted");
          }
          @Overrride
          public void onError(Throwable e){
             Log.d(TAG,"onError");                                                                                                                                                     
          }
          @Overrride
          public void onNext(String s){
             Log.d(TAG,"onNext"+s);
          }
          @Overrride
          public void onStart(){
             Log.d(TAG,"onNext"+s);
          }
      }
    

     其中onCompleted、onError和onNext是必须要实现的方法,其含义如下。

    • onCompleted:时间队列完结。RxJava不仅把每个事件单独处理,其还会把它们看作一个队列。当不会再有新的onNext发出时,需要触发onCompleted()方法作为完成标志。
    • onError:事件队列异常。在事件处理过程中出现异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出。
    • onNext:普通的事件。将要处理的事件添加到事件队列中。
    • onStart:它会在事件还未发送之前被调用,它可以用于做一些准备工作。例如数据的清零或重置。这是一个可选的方法,默认情况下它的实现为空。
      当然,如果实现简单的功能,也可以用到Observer来创建观察者。Observer是一个接口,而上面用到的Subscriber是在Observer的基础上进行的扩展。在后文的Subscriber订阅过程中Observer也会先被转换为Subscriber来使用。用Observer创建观察者,如下所示:
    Observer<String> observer = new Observer<String> (){
        
          @Overrride
          public void onCompleted(){
             Log.d(TAG,"onCompleted");
          }
           @Overrride
          public void onError(Throwable e){
             Log.d(TAG,"onError");                                                                                                                                                     
          }
          @Overrride
          public void onNext(String s){
             Log.d(TAG,"onNext"+s);
          }
      }
    
     2.创建Observable(被观察者)

     它决定什么时候出发事以及怎样的事件。RxJava使用create方法来创建一个Observable,并为它定义事件触发规则,如下所示:

    Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
        @Override
        public void call(Subsccribe<? super String> subscribe){
            subscribe.onNext("杨影枫")
            subscribe.onNext("月眉儿")
            subscribe.onCompleted();
        }  
    })
    

     通过调用Subscriber的方法,不断地事件添加到任务队列中。也可用just方法来实现:

    Observable observable  = Observable.just("杨影枫","月眉儿");
    

     上述代码会依次调用onNext("杨影枫")、onNext("月眉儿")、onCompleted()。还可以用from方法来实现,如下所示:

    String[] words = {"杨影枫","月眉儿"};
    Observable observabvle  = Observable.form(words);
    

     上述代码调用的方法顺序和just方法是一样的。

     3.Subscribe(订阅)

     订阅只需要一行代码就可以了,如下所示:

    observable.subscribe(subscriber);
    

     运行代码查看Log:
     D/RxJava:onStart
     D/RxJava:onNext杨影枫
     D/RxJava:onNext月眉儿
     D/RxJava:onCompleted
     和预想的一样调用onStart方法,接着调用两个onNext方法,调用onCompleted方法。

    RxJava的不完整定义回调

     前面介绍了回调的接收主要依赖于subscribe(Observer)和subscribe(Subscriber)。此外。RxJava还提供了另一种回调方式,也就是不完整回调。在讲到不完整回调之前我们首先要了解Action。查看RxJava源码,发现一堆Action。

    public interface Action0 extends Action{
        void call();
    }
    

     再看Action1:

    public interface Action1<T> extends Action{
          void call(T  t);
    }
    

     再看Action9:

    public interface Action9<T1,T2,T3,T4,T5,T6,T7,T8,T9> extends Action{
          void call(T1  t1,T2  t2,T3  t3,T4  t4,T5  t5,T6  t6,T7  t7,T8  t8,T9  t9);
    }
    

     很明显,Aciton后的数字代表回调的参数类型数量,之前创建Observer和订阅代码也就可以改写为下面的代码:

    Action1<String> onNextAction = new Action1<String>(){
        @Override
        public void call(String s){
            Log.d(TAG,"onNext"+s);
          }
    };
    
    Action1<Throwable> onErrorAction = new Action1<Throwable>(){
        @Override
        public void call(Throwable  throwable){
            
          }
    };
    Action0 onCompletedAction = new Action0(){
        @Override
        public void call(){
            Log.d(TAG,"onCompleted");
          }
    };
    observable.subscride(onNextAction,onErrorAction,onCompletedAction);
    


    我们定义了onNextAction来处理 onNext的回调,同理,我们还定义了onErrorAction和onCompletedAction来分别处理onError和onCompleted的回调,最后我们把它们传给subscribe方法。很显然这样写的灵活更大一些;同时我们也可以只传一个或者两个Action,如下所示:

    observable.subscride(onNextAction);
    observable.subscribe(onNextAction,onErrorAction);
    

     第一行之定义了onNextAction来处理onNext的回调;而第二行则定义了onNextAction处onNext的回调,onErrorAction来处理onError的回调

    作者:张三儿 邮箱:921849651@qq.com
    注:欢迎大佬指点江山,希望大家多多支持作者。持续技术分享!

    相关文章

      网友评论

          本文标题:RxJava入门基础

          本文链接:https://www.haomeiwen.com/subject/odzosctx.html