Rxjava 2.x

作者: Android_冯星 | 来源:发表于2019-03-26 11:09 被阅读20次

    官方网站
    github

    在ReactiveX中,一个观察者(Observer)订阅一个可观察对象(Observable)。观察者对Observable发射的数据或数据序列作出响应。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,在未来某个时刻响应Observable的通知,不需要阻塞等待Observable发射数据。

    image.png
    1. 这是Observable的时间轴,从左到右。
    2. 这些是Obserable发出的事件
    3. 此垂直线表示Observable已成功发送处事件,事件发送完毕
    4. these dotted lines and this box indicate that a transformation is being applied to the Observable The text inside the box shows the nature of the transformation
      这些虚线和此框表示正在对Observable数据转换。
      框内的文本显示转换的性质
    5. this Observable is the result of the transformation
      对Observable转换的结果
    6. is for some reason the Observable terinates abnormally,with an error the vertical line is replaced by an x
      由于某种原因,Observable异常中断,垂直线x代替错误。

    创建Observables

    • Create — create an Observable from scratch by calling observer methods programmatically
    image.png

    创建一个默认的Observable

    • Defer — do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
    image.png

    直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable

    Defer操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。

    • Empty/Never/Throw — create Observables that have very precise and limited behavior

    Empty 创建一个不发射任何数据但是正常终止的Observable

    never 创建一个不发射数据也不终止的Observable

    Throw 创建一个不发射数据以一个错误终止的Observable

    • From — convert some other object or data structure into an Observable
      image.png

    一对多的关系

    将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
    产生的Observable会发射Iterable或数组的每一项数据。

    • Interval — create an Observable that emits a sequence of integers spaced by a particular time interval
    image.png

    创建一个按固定时间间隔发射整数序列的Observable

    • Just — convert an object or a set of objects into an Observable that emits that or those objects
    image.png

    创建一个发射指定值的Observable

    Just类似于From,但是From会将数组或Iterable的数据取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当做单个数据。

    注意:如果你传递null给Just,它会返回一个发射null值的Observable。不要误认为它会返回一个空Observable(完全不发射任何数据的Observable),如果需要空Observable你应该使用Empty操作符。

    • Range — create an Observable that emits a range of sequential integers

    创建一个发射指定范围的整数序列的Observable

    Range操作符发射一个范围内的有序整数序列,你可以指定范围的起始和长度。

    RxJava将这个操作符实现为range函数,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为0,将导致Observable不发射任何数据(如果设置为负数,会抛异常)。

    • Repeat — create an Observable that emits a particular item or sequence of items repeatedly

    创建一个重复发射指定数据或数据序列的Observable

    RxJava将这个操作符实现为repeat方法。它不是创建一个Observable,而是重复发射原始Observable的数据序列,这个序列或者是无限的,或者通过repeat(n)指定重复次数。

    • Start — create an Observable that emits the return value of a function
    image.png

    创建一个Observable,它发出类似函数的指令的返回值

    • Timer — create an Observable that emits a single item after a given delay

    创建一个Observable,它在一个给定的延迟后发射一个特殊的值。

    image.png

    最基本使用

    • 创建 被观察者
    • 创建 观察者
    • 订阅事件 连接 被观察者 和观察者
    //创建 被观察者
            Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("1");
                    Log.d(TAG, "subscribe: 1");
    
                    emitter.onNext("2");
                    Log.d(TAG, "subscribe: 2");
    
                    emitter.onNext("3");
                    Log.d(TAG, "subscribe: 3");
    
                    emitter.onNext("4");
                    Log.d(TAG, "subscribe: 4");
    
                    emitter.onComplete();
                }
            }).subscribe(//订阅
                    new Observer<String>() {//观察者
    
                        private Disposable disposable;
    
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.d(TAG, "onSubscribe: " + d.toString());
                            this.disposable = d;
                        }
    
                        @Override
                        public void onNext(String s) {
                            Log.d(TAG, "onNext: " + s);
    
                            Log.d(TAG, "onNext: disposed 前" + disposable.isDisposed());
                            if (s.equals("2")) {
                                disposable.dispose();
                            }
                            Log.d(TAG, "onNext: disposed 后" + disposable.isDisposed());
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "onError: " + e);
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete: ");
                        }
                    });
    
    

    在 Observer观察者 中,多了一个回调方法:onSubscribe,传递参数为Disposable,Disposable 相当于 RxJava 1.x 中的 Subscription, 用于解除订阅。当s等于2时,解除订阅,观察者不会在接收到被观察者发出的消息,也包裹 onComplete事件。

    在请求的过程中Activity已经退出了, 这个时候如果回到主线程去更新UI, 那么APP肯定就崩溃了, 怎么办呢, 上一节我们说到了Disposable , 说它是个开关, 调用它的dispose()方法时就会切断水管, 使得下游收不到事件, 既然收不到事件, 那么也就不会再去更新UI了. 因此我们可以在Activity中将这个Disposable 保存起来, 当Activity退出时, 切断它即可.

    那如果有多个Disposable 该怎么办呢, RxJava中已经内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管.

    上游可以发送无限个onNext, 下游也可以接收无限个onNext.
    当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
    当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
    上游可以不发送onComplete或onError.
    最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然

    下面是log信息。

        onNext: 1
        onNext: disposed 前false
        onNext: disposed 后false
        subscribe: 1
        onNext: 2
        onNext: disposed 前false
        onNext: disposed 后true
        subscribe: 2
        subscribe: 3
        subscribe: 4
    

    form just 输出一样

    Observable<Integer> form = Observable.fromArray(items);
    
    Observable<Integer> just = Observable.just(1, 2, 3, 4, 5);
    
    Observable<Integer> range = Observable.range(1, 10);
    
        onNext: 1
        onNext: 2
        onNext: 3
        onNext: 4
        onNext: 5
        onNext: 6
        onNext: 7
        onNext: 8
        onNext: 9
        onNext: 10
        onComplete: 
    

    subscribe 观察者

        public final Disposable subscribe() {}
        // 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
    
        public final Disposable subscribe(Consumer<? super T> onNext) {}
        // 表示观察者只对被观察者发送的Next事件作出响应
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
        // 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
    
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
        // 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
    
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
        // 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应
    
        public final void subscribe(Observer<? super T> observer) {}
        // 表示观察者对被观察者发送的任何事件都作出响应
    

    调度器 Scheduler

    • Schedulers.computation( )
      用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量

    • Schedulers.from(executor)
      使用指定的Executor作为调度器

    • Schedulers.immediate( )
      在当前线程立即开始执行任务

    • Schedulers.io( )
      用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器

    • Schedulers.newThread( )
      为每个任务创建一个新线程

    • Schedulers.trampoline( )
      当其它排队的任务完成后,在当前线程排队开始执行

    简单的来说, subscribeOn() 指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程.
    多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
    多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.

    相关文章

      网友评论

        本文标题:Rxjava 2.x

        本文链接:https://www.haomeiwen.com/subject/mihbvqtx.html