★42.RxJava2

作者: iDragonfly | 来源:发表于2017-06-29 00:08 被阅读0次

    简介

    • 相关网站:GitHub地址官方文档中文文档
    • RxJava2 通过一种扩展的 观察者模式 来实现 异步 执行任务。
    • RxJava2 能方便地进行 线程 切换。
    • RxJava2 能方便地把 异步 执行的代码写在一处。

    基础知识

    相关概念

    • Observable被观察者
    • Observer观察者
    • Subscriber订阅者 ,实现了Observer接口,多了unsubscribe(),用来取消订阅。
    • Subscription:类似SubscriberObservable调用subscribe()方法返回的对象。
    • Subject:可以当作ObservableObserver来用。
    • subscribe()订阅 方法。
    • Event:事件。
    • Scheduler调度器 ,相当于 线程控制器
    • Action0:接口,里面只有一个无返回值0参数的call(),同理有Action1Action2等,代表着1个、2个参数等。
    • Func0:接口,类似Action0,区别是Func0有返回值。

    相关方法

    • onNext():在 事件队列 中,进入下一个事件时调用,同时也是 事件处理方法
    • onCompleted()事件队列 完成时调用。
    • onError()事件队列 错误时调用。

    简单示例

    1. 普通示例

    1. 定义Observable

    Observable<Integer> observable = Observable.create(emitter -> {
        // Todo: 执行后台请求,请求后的结果通过onNext()发送给Observer
        // 发送消息
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        // 消息发送完毕
        emitter.onComplete();
    });
    

    2. 定义Observer

    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "subscribe");
            // 订阅时
            // Disposable对象可以保存,日后通过调用Disposable.dispose()来中断订阅。
        }
    
        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "" + value);
            // 接收到事件时
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "error");
            // 接收到错误时
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "complete");
            // 事件接收完毕时
        }
    };
    

    3. 订阅

    注意 :此处代码的书写顺序看起来不符合直觉,这样设计是为了便于 链式调用

    observable.subscribe(observer);
    

    2. 链式示例

    Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
        // Todo: 执行后台请求,请求后的结果通过onNext()发送给Observer
        // 发送消息
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        // 消息发送完毕
        emitter.onComplete();
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "subscribe");
            // 订阅时
            // Disposable对象可以保存,日后通过调用Disposable.dispose()来中断订阅。
        }
    
        @Override
        public void onNext(Integer value) {
            Log.d(TAG, "" + value);
            // 接收到事件时
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(TAG, "error");
            // 接收到错误时
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "complete");
            // 事件接收完毕时
        }
    });
    

    3. 无Observer示例

    subscribe()的所有重载形式

    public final Disposable subscribe() {}
    public final Disposable subscribe(Consumer<? super T> onNext) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    public final void subscribe(Observer<? super T> observer) {}
    

    代码

    Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
        emitter.onNext(4);
    }).subscribe(
            integer -> Log.d(TAG, "onNext: " + integer),
            throwable -> Log.d(TAG, "onError"),
            () -> Log.d(TAG, "onComplete"));
    

    示例解说

    • emitterObservableEmitter类型,用于发射onNext()onComplete()onError()消息。
    • Disposable对象可以保存,日后通过调用Disposable.dispose()中断订阅中断订阅 以后发送的消息无法被Observer接收到。
    • Observable可以不发送onComplete()onError()
    • Observable发送了一个onComplete()后,ObservableonComplete()之后的onNext()将会继续发送,而Observer收到onComplete()之后将不再继续接收onNext()
    • Observable发送了一个onError()后,ObservableonError()之后的onNext()将继续发送,而Observer收到onError()之后将不再继续接收onNext()
    • onComplete()onError()必须 唯一 并且 互斥 ,即只能发送一个onComplete()onError()

    事件流向图

    事件流向 示意图
    只发送onNext()事件
    发送onComplete()事件
    发送onError()事件

    线程相关

    切换线程示例

    • subscribeOn():控制observable在什么 线程 中发送事件。
    • observeOn():控制observer在什么 线程 中处理事件。
    // observable的链式操作中
    observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(observer);
    

    线程种类

    线程种类 描述
    Schedulers.newThread() 线程
    Schedulers.computation() 用于CPU计算密集型的操作的 线程
    Schedulers.io() 用于IO操作的 线程 ,如网络IO,文件IO,数据库IO。
    AndroidSchedulers.mainThread() AndroidUI主线程

    注意事项

    • ObservableObserver默认是在 UI主线程 中运行的。
    • Observable多次切换线程的话,只有第一次有效。
    • Observer多次切换线程的话,只有最后一次生效。

    过滤操作符

    Sample

    简介

    • Sample操作符定时查看一个Observable,然后发射自上次采样以来它最近发射的数据。
    • 会导致某些事件丢失。

    示意图

    变换操作符

    Map

    示意图

    简介

    用于将一种类型的 事件 转换为另一种类型的 事件

    啰嗦示例

    Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
        emitter.onNext(1);
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(@NonNull Integer integer) throws Exception {
            return "This is result " + integer;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(@NonNull String s) throws Exception {
            Log.d(TAG, s);
        }
    });
    

    简洁示例

    Observable.create((ObservableOnSubscribe<Integer>) emitter -> emitter.onNext(1))
            .map(integer -> "This is result " + integer)
            .subscribe(s -> Log.d(TAG, s));
    

    FlatMap

    示意图

    简介

    • FlatMap将单个 事件事件队列 变换为一个发送单个 事件事件队列Observable
    • 可以通过FlatMap事件 变换为Observable来实现链式中连接多个ObservableObservable->事件->Observable->事件...->Observer
    • FlatMap并不保证 事件 的顺序,如果需要保证顺序则使用ConcatMap,使用方法同FlatMap

    啰嗦示例

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
        }
    }).flatMap(new Function<Integer, Observable<String>>() {
        @Override
        public Observable<String> apply(Integer integer) throws Exception {
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                list.add("I am value " + integer);
            }
            return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            Log.d(TAG, s);
        }
    });
    

    简洁示例

    Observable.create((ObservableOnSubscribe<Integer>) emitter -> emitter.onNext(1))
            .flatMap(new Function<Integer, Observable<String>>() {
                @Override
                public Observable<String> apply(Integer integer) throws Exception {
                    final List<String> list = new ArrayList<>();
                    for (int i = 0; i < 3; i++) {
                        list.add("I am value " + integer);
                    }
                    return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
                }
            })
            .subscribe(s -> Log.d(TAG, s));
    

    ConcatMap

    • ConcatMap保证 事件 的顺序,使用方法同FlatMap
    • 示例省略,参考FlatMap

    Zip

    示意图

    简介

    • 组合多个Observable发送的 事件 ,然后发送这个 事件组合
    • 事件组合 的顺序是严格按照 事件 发送的顺序来进行的。
    • 发送 事件 的数量,与所有Observable事件 数量最少的那个一样。
    • 注意:这多个Observable在同一个 线程 时无法将 事件 组合发送,即发送一个Observable的所有 事件 以后,再发送另一个Observable
    • 应用场景:一个界面需要展示用户的一些信息,而这些信息分别要从两个服务器接口中获取,而只有当两个都获取到了之后才能进行展示。

    啰嗦示例

    Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            Log.d(TAG, "emit 1");
            emitter.onNext(1);
    
            Log.d(TAG, "emit complete1");
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io());
    
    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            Log.d(TAG, "emit A");
            emitter.onNext("A");
    
            Log.d(TAG, "emit complete2");
            emitter.onComplete();
        }
    }).subscribeOn(Schedulers.io());
    
    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
        @Override
        public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
            return integer + s;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(@NonNull String s) throws Exception {
            Log.d(TAG, "onNext: " + value);
        }
    });
    

    简洁示例

    Observable<Integer> observable1 = Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
        Log.d(TAG, "emit 1");
        emitter.onNext(1);
    
        Log.d(TAG, "emit complete1");
        emitter.onComplete();
    }).subscribeOn(Schedulers.io());
    
    Observable<String> observable2 = Observable.create((ObservableOnSubscribe<String>) emitter -> {
        Log.d(TAG, "emit A");
        emitter.onNext("A");
    
        Log.d(TAG, "emit complete2");
        emitter.onComplete();
    }).subscribeOn(Schedulers.io());
    
    Observable.zip(observable1, observable2, (integer, s) -> integer + s).subscribe(s -> Log.d(TAG, "onNext: " + s));
    

    特殊情况处理

    对于ObserverUI主线程 进行 事件 处理,但是接收到 事件 时,所在的ActivityFragment已经退出的情况,需要通过调用Disposable.dispose()Subscription.cancel()中断订阅

    1. 创建CompositeDisposable

    • CompositeDisposable对象是储存Disposable对象的容器。
    • 作为ActivityFragment的成员:
      CompositeDisposable mCompositeDisposable = new CompositeDisposable();
      

    2. 保存Disposable

    Observer.onSubscribe(...)中:

    @Override
    public void onSubscribe(Disposable d) {
        mCompositeDisposable.add(d);
    }
    

    3. 中断订阅

    ActivityFragmentonDestroy()中:

    mCompositeDisposable.clear();
    

    Flowable

    简介

    • Flowable类似Observable,但是性能不如Observable
    • Flowable为解决 Backpressure问题 而生。

    Backpressure问题简介

    • Backpressure问题 :当 事件 的发送远远快于 事件 的消耗时,未消耗的 事件 会堆积起来,最终可能发生 OOM
    • 出现 Backpressure问题 可能是因为两个原因:
      • 某个Observable发送 事件 的速度太快,或数量太多。
      • 某个Observer处理 事件 的速度太慢。
    • 事件 的发送和接收在同一个 线程 的时候不会出现这种问题,因为必定会处理完一个 事件 以后才能继续发送下一个 事件

    简洁示例

    Flowable<Integer> upstream = Flowable.create(emitter -> {
        // 直到下游开始请求事件
        while (emitter.requested() == 0) {
            if (emitter.isCancelled())
                break;
        }
        Log.d(TAG, "emit 1");
        emitter.onNext(1);
        Log.d(TAG, "emit complete");
        emitter.onComplete();
    }, BackpressureStrategy.ERROR);
    
    Subscriber<Integer> downstream = new Subscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
            Log.d(TAG, "onSubscribe");
            s.request(10);
        }
    
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "onNext: " + integer);
        }
    
        @Override
        public void onError(Throwable t) {
            Log.w(TAG, "onError: ", t);
        }
    
        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete");
        }
    };
    

    示例解说

    • BackpressureStrategy背压策略 ),表示 事件 发送速度快于消耗速度时说使用的策略,有以下几种:
      • MISSING:多出来的 事件 直接丢弃。
      • ERROR:抛出MissingBackpressureException异常。
      • BUFFER:全部 事件 缓存直到被消耗,注意 OOM 问题。也可以通过Flowable.onBackpressureBuffer()设置。
      • DROP:丢弃最近发送的 事件 。也可以通过Flowable.onBackpressureDrop()设置。
      • LATEST:只保留一个最近的 事件 覆盖前一个 事件 。也可以通过Flowable.onBackpressureLatest()设置。
    • Subscription.cancel():可以用于 取消订阅关系
    • Subscription.request(..):告知FlowableObserver事件请求量 ,可以多次调用,每次调用累加 事件请求量 计数器。
    • FlowableEmitter.requested():得知Observer事件请求量

    相关文章

      网友评论

        本文标题:★42.RxJava2

        本文链接:https://www.haomeiwen.com/subject/gnbpcxtx.html