美文网首页
RxJava 使用篇

RxJava 使用篇

作者: RobinYeung | 来源:发表于2018-09-12 18:08 被阅读154次

一、什么是RxJava

Rx = Reactive Extension

  • Reactive 响应

    • 响应式编程
    • 基于观察者模式
      • 注入回调
      • 调用回调
    • 事件序列
      • 不可预知、动态离散
      • 如用户点击、异步请求、Model更新
  • Extension 扩展

    • 函数式编程
      • 单一职责原则
      • 依赖倒置原则
    • 数据流
      • 现成的,静态的,连续的
      • 如处理字符串数据,文件数据等
    • 线程调度
      • 时间控制:延时、周期任务
      • 线程控制:前后台调度
    • 异常处理
      • 重试
      • onError/onErrorResumeNext/onErrorReturn
    • 轮子
      • 转换、过滤、防抖、组合、重复、重试等操作符
  • 三种编程思想的对比 举例把大象放进冰箱

    • 面向对象编程:
      1. 构建一个冰箱,具有开门,关门的方法
      2. 构建一个大象,具有走进冰箱的方法
      3. 实例化一个冰箱对象,实例化一个大象对象,冰箱对象调用开门方法,大象对象调用走进冰箱方法,冰箱对象调用关门方法,大象被成功装入冰箱。
    • 响应式编程:
      1. 构建一个发射源,发送大象
      2. 构建一个响应器,接受到大象后关进冰箱。
      3. 用这个响应器监听发射源。
    • 函数式编程:
      1. 构建一个函数,接收大象和冰箱两个参数
      2. 在此函数内部做实现,返回冰箱已经装入大象
      3. 调用此函数,将大象和冰箱作为参数传入

二、基本知识

观察源

观察源
  • Observable / Flowable

    • 基本数据流观察源
    • 可发射多个onNext事件
    • 可发射onError或onComplete事件来终止结束整个事件流
  • Single

    • 单独发射一个onSuccess或者onError事件
    • onSuccess相当于onNext
  • Completable

    • 单独发射一个onComplete或onError事件,一般用于单纯的调用,而没有数据处理的逻辑
    • 比Observable少了很多处理元素的操作符
    • 经常使用andThen来转换流到其他观察源
  • Maybe

    • Single和Completable的结合。有onSuccess、onComlete、onError三种事件,但只会发射其中一个。

事件源 Subject

  • PublishSubject
    • 观察者只能收到订阅之后的事件
  • BehaviorSubject
    • 粘性,订阅时会立即收到订阅前最后一个事件或默认事件
  • ReplaySubject
    • 无论什么时候订阅,都可以收到所有事件
    • 当然,可以指定Replay的初始容量(默认16),上限(默认无上限),或Replay的时间上限
  • AsyncSubject
    • subject的onComplete被调用时,才会把事件发射给观察者
  • SerialedSubject
    • 串行Subject,保证发射一个事件,消费完才会发射下一个事件

背压

  • Observable / Flowable 无背压处理
    • Observable数据流处理元素数量不要过多,否则容易OOM
    • Flowable专门用于处理大量数据流,如解析各种流等。
    • Flowable可以控制数据发出速度
  • Subject / Process 有背压处理
    • Subject消费速度不要低于生产速度,否则可能出现OOM
    • Process可以选择背压策略来处理消费速度低于生产速度的情况
  • 背压策略
    • 上游背压策略。通过 create 或 toFlowable 创建的时候可以选择5种策略
      • MISSING: 背压交由下游处理(通过onBackpressureXXX)
      • ERROR: 下游无法处理时,抛出MissingBackpressureException
      • BUFFER: 缓存起来,直到下游可以消费掉
      • DROP: 抛弃掉,如果下游无法处理
      • LATEST: 只保留最新的
    • 下游背压处理
      • onBackpressureDrop 方法处理背压,下游实现onDrop方法
      • onBackpressureLatest 方法处理背压,相当于上游选择了LATEST策略
      • onBackpressureBuffer 方法处理背压,此时可以选择ERROR DROP_OLDEST DROP_LATEST三种应对策略

事件流

  • 创建:创建事件流或数据流
  • 组合:使用链式操作符来变换所创建的事件流
  • 监听:订阅事件流并实现业务响应事件

操作符

  • 创建
    • create 用函数式创建观察源
    • just 用常量或变量创建观察源
    • formArray 用数组创建观察源,元素逐个发送
    • fromIterable 用可迭代对象创建观察源,元素逐个发送
    • range 用整数数列创建观察源,元素逐个发射
    • timer interval 时间类观察源,此类观察源默认使用computation调度器
    • merge concat 等组合类,可以合并多个观察源来创建一个观察源
  • 转换
    • map 变换元素
    • flatMap 从元素切换到新的观察源
  • 过滤
    • filter 符合条件的发射到下游
    • distinct 非重复的元素才发射到下游
    • take 指定允许发射到下游的个数
    • skip 忽略发射到下游的个数
    • ofType 只允许指令类型的元素发射到下游
  • 防抖
    • debounce throttle 防抖或取样
  • 组合
    • merge concat 合并多个流,并按规则分别发射这些流的元素
    • 各个流不会相互影响发射到下游的结果
  • 聚合
    • zip amb combineLast scan 合并多个流,并对这些流的元素合并处理后再发射到下游
    • 各个流会相互影响发射到下游的结果
  • 重复
    • repeat onComplete后自动重新订阅
    • retry onError后自动重新订阅
  • 异步阻塞转同步
    • blockingFirst等

