美文网首页
RxJava2.0学习笔记

RxJava2.0学习笔记

作者: MRYDM | 来源:发表于2017-09-07 17:15 被阅读0次

序言

由于我是先学习了1.0版本接着继续学习2.0,所以本文可能不太适合没有接触过RxJava的同学,所以可以先看一下,1.0的学习笔记,传送门 http://www.jianshu.com/p/a8cedc061ab1

首先要使用RxJava2,先要添加依赖:

compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

类介绍

与RxJava1相比:

  • 相同点:
    都有Observable,Observer,subscribe()
  • 不同点:
  1. 在RxJava2的Observable中重写的方法变为subscribe(ObservableEmitter<String> emitter),其中的ObservableEmitter: Emitter是发射器的意思,就是用来发出事件的。

  2. 在RxJava2的observer重写方法中新添加了一个方法onSubscribe(Disposable d),其中Disposable是一次性用品,用完就丢弃,对与这个参数可以理解为一个拦截器,将所有发送过来的数据拦截下俩,让observer不会收到。

  3. 在RxJava2中subscribe()重载方法的参数变了。

        public final Disposable subscribe() {}
        public final Disposable subscribe(Consumer<? super T> onNext) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
        public final void subscribe(Observer<? super T> observer) {}
    

在实际项目中我们一般只关心onNext(),和onError(),所以我们一般只会重载两个参数的。

然后我们用代码来理解一下上面的相同点和不同点

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            Log.e(TAG, "subscribe: 1" );
            e.onNext(1);
            Log.e(TAG, "subscribe: 2" );
            e.onNext(2);
            Log.e(TAG, "subscribe: 3" );
            e.onNext(3);
            e.onComplete();
        }
    });

    Observer<Integer> observer = new Observer<Integer>() {
        private Disposable disposable;

        @Override
        public void onSubscribe(Disposable d) {
            Log.e(TAG, "onSubscribe: " + d.isDisposed());
            disposable = d;
        }

        @Override
        public void onNext(Integer value) {
            Log.e(TAG, "onNext: " + value);
            if (value == 2) {
                disposable.dispose();
                Log.e(TAG, "onNext: " + disposable.isDisposed());
            }
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "onError: " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete: ");
        }
    };

    observable.subscribe(observer);

运行结果如下所示:

com.example.cosima.rxjava2learn E/MainActivity: onSubscribe: false
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 1
com.example.cosima.rxjava2learn E/MainActivity: onNext: 1
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 2
com.example.cosima.rxjava2learn E/MainActivity: onNext: 2
com.example.cosima.rxjava2learn E/MainActivity: onNext: true
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 3

可以看到在disposable为true的时候,Observable可以发送数据,但是在Observer没有接收到数据。

注:与RxJava1相同,当Observable发送onComplete之后,Observable在onComplete之后的数据可以发送,Observer在接收到onComplete之后不再继续接收事件。onError与onComplete的原理一样,但是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。

创建方式

想对于RxJava1来说,Observable的创建方式心添加了几种方式:

  • fromIterable((Iterable<? extends T> list)方式
    遍历集合,发送每个item相当于多次调用onNext().

注Collection接口是Iterable接口的子接口,所以所有Collection接口的实现类都可以作为Iterable对象直接传入fromIterable()方法。

  • fromArray(T... items)方式
    遍历集合,发送每个item相当于多次调用onNext().

  • interval(long period, TimeUnit unit)方式
    创建一个按固定时间间隔发射整数序列的Observable,可用作定时器 period:时间间隔

  • interval(long initialDelay, long period, TimeUnit unit)方式
    initialDelay:开始值,默认为0。

  • range(final int start, final int count)方式
    创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常。

  • timer(long delay, TimeUnit unit)方式
    一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。

用于实现观察者模式方式有很多种:
<center>

</center>

现在我们来实现一个简单的倒计时功能:

封装Observable

private Observable<Integer> initEvent2(final int time) {
    return Observable.interval(0, 1, TimeUnit.SECONDS)
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread())
            .map(new Function<Long, Integer>() {
                @Override
                public Integer apply(Long aLong) throws Exception {
                    return time - aLong.intValue();
                }
            })
            .take(time + 1);//限制循环次数
}

使用如下:

initEvent2(5).doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            Log.e(TAG, "accept: 开始计时");
        }
    })
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.e(TAG, "accept: 当前计时" + integer);
                }
            });
打印结果为

在上述代码中我们使用到了map()操作符,接下来我们就一起来看下RxJava中的操作符

RxJava操作符

1.map()操作符:

把原来的Observable对象转换成另一个Observable对象
2.flatMap()操作符:


flatMap和Map的相同点就是把一个对象转化为另一个对象返回,但是不同的是flatMap()返回的是个Observable对象,并且这个对象并不是直接发送到了回调方法中,而是把这个对象激活,之后将他发送到回调方法中。
3.filter()操作符:


根据自己的需求加入判断逻辑,他的返回值是true或者是false,用于表示是否需要被过滤。
4.take()操作符:


再上面的代码中已经用到过,具体含义就是限制输出次数
5.doOnNext()操作符:


在输出前可以做一个额外的操作
6.delay()操作符:


延迟Observer的输出

线程控制Scheduler

该部分与RxJava1相比没有变换,在平常使用中较多的都是以下几个:

  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

都是通过subscribeOn()observeOn() 两个方法来对线程进行控制。
具体可见RxJava1.0学习笔记

