RxJava 是一个针对响应式编程思想的一个框架,其本质是利用了观察者模式,充分的利用的函数式编程的思想,学习此框架,注意类的命名方式,这样有助了对其流程的理解
主要类
-
Publisher:发布者,发布数据的source
- subscribe(Subscriber<? super T> s) 订阅观察者
-
Observer:观察者,接收来自Publisher的数据,进行处理
- onSubscribe(Subscription s) 订阅时触发
- onNext(T t) 接收一个消息时触发
- onError(Throwable t) 发生错误时触发
- onComplete() 消息全部结束(及结束)时触发
-
Subscription: 订阅,用于观察者对发布者订阅的信息
- request(long n) 向Publisher请求n个数据
- cancel() 告诉Publisher停止发送数据
触发流程
用一个最简单的程序例子来描述其运行流程
Flowable.just("1")
.subscribe(System.out::println);
Flowable.just("1")
只是new了一个FlowableJust的Publisher对象
subscribe(System.out::println)
会new一个LambdaSubscriber的Subscriber的对象,
并发起数据流动操作调用subscribe(Subscriber<? super T> s)
方法触发整个数据的操作
从subscribe(Subscriber<? super T> s)
开始解析其流程
//io.reactivex.Flowable.java
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING,
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Subscription> onSubscribe) {
...
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
public final void subscribe(FlowableSubscriber<? super T> s) {
...
subscribeActual(z);
}
subscribeActual
将会调用具体Publiser实现的此方法
//io.reactivex.internal.operators.flowable.FlowableJust.java
protected void subscribeActual(Subscriber<? super T> s) {
s.onSubscribe(new ScalarSubscription<T>(s, value));
}
onSubscribe 将会触发Observer此方法的实现
//io.reactivex.internal.subscribers.LambdaSubscriber
public void onSubscribe(Subscription s) {
//注意,由于当前类extends AtomicReference<Subscription>, 此方法是将Subscription set到当前类
//到后面再使用当前类的Subscription相关的方法时,其实使用的是s的实现方法
if (SubscriptionHelper.setOnce(this, s)) {
onSubscribe.accept(this);
...
}
}
onSubscribe.accept(this). 将会调用Observer中的Subscription请求触发数据
//io.reactivex.internal.operators.flowable.FlowableInternalHelper.RequestMax
public void accept(Subscription t) throws Exception {
//之前执行了在LambdaSubscriber中SubscriptionHelper.setOnce(this, s)方法,
//所以其实调用的是ScalarSubscription.request方法
t.request(Long.MAX_VALUE);
}
t.request(Long.MAX_VALUE) 的最终调用
//io.reactivex.internal.subscriptions.ScalarSubscription
@Override
public void request(long n) {
//其s指的就是LambdaSubscriber,到这数据的流向处理就已经通了
s.onNext(value);
}
网友评论