介绍
事件驱动和异步调用是两种慢慢被大家接收的编程范式了。rxjava 库利用观察者模式,把事件驱动和异步调用程序组合在一起。
基于异步调用和事件驱动的程序,经常陷入回调陷阱,导致程序的可读性下降,写出来的程序像意大利面条(callback hell, callback spaghetti)。参考 http://callbackhell.com。
本文从源代码级别上,介绍了 rxjava 中最重要的几个接口和类。
Obserable
final Observable<String> observable = Observable.empty();
Observable
只有一个成员变量 onSubscribe
onSubscribe 只有一个成员方法
Observer
class SimpleObserver implements Observer<String> {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(String s) {}
}
final Observer<String> observer = new SimpleObserver();
observer 没有任何成员变量。
subscribe
observable.subscribe(observer);
这一堆 subcribe
函数都落在了 Subscriber
参数的那个版本上了,返回值都是 Subscription
image::images/observable-11bb0.png[]
subscriber 其实就是 observer ,也是 subscription。
subscription
用来停止订阅 unsubscribe
。
subscribe
比较矫情的一段代码,简化如下。
class Observable<T> {
public final Subscription subscribe(Subscriber<? super T> subscriber) {
onSubscribe.call(subscriber);
return subscriber;
}
}
-
subscriber
就是传进去的第一个参数observable.subscribe(observer)
。subscriber
也是一个Observer
, 因为Subcriber
继承Observer
。 - 这个参数
subscriber
传递给对象onSubscribe
的call
方法,onSubscribe.call(subscriber)
。 -
subscriber
作为一个Subscription
返回。 因为Subscriber
继承Subscription
。
由此可见 Subscriber
这个类才是 rxjava 的核心。subscriber
对象不停的在各个类之间流转。 各种各样不同 OnSubscribe
接口的实现,可以去产生数据源,然后调用 subscriber.onNext(data)
。
小结
一个超级简化版的 rxjava 可以是下面这个样子。
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
}
public class Observable<T> {
final OnSubscribe<T> onSubscribe;
public Observable(OnSubscribe<T> onSubscribe) {
this.onSubscribe = onSubscribe;
}
public final Subscription subscribe(Subscriber<? super T> subscriber) {
onSubscribe.call(subscriber);
return subscriber;
}
}
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
public interface Subscription {
void unsubscribe();
boolean isUnsubscribed();
}
public abstract class Subscriber<T> implements Observer<T>, Subscription {
}
尽管 Subscriber
类所占的篇幅很小,他确实核心的一个类。
测试一下这段代码
@Test
public void main() throws Exception {
final Observable<String> observable = new Observable<>(subscriber -> {
subscriber.onNext("hello world");
subscriber.onCompleted();
});
final Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("byebye");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("> " + s);
}
@Override
public void unsubscribe() {
}
@Override
public boolean isUnsubscribed() {
return false;
}
};
observable.subscribe(subscriber);
}
网友评论