美文网首页
RxJava核心代码解析

RxJava核心代码解析

作者: 梧麦_ec3e | 来源:发表于2019-06-01 17:01 被阅读0次

RxJava是一个非常好用的框架,它的作用是将一个复杂的业务逻辑,分拆成一个个小的功能模块,然后将这些功能串起来,达到思路清晰的目的。

关于如何详细使用rxjava,这里就不再介绍了,可以点击 这里
进行查看,rxjava核心代码很简洁巧妙,本文主要介绍核心代码实现。

我们先看一个例子。

        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.e(TAG, "call");
                subscriber.onNext("hello");
                subscriber.onNext("beauty");
                subscriber.onCompleted();
            }
        })
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {         
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String s) {
                Log.e(TAG, "onNext s: " + s);
            }
        });

05-31 14:36:06.469 29241-29241/? E/rx2: call
05-31 14:36:06.470 29241-29241/? E/rx2: onNext s: hello
05-31 14:36:06.470 29241-29241/? E/rx2: onNext s: beauty
05-31 14:36:06.470 29241-29241/? E/rx2: onCompleted

这个例子创建了一个被观察者和一个订阅者,被观察者产生了两个字符串,然后传递给了订阅者。

我们看一下源码

public class Observable<T> {
    //创建被观察者,需要传入订阅事件f,这个是被观察者和订阅者之间的纽带,我们就称呼它为订阅纽带吧,避免混淆
    public final static <T> Observable<T> create(OnSubscribe<T> f) {
        //hook里面什么都没有做,只是将f返回
        return new Observable<T>(hook.onCreate(f));
    }
    //保存订阅纽带
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    //订阅方法
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        //通知订阅者已经开始
        subscriber.onStart();
        try {
            //onSubscribe是订阅纽带,这里直接调用订阅纽带的call方法,并将订阅者传了过去,然后执行subscriber.onNext("hello");方法,那么订阅者也就收到了该事件
            hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
            return hook.onSubscribeReturn(subscriber);
        } catch (Throwable e) {
            return Subscriptions.unsubscribed();
        }
    }
}

这个例子,从代码实现层来看就是订阅者订阅被观察者后,触发订阅纽带的call方法,call方法产生数据,再传给订阅者。

现在将例子改一下,要将产生的字符串后面加上“*”号,我们可以通过map方法

        //为了方便后面说明,我们称create返回的被观察者为原始被观察者,这个OnSubscribe我们称为原始订阅纽带
        Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                Log.e(TAG, "call");
                subscriber.onNext("hello");
                subscriber.onNext("beauty");
                subscriber.onCompleted();
            }
        })
        //map方法里面有map被观察者,map订阅纽带,map订阅者,见后面的代码
        .map(new Func1<String, String>() {
            @Override
            public String call(String s) {
                Log.e(TAG, "map call s: " + s);
                return s + "*";
            }
        })
        //这个Subscriber我们称为最终订阅者
        .subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.e(TAG, "onCompleted");
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String s) {
                Log.e(TAG, "onNext s: " + s);
            }
        });

打印结果如下
05-31 14:39:03.707 29692-29692/? E/rx2: call
05-31 14:39:03.707 29692-29692/? E/rx2: map call s: hello
05-31 14:39:03.708 29692-29692/? E/rx2: onNext s: hello*
05-31 14:39:03.708 29692-29692/? E/rx2: map call s: beauty
05-31 14:39:03.708 29692-29692/? E/rx2: onNext s: beauty*
05-31 14:39:03.708 29692-29692/? E/rx2: onCompleted
我们可以看到,在原始订阅纽带里面调用来subscriber.onNext("hello");后,会将字符串传递到map的call方法里面,然后再调用最终订阅者的onNext方法

这个例子将一件事情分拆层两个步骤去实现,产生字符串,加工字符串。我们看一下具体是怎么做到的

这个例子跟上一个的差别是在中间加了map方法,我们看一下map方法的实现

public class Observable<T> {
      public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        //这里看一下OperatorMap类,func是要加工数据的方法
        return lift(new OperatorMap<T, R>(func));
    }

    //该方法返回的是被观察者,相当于新建了一个观察者,为了方便说明,我们称该返回的被观察者是map被观察者,订阅纽带为map订阅纽带
    //注意,下面的call方法是理解整个框架的关键,有点绕,冷静下来分析一下
    //我们现在就来梳理这个例子的调用流程,在这之前,我们定义了几个名称:原始被观察者、原始订阅纽带、map被观察者、map订阅纽带、map订阅者,最终订阅者,先熟悉一下这几个词
    //1.在最后调用subscribe(new Subscriber<String>())的时候,也是就是执行map被观察者的subscribe方法时,会调用map订阅纽带的call方法,也就是下面lift里面call方法
    //2.调用hook.onLift(operator).call(o)得到的是map订阅者,o是最终订阅者,map订阅者是最终订阅者的代理,里面的onNext方法多了一层在转换
    //3.onSubscribe是这个类的订阅纽带,也就是初始订阅纽带。调用onSubscribe.call(st)方法执行初始订阅者的call方法,也就是上面代码块的subscriber.onNext("hello")这里。
    //这里的subscriber订阅者是map订阅者,调用了它的call方法,在OperatorMap这个类里面,执行的是里面的订阅者的onNext(T t)方法,调用o.onNext(transformer.call(t));。
    //t就是传进来的"hello",通过transformer进行转换,这里的o是最终订阅者,t是原始订阅纽带传过来的"hello"字符串,这样就完成了转换,所以最终订阅者o就收到来转化后的数据
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        return new Observable<R>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<? super R> o) {
                Subscriber<? super T> st = hook.onLift(operator).call(o);
                st.onStart();
                onSubscribe.call(st);
            }
        });
    }
}

public final class OperatorMap<T, R> implements Operator<R, T> {
    private final Func1<? super T, ? extends R> transformer;
    //加工数据的类,负责转换数据
    public OperatorMap(Func1<? super T, ? extends R> transformer{
        this.transformer = transformer;
    }
    //为了方便说明,我们称该方法返回的订阅者是map订阅者,从代码来看,生成的这个订阅者是最终订阅者的代理,唯一不同的是多了一步transformer.call(t)的转化
    @Override
    public Subscriber<? super T> call(final Subscriber<? super R> o){
        return new Subscriber<T>(o) {
            @Override
            public void onCompleted() {
                o.onCompleted();
            }
            @Override
            public void onError(Throwable e) {
                o.onError(e);
            }
            @Override
            public void onNext(T t) {
                //这里的o是最终订阅者,t是原始订阅纽带传过来的
                o.onNext(transformer.call(t));
            }
        };
    }
}
这里有一点绕,我画了一张图来梳理这个关系 rxjava调用关系

上面的说明再结合这张图,关系就能梳理清楚了

总结一下,调用subscribe订阅时,触发map被观察者的call方法,调用onSubscribe.call(st),st又是最终订阅者的代理(该代理获取到数据后加工数据),onSubscribe是原始订阅纽带,触发产生数据的onNext方法,然后将数据传递给map订阅者,加工完数据后,再传递给最终订阅者的onNext方法

每次添加一个map方法时,在原先的连接关系中间插入了一层关系,对上一层的被观察者来说,插入了一个订阅者;对下一层对订阅者而言,插入了一个被观察者,通过这种方式,又将原先对链条连接上了。

对于map方法是这个逻辑,对应其它的方法,也是类似的原理

相关文章

网友评论

      本文标题:RxJava核心代码解析

      本文链接:https://www.haomeiwen.com/subject/dtswtctx.html