线程调度

  • 分类与调度规则
    • Schedulers.trampoline
      • 默认。当前线程
    • Schedulers.single
      • 一个单例的后台线程
    • Schedulers.newThread
      • 总是启用新线程,并在新线程执行操作。
    • Schedulers.io
      • I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的调度器。和newThread() 类似,区别在于 io() 实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 性能消耗更低。
      • 不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    • Schedulers.computation
      • 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 限制性能的操作,例如图形的计算,延时计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
      • timer、interval等很多时间相关操作符将其作为默认调度器
    • Schedulers.from()
      • 通过一个指定一个Executor来担当调度器
    • AndroidSchedulers.mainThread
      • RxAndroid提供的,它指定的操作将在 Android 主线程运行。

三、其他

扩展 Rx相关库

  • Retrofit
    • 网络请求响应
  • RxBinding
    • View响应
  • RxPermissions
    • Permission状态改变响应
  • RxLifeCycle
    • 作者并不推崇这个库
  • RxBus
    • 事件总线
// RxBinding
RxView.clicks(view)
    .throttleFirst(ms, TimeUnit.MILLISECONDS)
    .compose(RxLifecycleAndroid.bindView(view))
    .subscribe(x -> listener.onClick(view));
    
// RxPermissions
new RxPermissions(getActivity())
    .request(Manifest.permission.CAMERA)
    .subscribe(granded -> {
        if(granted) {
            // ...
        }
    });
    
// RxLifecycle
public class OneFragment extends RxFragment {
    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(saveInstanceState);
        PublishSubject.create()
            .compose(bindUntilEvent(FragmentEvent.DESTROY))
            .subscribe();
    }
}

// RxPreferences
RxPreferences.INSTANCE
    .<String>onPreferenceChanged()
    .filter(p -> TextUtils.equels(p.getKey(), SOME_PREFERENCE_KEY))
    .subscribe(p -> ...);
    
// RxNetwork
RxNetwork.INSTANCE
    .onConnectionChanged()
    .filter(info -> info.getNetworkType() == ConnectivityManager.TYPE_WIFI)
    .filter(info -> !info.isConnected())
    .observeOn(AndroidSchedulers.mainThread())
    .subcribe(info -> ...);

常见坑

  • 生命周期
    • dispose!
      • 由于很多事件流都不在主线程,避免线程泄露必须注意dispose
    • dispose?
      • dispose只是结束订阅事件流,首先不能立即停止最后一个异步事件,更不能停止操作符引入的线程
    • RxLifeCycle?
      • 其作者对这个库持怀疑态度
      • 如果订阅的地方发生在没有生命周期的类中,就需要组件去获取Activity的生命周期,然而这种行为是没有保障的,当订阅失败时也是模糊的,如果不是人为去执行,往往具有不确定性
      • 有些事件队列的生命周期和Activity等的生命周期不等价,依然需要手动处理,如果手动处理和自动处理并存则让人困惑
      • 你的Activity和Fragment需要继承库里面的相关基类
      • 建议使用AutoDispose,其比RxLifeCycle更优秀
  • CompositeDisposable
    • clear 对所有add进来的Dispsable执行dispose
    • dispose 在clear的基础上,让这个CompositeDisposable无法再使用,甚至add就会dispose你的事件流
  • 线程调度
    • subscribeOn的坑
// a()运行在computation b()运行在io c() d()运行在主线程
Observable.create(emitter -> a())
    .observeOn(Schedulers.io())
    .flatMap(x -> {
        b();
        return Observable
            .create(emitter -> c())
            .subscribeOn(AndroidSchedulers.mainThread());            
    })
    .map(p -> d())
    .subscribeOn(Schedulers.computation())
    .subscribe();
// 当把flatMap里面的subscribeOn移到主流程上,事情就变了
// a()运行在主线程上 b() c() d()运行在io。第二个subscribeOn不生效
Observable.create(emitter -> a())
    .observeOn(Schedulers.io())
    .flatMap(x -> {
        b();
        return Observable.create(emitter -> c())
    })
    .subscribeOn(AndroidSchedulers.mainThread());         
    .map(p -> d())
    .subscribeOn(Schedulers.computation())
    .subscribe();
  • 背压
    • 如果不注意处理背压就可能导致OOM
  • UnDeliverableException
    • 未复写subscribe()的onError,当上游抛出错误,整个流会直接抛异常
      • 记得写onError或处理异常的操作符,如retry、onErrorReturn等
    • 流已经被dispose了,此时上游抛异常
      • dispose与Thread.interrupt()类似,只起到通知的作用,不起到立即结束的作用
      • 判断流是否断开再抛异常

相关文章

网友评论

      本文标题:RxJava 使用篇

      本文链接:https://www.haomeiwen.com/subject/qqdbgftx.html