美文网首页
RxJava原理解析

RxJava原理解析

作者: 付小影子 | 来源:发表于2022-06-15 10:42 被阅读0次

    rxJava的思维

    响应式编程,卡片式编程,流式编程,有一个起点和一个终点,起点开始流向我们的“事件”, 把事件流向终点,只不过在流向的过程中,可以增加拦截,拦截是可以对事件进行改变,终点只关心他的上一个拦截返回信息。

    rxJava 观察者设计模式或者发布订阅模式

    1.创建被观察者 Observable
    2.创建观察者Observer
    3.观察者订阅被观察者 subscribe()
    RxBinding 可以防抖动,flateMap避免网络嵌套,doOnnext() 多api接口链式调用

    Rxjava 全局Hook 点

     public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
             //rxjava预留给开发者的hook 钩子
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    
    微信图片_20220615093821.png
    微信图片_20220615094024.png
    1111.png

    rxjava 源码解析 之观察者 Observer

    Observer就是一个接口,预留四个函数,回调不同状态

    public interface Observer<T> {
      //开始订阅 回调函数
        void onSubscribe(@NonNull Disposable d);
    //回调结果
        void onNext(@NonNull T t);
    //出现错误
        void onError(@NonNull Throwable e);
    //订阅结束
        void onComplete();
    }
    

    rxjava源码解析 之被观察者 Observable

    // ObservableOnSubscribe
    public interface ObservableOnSubscribe<T> {
        //ObservableEmitter 分发器
        void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
    }
    
     //被观察者
            Observable.create(
                //自定义source
                object :ObservableOnSubscribe<String>{
                    override fun subscribe(emitter: ObservableEmitter<String>) {
                       emitter.onNext("A")
                    }
            })
    
    
     public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            //判空
            ObjectHelper.requireNonNull(source, "source is null");    
            //RxJavaPlugins.onAssembly 全局封装,预留hook点,如果开发者自定义了function,那么会在所有操作符执行前,先执行开发者自定义的函数。
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    
    //创建真正的操作符对象,把开发者自定义的ObservableOnSubscribe 作为参数构造
       new ObservableCreate<T>(source)
    
    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            //分发器
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            //回调接口ObservableOnSubscribe 的onSubscribe函数
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    
    333.png

    RxJava源码解析 之订阅操作

            Observable.create(
               .....
            })
                //订阅操作
                .subscribe( 
                    object :Observer<String>{
                    ......
            })
    
    ------Observable.subscribe(Observer)---------
    public final void subscribe(Observer<? super T> observer) {
               ......
              //Observable的抽象函数,具体实现由各操作符对象实现,比如 ObservableCreate
                subscribeActual(observer); 
            } catch (NullPointerException e) { // NOPMD
               ......
            } catch (Throwable e) {
               .......
            }
        }
    
    //ObservableCreate的实现 subscribeActual,observer是自定义的观察者
     protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            //调用观察者的onSubscribe回调函数
            observer.onSubscribe(parent);
    
            try {
              //source 是自定义的ObservableOnSubscribe,把分发器传进入,用来发射数据
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    
    //被观察者
            Observable.create(
                //自定义source
                object :ObservableOnSubscribe<String>{
                    override fun subscribe(emitter: ObservableEmitter<String>) {
                      //在此处利用发射器,发射数据,回调给观察者
                       emitter.onNext("A")
                    }
            })
    
    //分发器 
      static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
          //observer 就是自定义的观察者对象,通过分发器来回调各状态函数
            final Observer<? super T> observer;
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
              ......
                if (!isDisposed()) {
                    observer.onNext(t); //接受数据
                }
            }
    
            @Override
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public boolean tryOnError(Throwable t) {
                if (!isDisposed()) {
                    try {
                        observer.onError(t);//数据报错
                    } finally {
                        dispose();
                    }
                    return true;
                }
                return false;
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();//执行结束
                    } finally {
                        dispose();
                    }
                }
            }
    
           .....
        }
    
    
    2222.png
    5555.png

    map分发事件 结构流程图

    分发事件装包裹,回调数据拆包裹,u型结构


    6666.png
    8888.png
    999.png
    777.png
    888.png

    观察者设计模式

    传统观察者设计模式

    一个被观察者,多个观察者,是一对多的关系,并且需要容器管理观察者进行增删,只有被观察者发出改变通知,遍历observable里面的容器,依次发送通知,观察者才能观察到更新变化。耦合度比较高

    rxjava的观察者设计模式(发布订阅模式)

    多个被观察者(create,map,flatemap等),一个观察者(observer),并且需要起点和终点在订阅(subscribe)后,才会发出通知,终点(观察者)才能观察到。分发事件是会拿到发射器,通过发射器(CreateEmitter)关联到开发者自定义的observer(观察者),发射器调用自定义的observer接口的回调函数onSubscribe,onNext,onError,onComplete。

    相关文章

      网友评论

          本文标题:RxJava原理解析

          本文链接:https://www.haomeiwen.com/subject/jrwzmrtx.html