与Retrofit结合使用

与1.0版本总体相差不多,只需要修改相应的参数就可以了,比如subcribe()方法中的参数,请求如下所示:
主要内容为红框所示

接下来讲一下RxJava2特有的了。

Flowable及Backpressure

来由

Rxjava2中,还有一个很大的变化就是Backpressure(背压),何为背压,就是观察者来不及处理被观察者发出的事件(产生事件的速度大于处理事件的速度),导致事件被无限堆积,最后产生异常。Flowable就是由此产生,专门用来处理这类问题。将原来的Observable拆分成了新的Observable和Flowable,同时其他相关部分也同时进行了拆分。

注意:处理Backpressure的策略仅仅是处理Subscriber接收事件的方式,并不影响Flowable发送事件的方法。即使采用了处理Backpressure的策略,Flowable原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是Subscriber接收事件的方式。

处理Backpressure的策略

  • 产生原因:
    生产者和消费者不在同一线程下,生产者的速度大于消费者的速度,就会产生Backpressure问题。如果生产者和消费者在同一线程下,不会产生Backpressure问题,所以可以说成同步不会产生问题,异步可能产生问题。

  • 处理策略:
    1. ERROR策略
    产生Backpressure问题的时候直接抛出异常(MissingBackpressureException)

          Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
          @Override
          public void subscribe(FlowableEmitter<String> e) throws Exception {
              e.onNext("我");
              e.onNext("爱");
              e.onNext("你");
              e.onComplete();
          }
          }, BackpressureStrategy.ERROR);
    
          Subscriber<String> subscriber = new Subscriber<String>() {
              @Override
              public void onSubscribe(Subscription s) {
                  Log.e(TAG, "onSubscribe: ");
                  s.request(Long.MAX_VALUE);
              }
    
              @Override
              public void onNext(String s) {
                  Log.e(TAG, "onNext: " + s);
              }
    
              @Override
              public void onError(Throwable t) {
                  Log.e(TAG, "onError: " + t.getMessage());
              }
    
              @Override
              public void onComplete() {
                  Log.e(TAG, "onComplete: " );
              }
          };
          flowable.subscribeOn(Schedulers.io())
                  .observeOn(AndroidSchedulers.mainThread())
                  .subscribe(subscriber);
    

上述代码中在Flowable(被观察者)添加了一个参数,并且onSubscribe(Subscription s)中传给我们的不再是Disposable了, 而是Subscription。然而Subscription也可以用于切断观察者与被观察者之间的联系,调用Subscription.cancel()方法,并在此方法中添加了s.request(long count);这个方法就是用来向生产者申请可以消费的事件数量。这样就可以根据自身的消费能力进行消费。在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。当然如果本身并没有这么多事件需要发送,则不会存128个事件。
在ERROR策略下,如果缓存池溢出,就会立刻抛出MissingBackpressureException异常。

注:如果不调用request表示消费能力为0。如果不限制想request()中传入任意参数,超过消费能力,也会造成资源浪费,产生OOM。

2. BUFFER策略
BUFFER就是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。
这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。
但是这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。BUFFER要慎用
3. DROP策略
当消费者处理不了事件,就丢弃。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
4. LATEST策略
LATEST与DROP功能基本一致。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。
还是以上述例子展示,唯一的区别就是Flowable不再无限发事件,只发送1000000个。

参考:http://www.jianshu.com/p/d149043d103a

源码地址:https://github.com/MrMJL/RxJava2Demo
由于只是学习笔记,源码可能会有点乱,又不对或者不明白欢迎评论多多交流。推荐一个Android实习&&经验交流群:541144061

相关文章

  • RxJava2.0学习笔记

    序言 由于我是先学习了1.0版本接着继续学习2.0,所以本文可能不太适合没有接触过RxJava的同学,所以可以先看...

  • RxJava2.0学习笔记

    核心:异步 观察者模式 作用:RxJava的特点就是可以非常简便的实现异步调用,可以在逻辑复杂的代码逻...

  • RxJava2.0学习笔记

    第一步:创建被观察者(observable) 第二步 创建观察者(observer) 被观察者与观察者建立联系ob...

  • RxJava2.0 - 文章一

    前言 自己在学习RxJava2.0时,参考了大神的博客,然后在这里做一个笔记为了方便自己以后复习和查看,同时也给需...

  • Rxjava系列(六) RxJava2.0操作符详解

    Rxjava2.0概述 通过前面的文章介绍,读者对RxJava2.0应该有了初步的认识。RxJava2.0相对1....

  • RXjave总结

    文章 给初学者的RxJava2.0教程(一)给初学者的RxJava2.0教程(二)

  • RxJava2.0入门系列一:观察者模式

    RxJava2.0最好的学习资料当然是官方资料传送门:https://github.com/ReactiveX/R...

  • RxJava

    教程 给初学者的RxJava2.0教程(一) 给初学者的RxJava2.0教程(二) 给初学者的RxJava2.0...

  • RxJava2.0源码初探

    RxJava2.0源码初探 RxJava2.0的源码相对于1.0发生了很大的变化, 命名方式也发生了很大变化, 下...

  • Rxjava2.0 发生订阅关系 的源码解析

    由于要做一场关于rxjava2.0 的内部分享,本人便怀着期待的心情去了解了下rxjava2.0,关于rxjava...

网友评论

      本文标题:RxJava2.0学习笔记

      本文链接:https://www.haomeiwen.com/subject/sxnijxtx.html