美文网首页
响应式编程与RxJava

响应式编程与RxJava

作者: 师傅_有妖气 | 来源:发表于2019-07-30 18:04 被阅读0次

    1. 响应式编程

    1.1 响应式编程概念

    • 响应式编程是一种通过异步和数据流来构建事物关系的编程模型。
    • 事物的关系 也可以说成是 业务逻辑 ,是响应式编程的核心理念。
    • 数据流异步 是实现这个核心理念的关键。异步和数据流都是为了正确的构建事物的关系而存在的。

    1.2 响应式编程demo

    int a=1;
    int b=a+1;
    System.out.print(“b=”+b)    //  b=2
    a=10;
    System.out.print(“b=”+b)    //  b=2
    

    上面是一段很常见的代码,简单的赋值打印语句,但是这种代码有一个缺陷,那就是如果我们想表达的并不是一个赋值动作,而是b和a之间的关系,即无论a如何变化,b永远比a大1。那么可以想见,我们就需要花额外的精力去构建和维护一个b和a的关系。
    而响应式编程的想法正是企图用某种操作符帮助你构建这种关系。
    它的思想完全可以用下面的代码片段来表达:

    int a=1;
    int b <= a+1;   // <= 符号只是表示a和b之间关系的操作符
    System.out.print(“b=”+b)    //  b=2
    a=10;
    System.out.print(“b=”+b)    //  b=11
    

    响应式编程的思想,它希望有某种方式能够构建关系,而不是执行某种赋值命令。

    应用初始化.png

    比如在收单应用初始化逻辑中,先完成SDK初始化,数据库初始化,签到,才会跳转到交易菜单界面。

    在响应式编程中,这一流程可以这样解读


    应用初始化2.png

    在初始化过程中,SDK初始化,数据库初始化,签到这些业务完成之后才会去安排页面跳转的操作,那么这些上游的业务在自己工作完成之后,就需要通知下游,通知下游的方式有很多种,响应式编程的方式就是通过数据(事件)流。

    每一个业务完成后,都会有一条数据(一个事件)流向下游,下游的业务收到这条数据(这个事件),才会开始自己的工作。

    我们能发现SDK初始化,数据库初始化,签到这三个业务本身相互独立,应当在不同的线程环境中执行,以保证他们不会相互阻塞。而假如没有异步编程,我们可能只能在一个线程中顺序调用这三个相对耗时较多的业务,最终再去做页面跳转,这样做不仅没有忠实反映业务本来的关系,而且会让你的程序“反应”更慢。

    总的来说,异步和数据流都是为了正确的构建事务的关系而存在的。只不过,异步是为了区分出无关的事务,而数据流(事件流)是为了联系起有关的事务。

    2. RxJava

    Rx是响应式拓展,即支持响应式编程的一种拓展,用来处理事件和异步任务。

    2.1 RxJava的优点

    简洁。而且当业务越繁琐越复杂时这一点就越显出优势——它能够保持简洁。

    2.2 RxJava的基本概念

    我们都知道监听者模式,订阅模式这些概念。而Observable和Subscribers的英文意思就是如此。我们大概也知道差不多和监听者模式差不多。

    • Observable事件源,被观察者。
    • Observer / Subcriblers 观察者,事件订阅者
    • subscribe() 方法,绑定Observable与Subcribler或者Observabler
    • 事件 (包括 onNext,onComplete,onError 等事件)

    以第一章的初始化应用为例:

    Observable obserInitSDK=Observable.create((context)->{initSDK(context)}).subscribeOn(Schedulers.newThread())
    
    Observable obserInitDB=Observable.create((context)->{initDatabase(context)}).subscribeOn(Schedulers.newThread())
    
    Observable obserLogin=Observable.create((context)->{Login(context)})
                                .subscribeOn(Schedulers.newThread())
    // 合并多个Observables的发射物                           
    Observable observable = Observable.merge(obserInitSDK,obserInitDB,obserLogin)
    // 订阅被观察者
    observable.subscribe(()->{startActivity()})
    

    当initSDK,initDB,Login都是耗时较长的操作时,遵照业务关系编写响应式代码可以极大的提高程序的执行效率,降低阻塞。
    从上面代码中,可以看出,响应式编程有如下优点

    • 在业务层面实现代码逻辑分离,方便后期维护和拓展
    • 极大提高程序响应速度,充分发掘CPU的能力
    • 帮助开发者提高代码的抽象能力和充分理解业务逻辑
    • Rx丰富的操作符会帮助我们极大的简化代码逻辑

    2.3 操作符决策树

    RxJava的几种主要操作符:

    • 创建操作:直接创建一个Observable
    • 组合操作:组合多个Observable
    • 变换操作:对Observable发射的数据执行变换操作
    • 过滤操作:从Observable发射的数据中取特定的值
    • 条件/布尔/过滤操作:转发Observable的部分值
    • 算术/聚合操作:对Observable发射的数据序列求值

    创建操作

    用于创建Observable的操作符

    • Create — 通过调用观察者的方法从头创建一个Observable
    • Defer — 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable
    • Empty/Never/Throw — 创建行为受限的特殊Observable
    • From — 将其它的对象或数据结构转换为Observable
    • Interval — 创建一个定时发射整数序列的Observable
    • Just — 将对象或者对象集合转换为一个会发射这些对象的Observable
    • Range — 创建发射指定范围的整数序列的Observable
    • Repeat — 创建重复发射特定的数据或数据序列的Observable
    • Start — 创建发射一个函数的返回值的Observable
    • Timer — 创建在一个指定的延迟之后发射单个数据的Observable

    变换操作

    用于对Observable发射的数据进行变换

    • Buffer — 缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
    • FlatMap — 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。
    • GroupBy — 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
    • Map — 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项
    • Scan — 扫描,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射这些值
    • Window — 窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集

    过滤操作

    用于从Observable发射的数据中进行选择

    • Debounce — 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作
    • Distinct — 去重,过滤掉重复数据项
    • ElementAt — 取值,取特定位置的数据项
    • Filter — 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的
    • First — 首项,只发射满足条件的第一条数据
    • IgnoreElements — 忽略所有的数据,只保留终止通知(onError或onCompleted)
    • Last — 末项,只发射最后一条数据
    • Sample — 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫ThrottleFirst
    • Skip — 跳过前面的若干项数据
    • SkipLast — 跳过后面的若干项数据
    • Take — 只保留前面的若干项数据
    • TakeLast — 只保留后面的若干项数据

    组合操作

    用于将多个Observable组合成一个单一的Observable

    • And/Then/When — 通过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集
    • CombineLatest — 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果
    • Join — 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射
    • Merge — 将两个Observable发射的数据组合并成一个
    • StartWith — 在发射原来的Observable的数据序列之前,先发射一个指定的数据序列或数据项
    • Switch — 将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据
    • Zip — 打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射

    错误处理

    用于从错误通知中恢复

    • Catch — 捕获,继续序列操作,将错误替换为正常的数据,从onError通知中恢复
    • Retry — 重试,如果Observable发射了一个错误通知,重新订阅它,期待它正常终止

    辅助操作

    用于处理Observable的操作符

    • Delay — 延迟一段时间发射结果数据
    • Do — 注册一个动作占用一些Observable的生命周期事件,相当于Mock某个操作
    • Materialize/Dematerialize — 将发射的数据和通知都当做数据发射,或者反过来
    • ObserveOn — 指定观察者观察Observable的调度程序(工作线程)
    • Serialize — 强制Observable按次序发射数据并且功能是有效的
    • Subscribe — 收到Observable发射的数据和通知后执行的操作
    • SubscribeOn — 指定Observable应该在哪个调度程序上执行
    • TimeInterval — 将一个Observable转换为发射两个数据之间所耗费时间的Observable
    • Timeout — 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知
    • Timestamp — 给Observable发射的每个数据项添加一个时间戳
    • Using — 创建一个只在Observable的生命周期内存在的一次性资源

    条件和布尔操作

    用于单个或多个数据项,也可用于Observable

    • All — 判断Observable发射的所有的数据项是否都满足某个条件
    • Amb — 给定多个Observable,只让第一个发射数据的Observable发射全部数据
    • Contains — 判断Observable是否会发射一个指定的数据项
    • DefaultIfEmpty — 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据
    • SequenceEqual — 判断两个Observable是否按相同的数据序列
    • SkipUntil — 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据
    • SkipWhile — 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据
    • TakeUntil — 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知
    • TakeWhile — 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据

    算术和聚合操作

    用于整个数据序列

    • Average — 计算Observable发射的数据序列的平均值,然后发射这个结果
    • Concat — 不交错的连接多个Observable的数据
    • Count — 计算Observable发射的数据个数,然后发射这个结果
    • Max — 计算并发射数据序列的最大值
    • Min — 计算并发射数据序列的最小值
    • Reduce — 按顺序对数据序列的每一个应用某个函数,然后返回这个值
    • Sum — 计算并发射数据序列的和

    另外还有连接操作转换操作,可以通过文档查看使用方法。

    2.4 RxJava 基础框架解析

    • 先从比较常用的create方法看
    public static Completable create(CompletableOnSubscribe source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new CompletableCreate(source));
        }
    

    在 create 方法中,其实很简单,只是对 source 进行判空处理,并将 source 用 ObservableCreate 包装起来,并返回回去。下面让我们一起来看一下 ObservableCreate方法

    public final class CompletableCreate extends Completable {
    
        final CompletableOnSubscribe source;
    
        public CompletableCreate(CompletableOnSubscribe source) {
            this.source = source;
        }
    
        // daizy -- 持有了上游 source 的引用,并重写 subscribeActual 方法
        @Override
        protected void subscribeActual(CompletableObserver observer) {
            Emitter parent = new Emitter(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    ObservableCreate 也很简单,它是 Observable 的子类,持有了上游 source 的引用,并重写 subscribeActual 方法,这个方法要结合订阅Subscribe源码看。

    @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(CompletableObserver observer) {
            // 检查 observer 是否为 null,为 null 抛出异常
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                // RxJavaPlugins 插件的,暂时不管
                observer = RxJavaPlugins.onSubscribe(this, observer);
                // 检查 observer 是否为 null,为 null 抛出异常
                ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null CompletableObserver. Please check the handler provided to RxJavaPlugins.setOnCompletableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
    
                subscribeActual(observer);
            } catch (NullPointerException ex) { // NOPMD
                throw ex;
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                RxJavaPlugins.onError(ex);
                throw toNpe(ex);
            }
        }
    

    subscribe 方法也比较简单,大概可以分为以下两步:

    • 第一步,对observer 进行判空,为空则抛出异常
    • 第二步,调用 subscribeActual 方法,在Observable类 中,subscribeActual 是一个抽象方法,要关注的是其实现类的subscribeActual方法。从上面的分析,我们知道,当我们调用 Observable create(ObservableOnSubscribe source) 方法的时候,最终会返回 ObservableCreate 实例。因此,我们只需要关注 ObservableCreate 的 subscribeActual 方法。
    protected void subscribeActual(Observer<? super T> observer) {
            // CreateEmitter 是 ObservableCreate 的一个静态内部类
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                // source 是上游 ObservableOnSubscribe 的引用
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    继续看ObservableCreate的subscribeActual方法,在执行observer.onSubscribe 方法的时候,会将parent对象作为方法参数暴露出去,parent即是CreateEmitter,可以通过它的dispose方法取消订阅关系。
    接着在调用source.subscribe(parent)的时候,会先调用ObservableOnSubscribe 的 subscribe 方法。
    因此,我们可以得出,调用的顺序是:
    Observable.subscrible -> Observable.subscribleActual -> Observable.subscribleActual -> observer.onSubscribe -> ObservableOnSubscribe.subscribe(emitter)
    emitter是CreateEmitter的实例,包装了observe,调用emitter的方法,就会调用observe的 onNext 、onComolete/onError方法。

    以上是RxJava基本原理,Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer,并且回调 Observer 的相应的方法。

    2.5 RxJava 线程切换

    Observable通过subscribeOn方法来指定线程

    public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    

    通过代码可以看出,先对scheduler进行判空,然后用ObservableSubscribeOn 将scheduler 包装起来,接下来研究看看ObservableSubscribeOn这个类的源码。

    final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(final Observer<? super T> observer) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    
            observer.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    

    首先先来看他的构造函数 ,有两个参数 source ,scheduler。

    • source 代表上游的引用,是 Observable 的一个实例
    • scheduler 调度器可以通过 Schedulers.newThread() 或者 Schedulers.io() 创建相应的实例。
      RxJava 可用的调度器大概有下面几种,根据需求选择:
    图片.png

    我们主要看下这个方法

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    

    SubscribeTask 这个类,他是 ObservableSubscribeOn 的一个非静态内部类,可以看到 其实也比较简单,他实现了 Runnable 接口,并且持有 parent 引用。

     final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }
    

    在 run 方法中,通过 source.subscribe(parent) 建立联系。因而,当我们的 SubscribeTask 的 run 方法运行在哪个线程,相应的 observer 的 subscribe 方法就运行在哪个线程。

    接下来再看看scheduleDirect的实现

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            DisposeTask task = new DisposeTask(decoratedRun, w);
    
            w.schedule(task, delay, unit);
    
            return task;
        }
    

    这个方法主要是将task包装成DisposeTask,然后通过Worker进行调度。再看看Worker 是在做啥。
    Scheduler我们以NewThreadScheduler为例子

    public final class NewThreadScheduler extends Scheduler {
    
        final ThreadFactory threadFactory;
    
        private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
        /**
         * daizy -- 线程池
         */
        private static final RxThreadFactory THREAD_FACTORY;
    
        /** The name of the system property for setting the thread priority for this Scheduler. */
        private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
    
        static {
            int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                    Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
    
            THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
        }
    
        public NewThreadScheduler() {
            this(THREAD_FACTORY);
        }
    
        public NewThreadScheduler(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
        }
    
    
        @NonNull
        @Override
        public Worker createWorker() {
            // 通过线程池来调度
            return new NewThreadWorker(threadFactory);
        }
    }
    

    通过代码可以看出来,Worker里头封装了线程池,所以RxJava的线程切换,也是基于线程池来处理。

    回过来看DisposeTask

    static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
    
            @NonNull
            final Runnable decoratedRun;
    
            @NonNull
            final Worker w;
    
            @Nullable
            Thread runner;
    
            DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
                this.decoratedRun = decoratedRun;
                this.w = w;
            }
    
            @Override
            public void run() {
                runner = Thread.currentThread();
                try {
                    decoratedRun.run();
                } finally {
                    dispose();
                    runner = null;
                }
            }
    
            @Override
            public void dispose() {
                if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                    ((NewThreadWorker)w).shutdown();
                } else {
                    w.dispose();
                }
            }
    
            @Override
            public boolean isDisposed() {
                return w.isDisposed();
            }
    
            @Override
            public Runnable getWrappedRunnable() {
                return this.decoratedRun;
            }
        }
    

    DisposeTask 实现了 Disposable,Runnable ,SchedulerRunnableIntrospection 接口,Disposable 接口主要是用来取消订阅关系的 Disposable。

    从上面的分析,可以得出Observable.subscribeOn方法,控制Observable的执行线程是通过将 Observable.subscribe(Observer) 的操作放在了指定线程中,当我们调用 subcribe 的时候,它的过程是从下往上的,即下面的 Observable 调用上面的 Observable。

    用流程图描述如下:


    图片.png

    相关文章

      网友评论

          本文标题:响应式编程与RxJava

          本文链接:https://www.haomeiwen.com/subject/opoacxtx.html