美文网首页Android开发
RxJava本质上不变的是什么?

RxJava本质上不变的是什么?

作者: 码农的书柜 | 来源:发表于2020-11-27 17:53 被阅读0次

前言

RxJava的版本从发布到现在,已经经历了好多个版本了,虽然源码在不断的修改,但是不知你有没有发现,RxJava的主体架构还是没有变化,为什么呢? 可以说是RxJava架构决定了它的特性,比如代码逻辑的简洁以及操作符带来极强的扩展能力,这些在RxJava迭代了这么多个版本之后,这些特性,没有减少,反而大大的增强了,这个特性,就是响应式编程,那么接下来,就来讲讲RxJava为什么会有这种特性,以及带来其特性不变的本质是啥!

本文主要讲解RxJava的架构思想,不会涉及到大量的源码分析,请放心食用,文章篇幅较长,建议收藏,慢慢品尝!

1、RxJava不变的是什么?

不知你是否会有这样的焦虑,框架的更新有时候会让人感叹!

image

虽然RxJava的版本更新没有那么频繁,但是每次的更新,总会让人感觉,之前刚看的源码,是不是前功尽弃了😭,源码一更新,我又得继续去学习;

但是不知你是否有这样想过,框架一直在更新,底层有没有不变的东西,今天很高兴告诉你,RxJava框架有,那么这个不变的东西是什么呢?

架构

没错,RxJava虽然迭代了几个版本,但是其底层的架构还是没有怎么变动,为什么呢?因为其特性就决定了它的架构不会有多大的变化;

它的特性有哪些?

简单明了的代码逻辑,强大的操作符,而这些特性正是响应式编程的思想;就好比一栋房子,其特性有抗震,防风,那么其底层的架构就是必然是按着抗震和防风的特性去建造的,而一旦建造成功,其具有的特性,不会跟着房子的装修而变化,那么其底层的架构也是同样的道理;

那么你想知道是什么架构来实现这种特性的吗?

别急,下面我们先来讲一讲RxJava涉及到的设计模式,为什么设计模式这么重要呢?

因为设计模式是架构的基础,我们怎么设计才能让这个架构具有某种特性,这个和设计模式分不开;

RxJava的观察者模式

2.1、观察者模式

观察者模式,或许是我们最熟悉的设计模式,为什么呢?因为我们在代码里无时无刻都在使用着它;

它就是View的点击事件;

为什么说View的点击事件是观察者模式呢?下面我们先来看看观察者模式的定义;

定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知。

简单来说,就是被观察者与观察者之间存在着一对多的关系,一个被观察者可以被多个观察者依赖,当被观察者变化时,会通知到观察者;

而View的点击事件这里只是简单的一对一的关系,但是其实是可以实现一对多的关系;

而观察者模式的本质是当被观察者变化时,会通知到观察者,当View被点击的时候,会通过回调的监听通知到观察者,而观察者和被观察者之间存在订阅关系的时候,观察者才会被通知到;

而View的点击事件,是通过设置监听方法来实现订阅的,这就是我们最熟悉的观察者模式;

2.2、RxJava的观察者模式是怎样的呢?

RxJava的观察者模式,是扩展的观察者模式,为啥要叫扩展呢?因为它和普通的观察者模式不太一样,扩展的观察者模式,不止一个通知观察者的方法,它有好几个,下面我们来看看它的实现原理吧!

首先先来看一下RxJava观察者模式涉及到的几个类:

  • Observable:被观察者
  • Observer:观察者
  • Event:被观察者通知观察者的事件
  • Subscribe:订阅

看完下面这个图,你的心目中是不是已经对RxJava的观察者模式一目了然了;

下面我们来看一RxJava的事件类型,这个事件是被观察者用来通知观察者的,也就是Event,而Event可以分为下面几种类型,这个我们了解一下即可;

  • Next:常规事件,可以传递各种各样的数据;
  • Complete:结束事件,当观察者接收到结束事件后,就不会再接收后续被观察者发送来的事件;
  • Error:异常事件,当被观察者发送异常事件后,那么其他的事件就不会再继续发送了;

下面我用示例代码来讲一次RxJava的观察者模式;

public abstract class Observer<T> {

    // 和被观察者订阅后,会回调这个方法;
    public abstract void onSubscribe(Emitter emitter);

    // 传递常规事件,用于传递数据
    public abstract void onNext(T t);

    // 传递异常事件
    public abstract void onError(Throwable e);

    // 传递结束事件
    public abstract void onComplete();
}

image.gif

Observer类的方法很简单,都是回调,这里有个新的接口类Emitter,这个Emitter是用来干嘛的呢?

我们暂且把这个Emitter称为发射器,主要用于发射事件;

public interface Emitter<T> {

    void onNext(T value);

    void onError(Throwable error);

    void onComplete();
}

image.gif

实现逻辑就是通过包装Observer,里面最终是通过Observer来进行调用的,来看看这个类有哪些方法;

public class CreateEmitter<T> implements Emitter<T> {

    final Observer<T> observer;

    CreateEmitter(Observer<T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        observer.onNext(t);
    }

    @Override
    public void onError(Throwable error) {
        observer.onError(error);
    }

    @Override
    public void onComplete() {
        observer.onComplete();
    }
}

image.gif

里面的具体实现就是通过Observer对象来进行调用;

下面我们来看一下被观察者Observable是怎么实现的;

public abstract class Observable<T> {

