Rxjava 本质上是 观察者模式框架。被观察者(Observable)->传递数据-> 观察者(observer)在调用subscribe ()方法进行订阅时,会把observer 层层往上构造出新的observer。
例如:Observable.create().map().subscribe(observer) ,observer会先被new MapObserver(observer),用MapObserver接收事件源,再传递到原始的observer。
RxJava源码分析
我们先来分析一下RxJava的工程目录
从源码结构上看无非是观察者Observer/Subscriber、被观察者Observerable、以及与订阅/观察相关的功能类:
-
annotations是相关注解
-
exceptions
-
functions 主要是订阅相关的接口类,比如Action1、Func0等
-
internal是内部使用的util、operaters的综合,方便将订阅关系捋顺,比较重要
-
observerable包主要是专门为某种场景定制的Oberverable类
-
observer包特定的订阅者及集合
-
schedulers包有关异步逻辑的线程关系
-
subjects包订阅中间产生的对象
-
subscriptions包是订阅集合,网络调用常用到
上述最最最核心的就是Observerable类了,光代码就有上万行,不过先不要慌,内部是很有条理的,我们下面继续看。
Observerable类包含三类方法
-
a.产生Observerable被观察者的方法,主要的有create、just、list等
-
b.对Observerable进行中间变换的方法,我们比较熟悉的是map、flatMap、lift、merge、zip、startwith、takeab类方法的特征是都返回Observerable<T>对象
- c.订阅方法,返回subscriptiond订阅对象
Observerable类是所有异步处理的开始、进行和结束,是核心类,理解了这个类就理解了RxJava。
Observerable类的众多方法中总有一款适合用来处理你的异步逻辑,有兴趣的可以深入的研究一下这些方法。
实战一个简易rxjava
为学习rxjava的基本流程,写一个精简版的rxjava
Subscriber
观察者
Observer
接口
public interface Observer<T> {
void onCompleted();
void onError(Throwable t);
void onNext(T var1);
}
SubScriber
简化:
public abstract class Subscriber<T> implements Observer<T> {
public void start() {
}
}
Observable
订阅源
Observable(订阅源)
在RxJava里面是一个大而杂的类,拥有很多工厂方法和各式各样的操作符。每个Observable里面有一个OnSubscribe对象,只有一个方法(void call(Subscriber<? super T> subscriber);),用来产生数据流,这是典型的命令模式。
public class Observable<T> {
final OnSubscribe<T> onSubscribe;
private Observable(OnSubscribe<T> onSubscribe) {
this.onSubscribe = onSubscribe;
}
public static <T> Observable<T> create(OnSubscribe<T> onSubscribe) {
return new Observable<T>(onSubscribe);
}
public void subscribe(Subscriber<? super T> subscriber) {
subscriber.start();
onSubscribe.call(subscriber);
}
public interface OnSubscribe<T> {
void call(Subscriber<? super T> subscriber);
}
}
这样一个大致的框架就出来了
测试
Observable.create(new Observable.OnSubscribe<Integer>() {
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
public void onCompleted() {
System.out.println("complete");
}
public void onError(Throwable r) {
}
public void onNext(String string) {
System.out.println(Thread.currentThread().getName());
System.out.println(string);
}
});
下面实现map,起始map是就是对结果再包装一层Observe.
实现结果测试
Observable.create(new Observable.OnSubscribe<Integer>() {
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 10; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
})
.map(new Observable.Transformer<Integer, String>() {
public String call(Integer from) {
System.out.println("subsc1@ " + Thread.currentThread().getName());
return "maping " + from;
}
})
.map(new Observable.Transformer<String, String>() {
public String call(String from) {
System.out.println("subsc2@ " + Thread.currentThread().getName());
return "maping2 " + from;
}
})
.subscribe(new Subscriber<String>() {
public void onCompleted() {
System.out.println("complete");
}
public void onError(Throwable r) {
}
public void onNext(String string) {
System.out.println(Thread.currentThread().getName());
System.out.println(string);
}
});
至于线程切换,就是在指定的线程调用call
函数、或调用subscriber
里的onNext()等函数
小结一下,文章到这里主要简单说明了它的原理,以及源码分析。再到一个简单的实战演练。很好的理解rxjava的使用原理。
文末
总体来说RxJava主要作用帮你优雅的处理异步逻辑。RxJava是处理异步逻辑的利器,以往我们处理异步时,需要创建一个线程,传入callback或者listener,线程处理完任务后通过callback、listener、notify或者发送广播去通知UI线程和其他线程。使用RxJava可以在一个方法体内完成这所有逻辑。
网友评论