美文网首页
RxJava原理解析

RxJava原理解析

作者: 付小影子 | 来源:发表于2022-06-15 10:42 被阅读0次

rxJava的思维

响应式编程,卡片式编程,流式编程,有一个起点和一个终点,起点开始流向我们的“事件”, 把事件流向终点,只不过在流向的过程中,可以增加拦截,拦截是可以对事件进行改变,终点只关心他的上一个拦截返回信息。

rxJava 观察者设计模式或者发布订阅模式

1.创建被观察者 Observable
2.创建观察者Observer
3.观察者订阅被观察者 subscribe()
RxBinding 可以防抖动,flateMap避免网络嵌套,doOnnext() 多api接口链式调用

Rxjava 全局Hook 点

 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
         //rxjava预留给开发者的hook 钩子
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
微信图片_20220615093821.png
微信图片_20220615094024.png
1111.png

rxjava 源码解析 之观察者 Observer

Observer就是一个接口,预留四个函数,回调不同状态

public interface Observer<T> {
  //开始订阅 回调函数
    void onSubscribe(@NonNull Disposable d);
//回调结果
    void onNext(@NonNull T t);
//出现错误
    void onError(@NonNull Throwable e);
//订阅结束
    void onComplete();
}

rxjava源码解析 之被观察者 Observable

// ObservableOnSubscribe
public interface ObservableOnSubscribe<T> {
    //ObservableEmitter 分发器
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

 //被观察者
        Observable.create(
            //自定义source
            object :ObservableOnSubscribe<String>{
                override fun subscribe(emitter: ObservableEmitter<String>) {
                   emitter.onNext("A")
                }
        })


 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //判空
        ObjectHelper.requireNonNull(source, "source is null");    
        //RxJavaPlugins.onAssembly 全局封装,预留hook点,如果开发者自定义了function,那么会在所有操作符执行前,先执行开发者自定义的函数。
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

//创建真正的操作符对象,把开发者自定义的ObservableOnSubscribe 作为参数构造
   new ObservableCreate<T>(source)

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //分发器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //回调接口ObservableOnSubscribe 的onSubscribe函数
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
333.png

RxJava源码解析 之订阅操作

        Observable.create(
           .....
        })
            //订阅操作
            .subscribe( 
                object :Observer<String>{
                ......
        })

------Observable.subscribe(Observer)---------
public final void subscribe(Observer<? super T> observer) {
           ......
          //Observable的抽象函数,具体实现由各操作符对象实现,比如 ObservableCreate
            subscribeActual(observer); 
        } catch (NullPointerException e) { // NOPMD
           ......
        } catch (Throwable e) {
           .......
        }
    }

//ObservableCreate的实现 subscribeActual,observer是自定义的观察者
 protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //调用观察者的onSubscribe回调函数
        observer.onSubscribe(parent);

        try {
          //source 是自定义的ObservableOnSubscribe,把分发器传进入,用来发射数据
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

//被观察者
        Observable.create(
            //自定义source
            object :ObservableOnSubscribe<String>{
                override fun subscribe(emitter: ObservableEmitter<String>) {
                  //在此处利用发射器,发射数据,回调给观察者
                   emitter.onNext("A")
                }
        })

//分发器 
  static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
      //observer 就是自定义的观察者对象,通过分发器来回调各状态函数
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
          ......
            if (!isDisposed()) {
                observer.onNext(t); //接受数据
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (!isDisposed()) {
                try {
                    observer.onError(t);//数据报错
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();//执行结束
                } finally {
                    dispose();
                }
            }
        }

       .....
    }

2222.png
5555.png

map分发事件 结构流程图

分发事件装包裹,回调数据拆包裹,u型结构


6666.png
8888.png
999.png
777.png
888.png

观察者设计模式

传统观察者设计模式

一个被观察者,多个观察者,是一对多的关系,并且需要容器管理观察者进行增删,只有被观察者发出改变通知,遍历observable里面的容器,依次发送通知,观察者才能观察到更新变化。耦合度比较高

rxjava的观察者设计模式(发布订阅模式)

多个被观察者(create,map,flatemap等),一个观察者(observer),并且需要起点和终点在订阅(subscribe)后,才会发出通知,终点(观察者)才能观察到。分发事件是会拿到发射器,通过发射器(CreateEmitter)关联到开发者自定义的observer(观察者),发射器调用自定义的observer接口的回调函数onSubscribe,onNext,onError,onComplete。

相关文章

  • RxJava

    使用RxJava:添加依赖: 走进RxJava:RxJava实质上就是一个异步操作库。API介绍和原理解析:1.扩...

  • MVP+Retrofit+Rxjava在项目中实战解析 

    文章目标 MVP在android中的原理解析 MVP+Retrofit+Rxjava在项目中实战解析 架构经验分享...

  • Android架构

    MVP+Retrofit+Rxjava在项目中实战解析 文章目标 MVP在android中的原理解析 MVP+Re...

  • 学习清单

    HTTP原理解析 RxJava使用和原理,并应用到BaseLib库,还有Dragger MVVM 多线程和线程池使...

  • RxJava源码解析(二)

    前言 本篇主要解析RxJava的线程切换的原理实现 subscribeOn 首先, 我们先看下subscribeO...

  • Rxjava原理解析

    先看RxJava的简单使用及解析: 以上是Rxjava的一个简单示例,第一步通过Single.just()发送一个...

  • RxJava原理解析

    rxJava的思维 响应式编程,卡片式编程,流式编程,有一个起点和一个终点,起点开始流向我们的“事件”, 把事件流...

  • RxJava原理解析一

    初学RxJava,对其许多的API颇感神奇,所以RxJava的原理充满了兴趣。正好最近教父大头鬼也出了一篇RxJa...

  • RxJava AutoDispose原理解析

    版权声明:本文为博主原创文章,未经博主允许不得转载https://blog.csdn.net/wsygyb/art...

  • RxJava的原理解析

    前言 RxJava的核心:订阅流程、线程切换。直接看用法: 首先我们看代码知道需要先Create然后继续调用下去,...

网友评论

      本文标题:RxJava原理解析

      本文链接:https://www.haomeiwen.com/subject/jrwzmrtx.html