    // 实现订阅的逻辑
    public void subscribe(Observer<T> observer){
        // 通过将传进来的observer包装成CreateEmitter,用于回调
        CreateEmitter<T> emitter = new CreateEmitter<T>(observer);
        // 回调订阅成功的方法
        observer.onSubscribe(emitter);

        // 回调发射器emitter
        subscribe(emitter);
    }

    // 订阅成功后,进行回调
    public abstract void subscribe(Emitter<T> emitter);

}

image.gif

这个类的逻辑很简单,就两步,第一步,进行订阅,第二步,回调Emitter对象,用于发射事件;

那么我们来看看怎么调用吧;

private void observer() {
    // 第一步,创建被观察者
    Observable<String> observable = new Observable<String>() {
            @Override
            public void subscribe(Emitter<String> emitter) {
                emitter.onNext("第一次");

                emitter.onNext("第二次");

                emitter.onNext("第三次");

                emitter.onComplete();
            }
        };

    // 第二步,创建观察者
    Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Emitter emitter) {
                Log.i("TAG", " onSubscribe ");
            }

            @Override
            public void onNext(String s) {
                Log.i("TAG", " onNext s:" + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.i("TAG", " onError e:" + e.toString());
            }

            @Override
            public void onComplete() {
                Log.i("TAG", " onComplete ");
            }
        };

    // 第三步,被观察者订阅观察者
    observable.subscribe(observer);
}

image.gif

这里是使用逻辑很简单,分为三步:

  • 第一步:创建被观察者Observable;
  • 第二步:创建观察者Observer;
  • 第三步:被观察者Observable订阅Observer;

当订阅成功之后,被观察者的subscribe方法里面,就可以通过发射器发射各种事件,最终在观察者的方法里进行回调;

RxJava也是观察者和被观察者订阅的过程,只是被观察者有变化的时候,是通过发射器来发射各种事件的,这样就不局限于一种事件了;

至于RxJava的装饰者模式,这次不做总结,下次单列出来汇总

3、为什么RxJava要这样设计?

3.1、事务的概念

在开始之前,我们先来了解一个概念,“事务”;

什么是事务?

事务,一般是指要做的或所做的事情。而在代码里我们可以理解为一段代码逻辑;

而事务的关系,我们可以理解为业务逻辑之间的关系,有可能有关联,也有可能没有关联;

比如我进入一个列表页,这个列表页的数据,需要从好几个接口请求返回的,而请求返回后我还要根据数据和网络来展示对应的页面,比如列表页或者无数据,无网络的页面,那么我展示的逻辑就是根据列表页返回的逻辑来展示的;

而这几个请求,我姑且称为事务A,事务B,事务C,事务D

事务A,事务B,事务C分别对应请求网络接口的数据,而事务D则是根据返回的数据处理展示的逻辑;

那么我正常的处理逻辑可能是这样,在子线程去处理这三个事务A,事务B,事务C,最终等都处理完了之后,再处理事务D,而这样写的坏处就是我把这几个接口的数据都放在一个子线程去执行了,那么最终结果就是会导致加载缓慢;

那么我们是否可以换成另外一种写法,事务A,事务B,事务C分别在三个子线程去执行,然后最终在三个子线程的回调里面去判断这几个接口是否已经加载完毕了,这样可以解决上面的问题,但是如果以后还有新增的事务,那么最终会导致判断的逻辑越来越臃肿;

而RxJava提供了响应式编程的思想,可以解决这类问题;

3.2、响应式编程

什么是响应式编程?

响应式编程是一种通过异步和数据流来构建事务关系的编程模型

我们可以理解为由事件来驱动事务,比如我请求网络数据成功了,发送请求成功的事件通知下一层事务进行处理;

而RxJava提供了一系列的特性,比如我们可以对事务进行变换,串联,组装等来对事务进行操作,比如上面的事务A,事务B,事务C,我们可以通过组装的方式来进行处理;

那么最终RxJava的处理逻辑如下:

将事务A,事务B,事务C进行组装,等处理完毕了之后,最终会发送一个事件通知事务D进行处理;

RxJava响应式编程带来了什么好处?

  • 1、大幅度降低事务之间的耦合性,方便后期维护与扩展;
  • 2、简化复杂的线程操作,让我们专注于业务开发,避免了很多线程并发的问题,比如线程死锁;
  • 3、提供了丰富的操作符,让我们对于事务的处理更加方便;
  • 4、对于复杂的业务,我们可以构建出清晰的代码逻辑,方便理解业务逻辑;

3.3、RxJava的架构对于响应式编程的思考

RxJava底层通过观察者模式来处理的事件传递,通过装饰者模式来处理事务的操作,由这两个设计者模式来构建了响应式编程的思想,并且装饰者模式还保证了其灵活的扩展性,比如我以后要新增一个操作符,只需要实现对应的观察者和被观察者的包装类即可;

RxJava不仅仅是一个异步框架,还提供了我们处理事务的能力,把复杂的逻辑通过响应式编程的思想,变得更清晰易懂,这让我们对于复杂业务的处理更加的得心应手;

这是非常优秀的源码,也很感叹作者奇妙的思路,很值得我们去学习;

当我们掌握了RxJava的核心原理后,那么无论源码再怎么更新,也脱离不了这个架构思想,当我们带着这个架构思想去看源码细节的时候,架构思想就是你的灯塔,让你不会迷失在茫茫的码海里😊;

相关文章

网友评论

    本文标题:RxJava本质上不变的是什么?

    本文链接:https://www.haomeiwen.com/subject/kdzxwktx.html