美文网首页Android开发教程
Android开发——RxJava原理解析

Android开发——RxJava原理解析

作者: 蜗牛是不是牛 | 来源:发表于2022-01-05 17:34 被阅读0次

    一、RXjava介绍

    首先看一下Rxjava这个名字,其中java代表java语言,而Rx是什么意思呢?Rx是Reactive Extensions的简写,翻译过来就是,响应式拓展。所以Rxjava的名字的含义就是,对java语言的拓展,让其可以实现对数据的响应式编程。

    那么响应的是什么呢?响应的是上游数据的变化。常规用法是,对数据源进行监听,然后做出响应。

    RxJava的整体结构是一条链,其中有这三个角色。

    1. 链的上游:生产者 Observable
    2. 链的下游:观察者 Observer
    3. 链的中间:各个中介节点,既是下游的Observable,又是上游的Observer

    二、Rxjava基本使用

    Single.just("hfhuaizhi").subscribe(object : SingleObserver<String> {
        override fun onSubscribe(d: Disposable) {
            Log.e(TAG, "onSubscribe")
        }
    
        override fun onSuccess(t: String) {
            Log.e(TAG, "onSuccess:$t")
        }
    
        override fun onError(e: Throwable) {
            Log.e(TAG, "onError:$e")
        }
    
    })
    
    

    上面这段代码是对Rxjava简单的使用,其中

    • Single 发出单个数据的被观察者Observable,只发送一次,只有Success和Error两种状态,没有next,在Rxjava2中新增
    • just 被观察者生产的数据,参数类型是一个泛型,这里传进去的是一个String
    • subscribe 观察者Observer,这里声明的是SingleObserver,用来对Single中产生的数据进行响应
    • SingleObserver
      • onSubscribe 订阅成功后就会回调,一般会在此方法中进行一些初始化操作。其参数类型是Disposable,可以通过调用d.dispose() 取消对Observable的监听,并让其停止发送消息。
      • onSuccess 接收数据成功后就会回调,只会回调一次,其参数类型和Observable中just方法传入的数据类型一致,这里是String类型
      • onError 发生错误时回调,参数是Throwable,包含错误信息。

    运行效果

    2021-12-18 13:54:12.450 29223-29223/com.hfhuaizhi.rxjavatest E/hftest: onSubscribe
    2021-12-18 13:54:12.451 29223-29223/com.hfhuaizhi.rxjavatest E/hftest: onSuccess:hfhuaizhi
    
    

    可以看到首先onSubscribe被调用,表明注册了观察者。然后接收数据成功,打印出'hfhuaizhi'。 到这里我们就了解了Rxjava最基本的用法,接下来分析一下函数的内部做了什么。

    三、Rxjava原理解析

    1. just方法分析

    public static <@NonNull T> Single<T> just(T item) {
        Objects.requireNonNull(item, "item is null");
        return RxJavaPlugins.onAssembly(new SingleJust<>(item));
    }
    
    
    1. 对方法参数进行判空
    2. 调用RxJavaPlugins.onAssembly方法,其参数是一个SingleJust,构造方法传入了item
      • 其中onAssembly方法内部对传入的参数进行一些处理,然后返回原参数类型,所以接下来分析的过程中会忽略此方法,可以简单认为just方法直接返回了一个SingleJust实例。
    // onSingleAssembly 参数默认是空的,所以这个方法原样返回了source,当设置onSingleAssembly后,
    // 会先对source进行处理后再返回
    public static <@NonNull T> Single<T> onAssembly(@NonNull Single<T> source) {
        Function<? super Single, ? extends Single> f = onSingleAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    
    
    final T value;
    public SingleJust(T value) {
        this.value = value;
    }
    
    

    SingleJust将构造方法传入的item保存在value字段中。 由上述分析可知,Single.just方法会返回一个SingleJust实例,所以在我们链式调用中的subscribe方法,实际上调用的是SingleJust的subscribe方法

    public final void subscribe(@NonNull SingleObserver<? super T> observer) {
        // 1. 判空
        Objects.requireNonNull(observer, "observer is null");
        // 2. 对参数中的observer进行处理后又返回observer
        observer = RxJavaPlugins.onSubscribe(this, observer);
        // 3. 对Observer进行判空
        Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
    
        try {
            // 4. 调用真实注册方法
            subscribeActual(observer);
        } catch (NullPointerException ex) {
            throw ex;
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(ex);
            throw npe;
        }
    }
    
    

    subscrib方法中主要做了注释中所写的四步操作,其中重要的是第4步subscribeActual,这里才是真正做事的,之前都是数据的校验,因为我们这个类的实例是SingleJust,所以接下来看一下SingleJust的subscribeActual方法做了什么。

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposable.disposed());
        observer.onSuccess(value);
    }
    
    

    可以看到内容十分简单

    1. 调用observer的onSubscribe方法,表明订阅成功,参数是Disposable.disposed()返回值
    2. 调用observer的onSuccess方法,表明数据回调成功,参数是value,而value就是通过Single的just函数传进来的,通过构造方法传入SingleJust实例中,因此,这一步的操作就是简单地将构造方法中传入的值,通过observer的onSuccess方法回调给我们定义的观察者SingleObserver。

    这样就完事了,因为之前说过Single.just是最简单的RxJava使用方式,先调用onSubscribe表明注册监听,然后又紧接着通过onSuccess回调数据,所以不会有失败的情况。

    2. map方法分析

    map是Rxjava中比较常用的用法,用来实现数据类型的转换 比如像这样,我们发送的数据类型是Integer,接收的数据类型是String,这样当然是无法直接接收的,所以需要进行一下转换,将上游数据发送的Integer转换为String,然后由下游接收。

    private fun testMap(view: View) {
        Single.just(123).map(object : Function<Int, String> {
            override fun apply(t: Int): String {
                return "$t"
            }
        }).subscribe(object : SingleObserver<String> {
            override fun onSubscribe(d: Disposable) {
                Log.e(TAG, "onSubscribe")
            }
    
            override fun onSuccess(t: String) {
                Log.e(TAG, "onSuccess:$t")
            }
    
            override fun onError(e: Throwable) {
                Log.e(TAG, "onError:$e")
            }
    
        })
    }
    
    

    打印结果

    021-12-18 22:32:02.958 5210-5210/com.hfhuaizhi.rxjavatest E/hftest: onSubscribe
    2021-12-18 22:32:02.958 5210-5210/com.hfhuaizhi.rxjavatest E/hftest: onSuccess:123
    
    

    just方法传入的123是Integer类型,onSuccess处接收的数据是String类型,通过map进行转换。其中map方法传入的参数是一个Function<T,E>,此类有两个泛型参数,T代表输入数据类型,E表示输出数据类型,这里的输入数据类型是Integer,返回类型是String,apply方法中返回了String类型的输出数据。

    map(object : Function<Int, String> {
            override fun apply(t: Int): String {
                return "$t"
            }
        })
    
    
    public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
    }
    
    

    进入map方法内部,此方法判空后,返回了SingleMap实例,其构造方法传入了当前SingleJust实例和mapper转换参数,并将其分别保存在source和mapper成员变量中。

    public final class SingleMap<T, R> extends Single<R> {
        final SingleSource<? extends T> source;
        final Function<? super T, ? extends R> mapper;
        public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
            this.source = source;
            this.mapper = mapper;
        }
        ...
    }
    
    

    好,map方法暂且看到这儿,我们接下来继续分析链式调用中的subscribe方法。
    subccribe传入了一个SingleObserver,和之前分析的类似,但是区别在于调用的不再是SingleJust的subscribe方法,而是map方法返回的SingleMap的subscribe方法,由之前的分析可知,此方法调用会在数据的判空后调用到SingleMap的subscribeActual方法。 由之前的分析可知,链式调用到subscribe方法会调用到SingleMap的subscribeActual方法

    public final class SingleMap<T, R> extends Single<R> {
        final SingleSource<? extends T> source;
        final Function<? super T, ? extends R> mapper;
        public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
            this.source = source;
            this.mapper = mapper;
        }
        @Override
        protected void subscribeActual(final SingleObserver<? super R> t) {
            source.subscribe(new MapSingleObserver<T, R>(t, mapper));
        }
        ...
    }
    
    

    由之前的分析可知,source就是map的上游SingleJust, 所以在single的实际subscribe方法中会调用其上游的subscribe方法,并传入了一个封装好的新的MapSingleObserver,MapSingleObserver的构造方法中第一个参数t,是下游观察者,在我们这块代码中就是链式调用的时候传入的SingleObserver。第二个参数是我们在map方法中传入的数据类型转换转换器mapper。 由之前的分析可知,当source,也就是SingleJust的subscribe方法调用后,会依次调用其参数传入的Observer的onSubscribe方法和onSuccess方法,此时参数传入的Observer就是上面代码块里的MapSingleObserver

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {
        final SingleObserver<? super R> t;
        final Function<? super T, ? extends R> mapper;
    
        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            this.t = t;
            this.mapper = mapper;
        }
    
        @Override
        public void onSubscribe(Disposable d) {
            t.onSubscribe(d);
        }
        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }
            t.onSuccess(v);
        }
        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
    
    

    onSubscribe方法原封不动的调用了t.onSubscribe(d);而t就是在MapSingleObserver构造方法传入的下游观察者,也就是SingleObserver实例。这里直接调用了其onSubscribe方法表示注册监听成功。 onSuccess方法中调用了mapper.apply(value),这个mapper就是我们在map方法中传入的转换函数,这里输入了Integer数据类型,得到了String类型输出,最后调用t.onSuccess回调转换后的数据,也就是调用我们subscribe方法传入的实例的onSuccess。

    map方法总结

    map主要做的就是一个承上启下,链式调用中subscribe方法调用后,会依次向上调用中间节点的subscribe方法,直到调用到最初始的没有上游的Observable,最上层的Observable会在其subscribeActual方法中调用其下游观察者的onSubscribe和onSuccess/onError,将数据一层一层传下去,数据传递的过程中,中间节点可能会对数据进行处理后再接着向下传,最终传递到最底层的Observer,整个流程如图所示

    图片含义解释

    最上游的Single就是我们调用Single.just产生的SingleJust,其subscribe方法中会调用onSubscribe()和onSuccess(),向下方观察者传递Integer类型的结果,中间观察者SingleObserver由map方法创建,其接收到上游传递下来的数据后,将其转换为String,然后传递给下方观察者,最后下游收到的数据结果就是String类型。

    3. 线程切换

    线程切换可以说是RxJava中最常用的操作了,甚至很多人选择RxJava,就是因为RxJava可以和方便地实现线程切换。 线程切换主要用到这两个函数:

    • subscribeOn
    • observerOn
    private fun testSubscribe(view: View) {
        Single.just("hfhuaizhi").subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()).subscribe(object : SingleObserver<String> {
                override fun onSubscribe(d: Disposable) {
                    Log.e(TAG, "onSubscribe")
                }
    
                override fun onSuccess(t: String) {
                    Log.e(TAG, "onSuccess:$t")
                }
    
                override fun onError(e: Throwable) {
                    Log.e(TAG, "onError:$e")
                }
    
            })
    }
    
    

    这样写,可以实现subscribe调用之前的消息发送在io线程,observerOn调用之后的Observer回调在android主线程,其中AndroidSchedulers类不在Rxjava标准库中,需要额外引入RxAndroid依赖。

    subscribeOn

    public final Single<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
    }
    
    

    subscribeOn方法返回一个SingleSubscribeOn实例,其构造方法中传入了this(上游被观察者)和scheduler(线程调度器,我们传入的是Schedulers.io())。 由之前的分析可知,链式调用中最终subscribe方法调用的时候,会由下向上依次调用各个节点的subscribe方法,这里我们看一下SingleSubscribeOn这一线程切换的节点的subscribe方法做了什么,因为SingleSubscribeOn和SingleJust一样继承自Single,其subscribe方法也是调用到了subscribeActual方法

    public final class SingleSubscribeOn<T> extends Single<T> {
        final SingleSource<? extends T> source;
        final Scheduler scheduler;
        public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
            // 上层被观察对象
            this.source = source;
            // 线程类型
            this.scheduler = scheduler;
        }
        @Override
        protected void subscribeActual(final SingleObserver<? super T> observer) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
            observer.onSubscribe(parent);
            Disposable f = scheduler.scheduleDirect(parent);
            parent.task.replace(f);
        }
    }
    
    
    1. 将observer(下游观察者)和source(上游被观察者)封装进一个新的观察者SubscribeOnObserver
    2. 调用下游观察者的onSubscribe方法
    3. 调用scheduler的scheduleDirect方法,参数传入刚封装的新的观察者SubscribeOnObserver实例
    4. 将parent的task变量替换为由传入的scheduler生成的Disposable final SequentialDisposable task;
      • 这个task的参数类型是Disposable,之前有提到过,在Observer的onSubscribe方法中会传入一个Disposable,调用Disposable的dispose()方法后,会取消注册并让上游停止发送任务,这个Disposable继承自AtomicReference 实现了Disposable接口,AtomicReference是java里的原子引用类型,可以线程安全地对对象引用进行修改,类似地还有AtomicInteger等,所以这里的parent.task.replace(f)就是将parent中的task这个disposable线程安全地替换为scheduler创建地这个新的Disposable,从而可以实现任务的取消。
    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
    ...
    }
    
    

    接下来分析一下第3步主要做了什么

    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
    

    scheduleDirect方法中传入了一个Runnable类型参数,因为SubscribeOnObserver类实现了Runnable接口,所以可以被当作Runnable传进去。

    因为我们传入的scheduler参数是由Schedulers.io()方法创建的,而此方法默认会返回一个IoScheduler

    这个Scheduler的注释写着,会创建并缓存一个线程池。所以我们知道了scheduleDirect方法会将传入的Runnable放入一个线程池里执行,从而实现任务的异步执行,所以接下来我们去看一下SubscribeOnObserver的run方法里做了什么。

    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 7000911171163930287L;
        final SingleObserver<? super T> downstream;
        final SequentialDisposable task;
        final SingleSource<? extends T> source;
        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            this.downstream = actual;
            this.source = source;
            this.task = new SequentialDisposable();
        }
        @Override
        public void run() {
            source.subscribe(this);
        }
        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    
        @Override
        public void onSuccess(T value) {
            downstream.onSuccess(value);
        }
        ...
    }
    
    

    SubscribeOnObserver的run方法中会调用source.subscribe,并传入自己(自己也是一个Observer),由之前分析我们知道source就是我们监听的上游,这里调用了SingleJust的subscribe,由之前的分析我们知道subscribe会调用到subscribeActual,这里做任务的真正执行,因此就这样实现了让上游任务在异步线程中的执行,上游任务执行过后,会将数据向下传递,传递到当前SubscribeOnObserver节点的时候会调用其onSuccess方法,其调用downstream,也就是下游观察者的onSuccess方法,将数据继续向下传递,此时数据传递的线程也是run方法执行的线程,因为此时并没有再次对线程进行切换。

    observerOn

    public final Single<T> observeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
    }
    
    

    observeOn函数返回了一个SingleObserveOn,也是需要传入this(上游被观察者),和scheduler(线程调度器类型,此时我们传入的是AndroidSchedulers.mainThread()),由之前分析可知我们此时应该去看SingleObserveOn的subscribeActual方法调用

    protected void subscribeActual(final SingleObserver<? super T> observer) {
        source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
    }
    
    

    此方法中调用了其上游的subscribe方法,和之前分析的数据流转过程一致,需要依次调用到最根节点的subscribe,参数传入的是封装后的观察者ObserveOnSingleObserver,其构造方法中传入了下游观察者和线程调度类型,接下来我们看一下当ObserveOnSingleObserver收到上游传下来的数据后进行了怎样的操作。

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;
        final SingleObserver<? super T> downstream;
        final Scheduler scheduler;
        T value;
        Throwable error;
        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.downstream = actual;
            this.scheduler = scheduler;
        }
        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                downstream.onSubscribe(this);
            }
        }
        @Override
        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }
        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                downstream.onError(ex);
            } else {
                downstream.onSuccess(value);
            }
        }
        ...
    }
    
    

    可以看到在onSuccess方法中调用了scheduler.scheduleDirect(this),并穿了个this,而且自身实现了runnable接口,由之前分析可知,run方法会在某一时刻被调用。传入的scheduler是AndroidSchedulers.mainThread()其返回的是HandlerScheduler,其内部封装了个Handler,将Runnable 弄到主线程去执行。最终结果就是ObserveOnSingleObserver的run方法在主线程中被调用, 其run方法调用了下游观察者downstream的onSuccess/onError。 由此分析可知,observerOn方法控制此节点后的被观察者收到数据时所在的线程,无法影响其上游节点。

    最后

    您的点赞收藏就是对我最大的鼓励! 欢迎关注我,分享Android干货,交流Android技术。 对文章有何见解,或者有何技术问题,欢迎在评论区一起留言讨论!最后给大家分享一些Android相关的视频教程,感兴趣的朋友可以去看看。

    相关文章

      网友评论

        本文标题:Android开发——RxJava原理解析

        本文链接:https://www.haomeiwen.com/subject/gjkkcrtx.html