美文网首页
响应式编程开源库 RxJava2——起源

响应式编程开源库 RxJava2——起源

作者: 阿扎泼柴 | 来源:发表于2019-05-09 16:46 被阅读0次

    1.RxJava来源

    从github上我们可以看到RxJava是隶属于ReactiveX(Rx)仓库,Rx是从微软的函数响应式编程库(Reactive Extensions)发展而来的, 提供了一种新的组织和协调异步事件的方式。
    官方对于该库的介绍是,

    1. An API for asynchronous programming with observable streams
    2. ReactiveX is a combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming
      ReactiveX提供可观测流的异步编程API,它的灵感来源于观察者模式、迭代器模式和函数编程这些优秀思想。
      ReactiveX作为一个通用库, 现在已经有多种语言的实现版本(都是开源的), 包含RxJava, RxCpp, RxSwift, RxKotlin, RxGroovy, RxJavaScript等具体可参照所有支持语言

    2.什么是RxJava

    RxJava 是 ReactiveX 在 Java 上的开源的实现。
    网上大多概况为它是一个实现了异步操作的库。这种概括来源于官方但是太过片面,它不仅仅异步一个功能,在使用中发现它对于数据流的各种处理非常方便。

    下面是Github上RxJava库的官方简介。

    RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

    RXJava——JVM的响应性扩展——在Java VM上使用可观察序列(即ReactiveX中提到的可观测流)编写异步和基于事件的程序的库。
    简而言之,就是在使用Java编程时,该库能使用观察者模式编写异步和基于事件的程序。

    3.关键词解析

    既然是学习就要搞清楚细节,所以对于这些平时听到的名词,还是值得我们基于极客和刨根问底的精神来细细品味的。

    观察者模式

    观察者模式(有时又被称为模型(Model)-视图(View)模式、源-收听者(Listener)模式或从属者模式)是软件设计模式的一种。在此种模式中,一个目标物件管理所有相依于它的观察者物件,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用来实现事件处理系统。

    迭代器模式

    提供一种方法顺序访问一个聚合对象中各个元素,而又不需暴露该对象的内部表示。比如Java中的LinkedList内部实现是双向链表,比较复杂,这时候通过迭代器Iterator遍历元素就很简单,我们不必关注内部实现。

    函数式编程

    函数式编程是种编程方式,它将电脑运算视为函数的计算。函数编程语言最重要的基础是λ演算(lambda calculus),而且λ演算的函数可以接受函数当作输入(参数)和输出(返回值)。
    函数式编程中的函数,是指数学中的函数,即自变量的映射(一种东西和另一种东西之间的对应关系)。也就是说,一个函数的值仅决定于函数参数的值,不依赖其他状态。

    响应式编程

    简称RP(Reactive Programming)
    在计算机领域,响应式编程是一个专注于数据流和变化传递的异步编程范式。这意味着可以使用编程语言很容易地表示静态(例如数组)或动态(例如事件发射器)数据流,并且在关联的执行模型中,存在着可推断的依赖关系,这个关系的存在有利于自动传播与数据流有关的更改。
    简单概括,本质上是程序会对数据流或某种变化所作出相应的反应。

    4.从基本使用RxJava分析

    掘金翻译计划中看到的有关RxJava的介绍讲的很仔细,解决了很多困惑。还是从最基本的使用入手,下面是最基本的最简单的用法。

    /****************创建被观察者******************/
            Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    //执行耗时操作
                    Thread.sleep(2000);
                    emitter.onNext("耗时操作完毕,发送可观测的对象");
                }
            })
                    /****************线程调度,被观察者在IO线程,观察者在主线程订阅******************/
                    .observeOn(Schedulers.io())
                    .subscribeOn(AndroidSchedulers.mainThread())
    
    
                    /****************被观察者被观察者订阅******************/
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(String s) {
    
                            Log(s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    我们从类名就能大概简单理解到整个代码的意思,创建一个被观察者Observable,如果它被观察者Observer订阅了,那么事件发射器ObservableEmitter就会将事件推到每个观察者。其中ObservableOnSubscribe在源码中的解释如下

    /**
     * A functional interface that has a {@code subscribe()} method that receives
     * an instance of an {@link ObservableEmitter} instance that allows pushing
     * events in a cancellation-safe manner.
     *
     * @param <T> the value type pushed
     */
    public interface ObservableOnSubscribe<T> {
    
        /**
         * Called for each Observer that subscribes.
         * @param emitter the safe emitter instance, never null
         * @throws Exception on error
         */
        void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
    }
    

    它是一个拥有 subscribe方法的功能性接口,能接收一个 ObservableEmitter对象,该对象能安全的推送事件,并能以安全方式取消。
    注意这里的 subscribe方法的描述,每个订阅该事件的观察者都会调用这个方法,拥有这个发射器。

    举个例子来说明,比如现在有很多学生订阅21世纪英语报,那么报社就相当于被观察者Observable,我们相当于观察者Observer,送报人就相当于这里的事件发射器ObservableEmitter。我们想要看报纸不需要去询问报社什么时候有新的报纸,只需要订阅,如果有新的报纸,送报人就会给我们送来新的报纸。

    5.Observable类

    下面就Observable这个类,结合ReactiveX中的关于Observable的解释和RxJava中的文档解释来分析一下这个类。
    ReactiveX中的介绍:在ReactiveX中,观察者订阅一个被观察者。然后,观察者可对可观察到的项目或项目序列做出反应。这种模式有利于并发操作。因为它不会在等待被观察者发射对象时阻塞,而是以观察者的形式,随时准备对被观察者在未来的任何时间做出适当的反应。
    下面这种图表示了在ReactiveX文档中被观察者是如何转换的,文档中会多次出现这样类似的图,对于我们理解很有帮助。

    RxJava中的介绍:Observable是非背压的,它是一个提供工厂方法、中间操作符来同步或异步的处理数据流的基类。该类中的许多运算符接受ObservableSource(s),这是非背压流的基本接口,Observable本身也实现了这种接口。ObservableSource是可被观察者处理的非背压被观察者源基本接口。它里面的subscribe方法是为了将观察者和被观察者联系起来,也就是产生订阅关系。

    /**
     * Represents a basic, non-backpressured {@link Observable} source base interface,
     * consumable via an {@link Observer}.
     *
     * @param <T> the element type
     * @since 2.0
     */
    public interface ObservableSource<T> {
    
        /**
         * Subscribes the given Observer to this ObservableSource instance.
         * @param observer the Observer, not null
         * @throws NullPointerException if {@code observer} is null
         */
        void subscribe(@NonNull Observer<? super T> observer);
    }
    

    6.各方法作用

    create方法在上面提到过就是创建被观察者,调用ObservableEmitter发射器的方法如下图,onNext会以推的方式发送数据,onComplete表示已经完成。


    observeOn方法就是对被观察者进行线程调度,让它在指定的线程发送。这里要注意的是,如果调度器真的是异步,那么在发送的线程中onError可能先于onNext执行,如果需要严格的事件顺序,请考虑使用 observeOn(Scheduler scheduler, boolean delayError)重载。


    subscribeOn在指定线程异步订阅被观察者。订阅过程是异步的,结果是在指定线程接收。


    Observer就是观察者基类接口,其中的方法和发射器基类接口的方法基本一致,所以发送的数据会在onNext中接收到,onError、onComplete同理,唯一多了的方法就是onSubscribe它会接收一个Disposable对象,以便随时打断被观察者和观察者之间的联系。

    相关文章

      网友评论

          本文标题:响应式编程开源库 RxJava2——起源

          本文链接:https://www.haomeiwen.com/subject/odwpoqtx.html