美文网首页
RxJava使用指南

RxJava使用指南

作者: 兰兰笑笑生 | 来源:发表于2016-09-11 00:13 被阅读368次

    前言

    RxJava的开发模式越来越受开发者欢迎,响应式的编程不仅可以减少代码量,还增加了代码的可读性,让开发维护再加简单。RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。

    一.什么是RxJava

    RxJava其实就是一个工具库,使用的就是通用形式的观察者模式。RxJava最核心的两个东西是Observables(被观察者,事件源)和Subscribers(观察者)。Observables发出一系列事件,Subscribers处理这些事件。这里的事件可以是任何你感兴趣的东西(触摸事件,web接口调用返回的数据。。。)一个Observable可以发出零个或者多个事件,知道结束或者出错。每发出一个事件,就会调用它的Subscriber的onNext方法,最后调用Subscriber.onComplete()或者Subscriber.onError()结束。
    Rxjava的看起来很想设计模式中的观察者模式,但是有一点明显不同,那就是如果一个Observerble没有任何的的Subscriber,那么这个Observable是不会发出任何事件的。

    二.为什么要用RxJava

    RxJava 优点主要是让代码简洁,增加代码的可读性,随着程序逻辑变得越来越复杂,它依然能够保持简洁。
    我们想你一下这么一个场景:我们要从外部存储器中加载一张图片,并把图片转化成Bitmap并显示到ImageView中,如果我们按照非RxJava的开发方式,我们就要先创建一个异步线程(Thread、AsyncTask等)用来执行耗时的图片加载任务,之后再在主线程新建一个Handler用来更新ImageView。大概的代码是这样实现的:

    new Thread() { 
      @Override 
      public void run() { 
        super.run(); 
        final Bitmap bitmap = getBitmapFromFile(file); 
        getActivity().runOnUiThread(new Runnable() { 
          @Override 
          public void run() { 
            imageCollectorView.addImage(bitmap); 
          } 
        }); 
      }
    }.start();
    

    而如果使用 RxJava ,实现方式是这样的:

    Observable.from(folders)
      .map(new Func1<File, Bitmap>() { 
        @Override 
        public Bitmap call(File file) { 
          return getBitmapFromFile(file);
       }
      })
     .subscribeOn(Schedulers.io())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(new Action1<Bitmap>() {
        @Override 
        public void call(Bitmap bitmap) { 
          imageCollectorView.addImage(bitmap); 
        } 
      });
    

    虽然代码量并没有减少,但是用了RxJava之后代码的逻辑会很清晰, RxJava 的这个实现,是一条从上到下的链式调用,没有任何嵌套,这在逻辑的简洁性上是具有优势的。当需求变得复杂时,这种优势将更加明显。假设你要加载10张图片,并且只加载png格式的,加载完还要切割成圆角,在非RxJava的实现上,势必会添加10张图的循环嵌套,再增加一个isPNG()的判断函数,一个圆角切割函数toRoundBitmap(),势必会对代码逻辑一个大修改,而使用RxJava,我们只需要在链式调用上添加必要的处理函数就行了,添加代码如下:

    .flatMap(new Func1<File, Observable<File>>() { 
      @Override 
      public Observable<File> call(File file) { 
        return Observable.from(file.listFiles()); 
      } 
    }) 
    .filter(new Func1<File, Boolean>() { 
      @Override 
      public Boolean call(File file) { 
        return file.getName().endsWith(".png"); 
      } 
    })
    .map(new Func1<Bitmap, Bitmap>() { 
        @Override 
        public Bitmap call(Bitmap origin) { 
          return toRoundBitmap(origin);
       }
      })
    

    特别是如果你的 IDE 是 Android Studio ,其实每次打开某个 Java 文件的时候,你会看到被自动 Lambda 化的预览,这将让你更加清晰地看到程序逻辑。

    三.如何使用RxJava

    1.添加依赖库

    compile 'io.reactivex:rxjava:1.1.6'
    compile 'io.reactivex:rxandroid:1.2.1'
    RxAndroid 是RxJava在Android运行环境的补充库。

    1.创建 Observer
    Observer<String> observer = new Observer<String>() {    
      @Override    
      public void onNext(String s) {        
          Log.d(tag, "Item: " + s);    
      }    
      @Override    
      public void onCompleted() {        
          Log.d(tag, "Completed!");    
      }    
      @Override    
      public void onError(Throwable e) {        
          Log.d(tag, "Error!");    
      }
    };
    
    • onCompleted() : 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的
    • onNext() 发出时,需要触发 onCompleted() 方法作为标志。
    • onError() : 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。在一个正确运行的事件序列中, onCompleted() 和 onError()有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

    除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber。 Subscriber
    对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的。选择 Observer 和 Subscriber 是完全一样的。它们的区别对于使用者来说主要有两点:

    • onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法,具体可以在后面的文中看到。
    • unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
    2.创建 Observable

    Observable 即被观察者,它决定触发事件的逻辑,即什么时候调用onNext,什么时候调用onComplete。 RxJava 使用 create() 方法来创建一个 Observable ,并为它定义事件触发规则:

    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
      @Override 
      public void call(Subscriber<? super String> subscriber) { 
          subscriber.onNext("Hello"); 
          subscriber.onNext("Hi"); 
          subscriber.onNext("Aloha"); 
          subscriber.onCompleted(); ]\
      }
    });
    

    RxJava 还提供了一些方法用来快捷创建事件队列,例如:
    just(T...): 将传入的参数依次发送出来。

    Observable observable = Observable.just("Hello", "Hi", "Aloha");
    // 将会依次调用:
    // onNext("Hello");
    // onNext("Hi");
    // onNext("Aloha");
    // onCompleted();
    

    from(T[]) / from(Iterable<? extends T>): 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。

    String[] words = {"Hello", "Hi", "Aloha"};
    Observable observable = Observable.from(words);
    // 将会依次调用:
    // onNext("Hello");
    // onNext("Hi");
    // onNext("Aloha");
    // onCompleted();
    
    3. Subscribe (订阅)

    创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。代码形式很简单:

    observable.subscribe(observer);
    // 或者:observable.subscribe(subscriber);
    

    订阅之后 ,将会依次输出 "Item: Hello", "Item: Hi", "Item: Aloha" , "Completed!"。

    4.线程控制 Scheduler

    默认情况下,即在没有任务设置的时候,Observable和Observer都在同一线程中执行,即创建了这两个对象的线程,相当于设置了Schedulers.immediate()。如果想在异步线程中执行,就要设置成Schedulers.newThread()Schedulers.io() , 他们两个的区别是io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,而Schedulers.newThread() 总是启用新线程,并在新线程执行操作因此多数情况下io()newThread()更有效率。不要把计算工作放 io() 中,可以避免创建不必要的线程。

    • subscribeOn: 设置OnSubscribe运行的线程
    • observeOn: 设置Observer运行的线程
    5.常用函数解析
    • map : 是转化成另一个类型的输出
    • compose : 也是一个转化,传的是一个Transformer,可以转化为Obserables添加参数,这个是高级的转化,不像flatmap,如果Obserables传的是数组,那会一个一个传,而compose会直接传数组。
    • filter : 过滤
    • interval , timer : 定时执行任务
    • delay : 延时发送结果

    5.在什么场景下使用RxJava

    • 可以用Observable.concat(memory, disk, network).first(new Fun1(){}) 实现类似事件拦截机制。比如取数据先检查缓存的场景。注意此时的onCompleted只会调用一次。
    • 可以用Observable.merge(observable1, observable2) 实现需要等到多个接口并发取完数据,再更新的情况。注意此时的onCompleted只会调用一次。
    • flatmap 主要转化成另一个Obserables,可以用flatmap实现一个接口的请求依赖另一个API请求返回的数据的场景。比如我们从网络获取到的列表,还要从本地数据库中取得缓存的列表,对比后再刷新出来。利用flatmap还能创建出更多嵌套的代码,而保持代码的简洁性,还有比如我们从SD搜索图片并加载成Bitmap,就可以用flatmap对每个子目录再搜索。
    • 可以用filtermap执行一些复杂的数据变换。

    参考文章

    相关文章

      网友评论

          本文标题:RxJava使用指南

          本文链接:https://www.haomeiwen.com/subject/ivxuettx.html