美文网首页互联网科技ITBOX程序员
RxJava 的使用与理解(一)

RxJava 的使用与理解(一)

作者: 孙福生微博 | 来源:发表于2016-04-12 13:48 被阅读1716次

    ReactiveX编程简称Rx编程,又叫响应式编程、响应式扩展,英文为Reactive Extensions。可以查看官方网站www.reactive.io,就像其网站说的"Expertise makes better software.",响应式编程的目标是提供一致的编程接口,
    帮助开发者更方便的处理异步数据流,使软件开发更高效、更简洁。Rx是一个多语言的实现,已经支持多种语言包括Java、Swift、C++、.NET、JavaScript、Ruby、Groovy、Scala等等,支持的库包括:RxJavaRxSwift、Rx.NET、RxJS、RXRuby等等,真是屌炸天。在Android上我们添加 RxAndroid 库就可以,RxAndroid 是对 RxJava 一种更接地气的扩展。下面让我们通过 RxAndroid 去使用、理解 RxJava 吧。

    Rx使用观察者模式

    创建:Rx可以方便的创建事件流和数据流
    组合:Rx使用查询式的操作符组合和变换数据流
    监听:Rx可以订阅任何可观察的数据流并执行操作

    Rx使代码简化

    函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态
    简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
    异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
    轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题

    先看简单的例子,通过RxJava将Integer类型转成String类型

    private void funcDemo() {
        Observable.OnSubscribe<Integer> onSubscribe1 = new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onNext(100);
            }
        };
    
        Func1<Integer, String> func1 = new Func1<Integer, String>() {
            @Override
            public String call(Integer integer) {
                return String.valueOf(integer);
            }
        };
    
        Subscriber<String> subscriber1 = new Subscriber<String>() {
            @Override
            public void onCompleted() {
            }
    
            @Override
            public void onError(Throwable e) {
            }
    
            @Override
            public void onNext(String s) {
                Log.d("onNext(): ", s);
            }
        };
    
        Observable.create(onSubscribe1)
                .map(func1)
                .subscribe(subscriber1);
    
        // 将上面分解成三步执行
        // Observable<Integer> observable1 = Observable.create(onSubscribe1);
        // Observable<String> observable2 = observable1.map(func1);
        // observable2.subscribe(subscriber1);
    }
    

    主要关注这里面生成的四个对象:observable1、onSubscribe1、func1、subscriber1,如果对相应的单词不清晰,移步下面的备注。
    这是典型的的被观察者与观察者的关系,或者叫被订阅者与订阅者的关系;下面理一下他们的角色:

    observable1:被观察者,是subscriber1要订阅的对象。  
    onSubscribe1:被观察者的行为,是subscriber1要订阅的行为,subscribe()时被执行。  
    subscriber1:订阅者,是抽象类实现了Observer接口,可以叫观察者,其实就是观察者的角色。
    
    // 分解成三步执行的代码
    Observable<Integer> observable1 = Observable.create(onSubscribe1);
    Observable<String> observable2 = observable1.map(func1);
    observable2.subscribe(subscriber1);
    

    这段代码的意思就是:创建一个被观察者observable1,给被观察者指定其所发布的行为(onSubscribe1来实现);
    指定观察者时,只需指定相应的观察回调即可;在完成订阅的操作时,是先调用subscriber1的onStart方法,
    之后通过订阅行为onSubscribe1来调用subscriber1完成相应的订阅操作;最后若出现异常则会回调subscriber1的onError方法。

    换句话说就是:被观察者发布的行为是传递Integer类型的数值100,map()变换后,观察者收到了String类型的字符串"100"。

    简单点说就是:观察者订阅了被观察者要发布的行为。

    源码参考个人作品【图灵机器人】

    现在结合源码,一步步看它到底是怎么执行的!

    第一步 Observable.create(onSubscribe1)

    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(hook.onCreate(f));
    }
    
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }
    

    这里面泛型比较多,先忽略<T>泛型符号;hook又是什么东东?翻译过来就是“钩子”,到底是什么钩子呢,查看源码后知道, hook 是一个 proxy 对象, 仅仅用作调试的时候可以插入一些测试代码的,那也先忽略。

    所以Observable.create(onSubscribe1)干了两件事,第一 new 一个 observable1 对象,第二将 new 出的 onSubscribe1 对象通过 Observable 的构造函数赋值给它的成员变量 onSubscribe。

    第二步 observable1.map(func1)

    先看func1对象,RxJava有一系列的(Func+数字)的接口和一系列(Action+数字)接口,这些接口中都只有一个call方法,其中(Func+数字)接口的call方法都有返回值,而(Action+数字)接口的call方法都没有返回值,后面的那个数字表示call方法接受几个泛型类型的参数。看一下Func1和Action1的源码:

    /**
     * Represents a function with one argument.
     */
    public interface Func1<T, R> extends Function {
        R call(T t);
    }
    
    /**
     * A one-argument action.
     */
    public interface Action1<T> extends Action {
        void call(T t);
    }
    

    Func1 和 Action(Action1继承Action) 都继承 Function 接口,Func1 接收一个 T 泛型类型的参数,call 回调后,返回一个 R 泛型类型的值,是一种“变换”函数接口,我们可以在 call 回调中处理这种“变换”的需求。
    接下来看看 map(func1) 干了神马,上源码。。。

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return lift(new OperatorMap<T, R>(func));
    }
    
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                // 核心代码
                Subscriber<? super T> subscriber2 = hook.onLift(operator).call(o);
                subscriber2.onStart();
                onSubscribe1.call(subscriber2);
            }
        });
    }
    

    map()函数里直接调用的lift()函数,先看看OperatorMap和lift()函数的参数Operator是什么玩意,再上源码。。。

    /**
     * Operator function for lifting into an Observable.
     */
    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }
    
    public final class OperatorMap<T, R> implements Operator<R, T> {
    
        final Func1<? super T, ? extends R> transformer;
    
        public OperatorMap(Func1<? super T, ? extends R> transformer) {
            this.transformer = transformer;
        }
    
        @Override
        public Subscriber<? super T> call(final Subscriber<? super R> o) {
            return new Subscriber<T>(o) {
                @Override
                public void onCompleted() {
                    o.onCompleted();
                }
                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }
                @Override
                public void onNext(T t) {
                    o.onNext(transformer.call(t));
                }
            };
        }
    }
    

    Operator 继承 Func1,Operator 和 Func1 都是一种“变换”接口,比如输入Integer类型参数经过处理返回String类型值,
    OperatorMap 继承 Operator,但它的参数又和Operator相反,难道经过OperatorMap又把String类型变换成Integer类型值啦,
    其实 OperatorMap 类有个Func1属性transformer(transformer就是func1),执行o.onNext(transformer.call(t))就将func1变换的结果传递下去了。

    lift()返回的是个新的被观察者对象observable2,同时创建一个新的OnSubscribe对象,暂时标记为onSubscribe2;
    在onSubscribe2回调中调用hook.onLift(operator).call(o),变换后并生成新的观察者subscriber2对象,
    新的观察者subscriber2对象被绑定到原来创建onSubscribe1对象上,可以理解为subscriber2已经订阅到经过变换后的
    observable1要发布的行为,最后这种变化后的行为继续被发送到subscriber1观察者那里。

    最后一步 subscribe(subscriber1)

    subscriber1开始订阅被观察者的行为,也可以说被观察者的行为subscribe()时被执行;我们可以在观察者的回调中处理我们最终得到的结果;
    在执行订阅后返回了Subscription对象,里面包含两个方法:

    public interface Subscription {
        // 取消订阅
        void unsubscribe();
        // 订阅是否被取消
        boolean isUnsubscribed();
    }
    

    最后总结下吧,为了了解RxJava的运行机制;我使用一个简单的函数,实现的功能就是通过RxJava将Integer类型转成String类型并打印出来,并结合源码理解其执行过程;这里面泛型、接口的回调和各种角色的变换确实不好理解;接下来我会将RxJava和Retrofit2结合起来使用,
    所以【图灵机器人】就这样产生啦。

    参考文章:

    彻底搞懂 RxJava — 基础篇
    彻底搞懂 RxJava — 中级篇
    彻底搞懂 RxJava — 高级篇

    大头鬼Bruce
    RxJava基本流程和lift源码分析

    扔物线
    给 Android 开发者的 RxJava 详解

    备注:

    observable:
    adj. 显著的;觉察得到的;看得见的
    n. [物] 可观察量;感觉到的事物

    observer:
    n. 观察者;[天] 观测者;遵守者

    subscribe:
    vi. 订阅;捐款;认购;赞成;签署
    vt. 签署;赞成;捐助

    subscriber:
    n. 订户;签署者;捐献者

    关于我

    个人邮箱:sfsheng0322@126.com
    GitHub主页
    简书主页
    个人博客
    新浪微博

    相关文章

      网友评论

        本文标题:RxJava 的使用与理解(一)

        本文链接:https://www.haomeiwen.com/subject/tefwlttx.html