美文网首页自定义view
RxJava 3.x系列(一)简介及其Observable订阅

RxJava 3.x系列(一)简介及其Observable订阅

作者: 文泰ChrisTwain | 来源:发表于2020-09-20 16:35 被阅读0次
优先参考:给初学者的RxJava2.0教程 1-10系列

RxJava 3.x系列(二)操作符 & 背压(BackPressure)

一、RxJava简介

  • RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

    2012年启动,2013.2发布距今已7年,在Java和Android异步事件处理、数据过滤与变换提供函数响应式编程能力.

    缘由:Netflix为增加服务器性能和吞吐量编写RxJava并开源,主要开发维护人员:David Karnok、Ben Christensen、Shixiong Zhu、Jake Wharton...

  • 函数响应式编程:函数式编程+响应式编程,面向数据流和变化传播的编程范式(领域中主流的行事套路、规范化的关系);

  • 适用场景以及与AsyncTask、Handler、Executor适用场景异同:复杂网络数据流请求以及与操作符对于的数据变换处理,同时包含对OkHttp、Retrofit、Gson支持;

  • RxAndroid关系:RxJava针对Android平台的扩展,为Android开发提供响应式扩展组件;

  • Kotlin协程对比(待进一步分析):RxJava基于Rx特性封装的Java函数库,协程是Kotlin编译器级别概念,理解为基于编译器or虚拟机设计的轻量级线程。两者都有数据变换、异步数据流处理能力;

  • RxKotlin:适配Kotlin的ReactiveX版本;

  • 内存占用、代码混淆、包体积(待进一步分析):内存占用RxJava > 协程,回收RxJava创建的对象,需给予更高的 CPU占用,也会转变为更高的电量损耗.

二、RxJava第一行代码与Observable订阅过程

  • RxJava第一行代码
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
    }
}).subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "subscribe");
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "onNext: " + value);
    }

    @Override
    public void onError(Throwable e) {
        Log.d(TAG, "error");
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "complete");
    }
});
  • Obserbable订阅过程
1.整体流程:上游创建、下游创建、订阅
2.Observable create后subscribe下游Observer,使得Observable触发subscribeActual()方法
3.ObservableCreate类中subscribeActual()方法通过ObservableOnSubscribe类的subscribe()方法订阅CreateEmitter
4.Observer作为成员变量暴露给ObservableEmitter,ObservableEmitter调用onNext()、onError()、onComplete()将回调下游Observer的onNext()、onError()、onComplete()方法
注:RxJava基于观察者模式,而观察者模式本质是回调。上游订阅下游实际表现为:CreateEmitter传入Observer作为其成员变量,CreateEmitter调用onNext()发射数据,将立即回调Observer的onNext()方法
  • RxJava第一行代码类图及各方法关系如下:


    RxJava_Observable_class.png

三、线程切换过程

    ...
        emitter.onNext(3);
        emitter.onComplete();
    }
}) .subscribeOn(Schedulers.io())                // 切换上游线程
   .observeOn(AndroidSchedulers.mainThread())   // 切换下游线程
   .subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {
        Log.d(TAG, "subscribe");
    }

    @Override
    public void onNext(Integer value) {
        Log.d(TAG, "onNext: " + value);
    }
    ...

原理:RxJava提供Scheduler作为线程调度器,底层通过封装HandlerExecutor实现线程切换

  • 上游线程切换subscribeOn(Schedulers.newThread())

    subscribeOn()调用后,生成ObservableSubscribeOn对象,并将原有Observable作为成员变量传入
    subscribeActual()方法中scheduler.scheduleDirect(new SubscribeTask(parent))  //实现线程切换;
    SubscribeTask为Runable对象,run()方法执行source.subscribe(parent);            //上游随即切换到对应线程
    scheduleDirect(Runnable run)方法内部创建了Worker对象,通过调用worker.shedule(Runnable, long delay, TimeUnit)实现真正线程切换,worker内部包含线程池等切线程操作
    
  • 下游线程切换observeOn(AndroidSchedulers.mainThread())

    observeOn()调用后,生成ObservableObserveOn对象,并将原有Observable作为成员变量传入
    subscribeActual()方法中通过Scheduler.Worker w = scheduler.createWorker()创建Worker对象
    source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize)) 完成上下游订阅(给下游传入了Worker对象)
    上游发射数据后,即ObserveOnObserver的onNext()方法被调用,导致worker.schedule(this)执行
    ObserveOnObserver即上述方法中的this,它实现了Runnable接口,其run()方法运行在指定线程,run()中也会调用下游onNext()方法,下游开始执行,故下游即被切换到对应线程
    注:下游线程切换后,由于上下游线程不同,上游发射数据后会将数据发射到消息队列,ObserveOnObserver拥有消息发送队列,被观察者发送消息即入队,下游从消息队列中取数据。消息队列queue是ObserveOnObserver的成员变量,queue创建于ObserveOnObserver的onSubscribe()方法,queue = new SpscLinkedArrayQueue<>(bufferSize)
    
  • 注:上下游同线程,上游发射一次onNext(),下游onNext随即被调用一次,因为属于同步调用;上下游不同线程,属于异步调用,上游发射onNext()会进入消息队列,下游会从消息队列取数据回调其onNext(),故上下游调用顺序不能保证。

  • subscribeOn()切换线程时序图 注:红色方框内为重点切线程操作

    subscribeOn_sequence.png
  • observeOn()切换线程时序图

    observeOn_sequence.png
参考:

ReactiveX - Operators
ReactiveX/RxJava文档中文翻译
Github-ReactiveX-RxJava
详解RxJava消息订阅和线程切换原理
Rxjava线程变换原理浅谈

P.S. 本文源码基于RxJava 3.0.4 结尾时序图引用自微信公众号
By the way: 非常感谢能提出批评和修改意见
contact: tu.wentai@nexuslink.cn
WeChat:HUC-Kris

相关文章

网友评论

    本文标题:RxJava 3.x系列(一)简介及其Observable订阅

    本文链接:https://www.haomeiwen.com/subject/faqpyktx.html