美文网首页
浅谈Rxjava

浅谈Rxjava

作者: MrWheat | 来源:发表于2017-06-12 22:21 被阅读0次

    最近项目中用到了异步操作,感觉android自身的AsyncTask和handler有点繁琐且不够简洁,于是学习了一下Rxjava,并没有特别深入,但有了基本的了解

    一.简介

    RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。

    RxJava 的优势就是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。其原理在于,RxJava 的实现,是一条从上到下的链式调用,没有任何嵌套,这在逻辑的简洁性上是具有优势的。当需求变得复杂时,这种优势将更加明显。

    二.基本概念

    Observable:发射源,英文释义“可观察的”,在观察者模式中称为“被观察者”或“可观察对象”;

    Observer:接收源,英文释义“观察者”,没错!就是观察者模式中的“观察者”,可接收Observable、Subject发射的数据;

    Subscriber:“订阅者”,也是接收源,那它跟Observer有什么区别呢?Subscriber实现了Observer接口,比Observer多了一个最重要的方法unsubscribe( ),用来取消订阅,当你不再想接收数据了,可以调用unsubscribe( )方法停止接收,Observer 在 subscribe() 过程中,最终也会被转换成 Subscriber 对象,一般情况下,建议使用Subscriber作为接收源;

    Subscription :Observable调用subscribe( )方法返回的对象,同样有unsubscribe( )方法,可以用来取消订阅事件;

    Action0:RxJava中的一个接口,它只有一个无参call()方法,且无返回值,同样还有Action1,Action2...Action9等,Action1封装了含有 1 个参的call()方法,即call(T t),Action2封装了含有 2 个参数的call方法,即call(T1 t1,T2 t2),以此类推;

    Func0:与Action0非常相似,也有call()方法,但是它是有返回值的,同样也有Func0、Func1...Func9;

    RxJava 的观察者模式大致如下图:

    image.png

    与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。

    • onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
    • onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
    • 在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

    三.基本用法

    • 创建 Observer
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onNext(String s) {
            Log.d(tag, "Item: " + s);
        }
    
        @Override
        public void onCompleted() {
            Log.d(tag, "Completed!");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(tag, "Error!");
        }
    };```
    
    - **创建 Observable**
    1.使用create( )
    

    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
    subscriber.onNext("Hello");
    subscriber.onCompleted();
    }
    });```
    可以看到,这里传入了一个 OnSubscribe 对象作为参数。OnSubscribe 会被存储在返回的 Observable 对象中,它的作用相当于一个计划表,当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用一次 onNext() 和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

    2.使用just( ),将为你创建一个Observable并自动为你调用onNext( )发射数据:

    justObservable = Observable.just("just1","just2");//依次发送"just1"和"just2"```
      3.使用from( ),遍历集合,发送每个item:
    

    List<String> list = new ArrayList<>();
    list.add("from1");
    list.add("from2");
    list.add("from3");
    fromObservable = Observable.from(list); //遍历list 每次发送一个
    /** 注意,just()方法也可以传list,但是发送的是整个list对象,而from()发送的是list的一个item** /```

    • Subscribe (订阅)
    observable.subscribe(observer);
    

    Observer的onNext方法将会依次收到来自justObservable的数据,另外,如果你不在意数据是否接收完或者是否出现错误,即不需要Observer的onCompleted()和onError()方法,可使用Action1,subscribe()支持将Action1作为参数传入,RxJava将会调用它的call方法来接收数据,代码如下:

    justObservable.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
    
              ...
         }});```
    - **线程控制 —— Scheduler **
    在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。
    
      RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
    
      - Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
     - Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
     - Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
      - Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
      - Schedulers.mainThread(),它指定的操作将在 Android 主线程运行。
    
      有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
    
      比如说:
    
    Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            
        }
    });```
    

    上面这段代码中,由于 subscribeOn(Schedulers.io()) 的指定,被创建的事件的内容 1、2、3、4 将会在 IO 线程发出;而由于observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 数字的打印将发生在主线程 。事实上,这种在 subscribe() 之前写上两句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

    相关文章

      网友评论

          本文标题:浅谈Rxjava

          本文链接:https://www.haomeiwen.com/subject/iilrqxtx.html