rxJava的思维
响应式编程,卡片式编程,流式编程,有一个起点和一个终点,起点开始流向我们的“事件”, 把事件流向终点,只不过在流向的过程中,可以增加拦截,拦截是可以对事件进行改变,终点只关心他的上一个拦截返回信息。
rxJava 观察者设计模式或者发布订阅模式
1.创建被观察者 Observable
2.创建观察者Observer
3.观察者订阅被观察者 subscribe()
RxBinding 可以防抖动,flateMap避免网络嵌套,doOnnext() 多api接口链式调用
Rxjava 全局Hook 点
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
//rxjava预留给开发者的hook 钩子
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
微信图片_20220615093821.png
微信图片_20220615094024.png
1111.png
rxjava 源码解析 之观察者 Observer
Observer就是一个接口,预留四个函数,回调不同状态
public interface Observer<T> {
//开始订阅 回调函数
void onSubscribe(@NonNull Disposable d);
//回调结果
void onNext(@NonNull T t);
//出现错误
void onError(@NonNull Throwable e);
//订阅结束
void onComplete();
}
rxjava源码解析 之被观察者 Observable
// ObservableOnSubscribe
public interface ObservableOnSubscribe<T> {
//ObservableEmitter 分发器
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
//被观察者
Observable.create(
//自定义source
object :ObservableOnSubscribe<String>{
override fun subscribe(emitter: ObservableEmitter<String>) {
emitter.onNext("A")
}
})
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//判空
ObjectHelper.requireNonNull(source, "source is null");
//RxJavaPlugins.onAssembly 全局封装,预留hook点,如果开发者自定义了function,那么会在所有操作符执行前,先执行开发者自定义的函数。
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
//创建真正的操作符对象,把开发者自定义的ObservableOnSubscribe 作为参数构造
new ObservableCreate<T>(source)
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//分发器
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//回调接口ObservableOnSubscribe 的onSubscribe函数
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
333.png
RxJava源码解析 之订阅操作
Observable.create(
.....
})
//订阅操作
.subscribe(
object :Observer<String>{
......
})
------Observable.subscribe(Observer)---------
public final void subscribe(Observer<? super T> observer) {
......
//Observable的抽象函数,具体实现由各操作符对象实现,比如 ObservableCreate
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
......
} catch (Throwable e) {
.......
}
}
//ObservableCreate的实现 subscribeActual,observer是自定义的观察者
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//调用观察者的onSubscribe回调函数
observer.onSubscribe(parent);
try {
//source 是自定义的ObservableOnSubscribe,把分发器传进入,用来发射数据
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//被观察者
Observable.create(
//自定义source
object :ObservableOnSubscribe<String>{
override fun subscribe(emitter: ObservableEmitter<String>) {
//在此处利用发射器,发射数据,回调给观察者
emitter.onNext("A")
}
})
//分发器
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
//observer 就是自定义的观察者对象,通过分发器来回调各状态函数
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
......
if (!isDisposed()) {
observer.onNext(t); //接受数据
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (!isDisposed()) {
try {
observer.onError(t);//数据报错
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();//执行结束
} finally {
dispose();
}
}
}
.....
}
2222.png
5555.png
map分发事件 结构流程图
分发事件装包裹,回调数据拆包裹,u型结构
6666.png
8888.png
999.png
777.png
888.png
观察者设计模式
传统观察者设计模式
一个被观察者,多个观察者,是一对多的关系,并且需要容器管理观察者进行增删,只有被观察者发出改变通知,遍历observable里面的容器,依次发送通知,观察者才能观察到更新变化。耦合度比较高
rxjava的观察者设计模式(发布订阅模式)
多个被观察者(create,map,flatemap等),一个观察者(observer),并且需要起点和终点在订阅(subscribe)后,才会发出通知,终点(观察者)才能观察到。分发事件是会拿到发射器,通过发射器(CreateEmitter)关联到开发者自定义的observer(观察者),发射器调用自定义的observer接口的回调函数onSubscribe,onNext,onError,onComplete。
网友评论