1.回顾
上篇已经介绍了RxJava的基本概念以及用法 RxJava2基本框架分析一(基础篇)
2.实例讲解
// RxJava的链式操作
// 1. 创建被观察者(Observable) & 定义需发送的事件
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
// 2. 创建观察者(Observer) & 定义响应事件的行为
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
System.out.println("对Next事件" + value + "作出响应");
}
@Override
public void onError(Throwable e) {
System.out.println("对Error事件作出响应");
}
@Override
public void onComplete() {
System.out.println("对Complete事件作出响应");
}
};
// 3. 通过订阅(subscribe)连接观察者和被观察者
observable.subscribe(observer);
-
运行结果
示意图
3. 源码分析
下面,我讲根据 使用步骤 进行RxJava2
的源码进行分析
步骤1:创建被观察者(Observable)
&定义需发送的事件
步骤2:创建观察者(Observer)
&定义响应事件的行为
步骤3:通过订阅(subscribe)
连接观察者和被观察者
步骤一:创建被观察者(Observable)
- 源码分析如下
// 1. 创建被观察者(Observable) & 定义需发送的事件
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
/**
* 源码分析 Observable.create(object : ObservableOnSubscribe<Int>{...])
* create 操作主要是创建了 ObservableCreate 对象并且返回出去
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//判断source是否为空
ObjectHelper.requireNonNull(source, "source is null");
//hook函数:判断是否需要再原对象加上一些代码操作(暂时可以当做返回对象本身)
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
/**
* 下面我们来看看 ObservableCreate 对象里面做了什么操作
*/
public final class ObservableCreate<T> extends Observable<T> {
// ObservableCreate 是Observable的子类
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
//构造函数
//传入source对象,并且赋值全局 = 手动创建的ObservableOnSubscribe匿名内部类对象(Observable.create(new ObservableOnSubscribe<Integer>())
this.source = source;
}
//这里需要留心关注subscribeActual方法后面会讲到
- 步骤1总结:创建被观察者的操作已经完成了,调用
Observable.create()
返回了一个ObservableCreate
对象。
步骤二创建观察者(Observer)
- 源码分析
/**
* 使用步骤2:创建观察者 & 定义响应事件的行为(方法内的创建对象代码)
**/
// 2. 创建观察者(Observer) & 定义响应事件的行为
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
System.out.println("对Next事件" + value + "作出响应");
}
@Override
public void onError(Throwable e) {
System.out.println("对Error事件作出响应");
}
@Override
public void onComplete() {
System.out.println("对Complete事件作出响应");
}
};
/**
* 源码分析Observer类
**/
public interface Observer<T> {
// 注:Observer本质 = 1个接口
// 接口内含4个方法,分别用于 响应 对应于被观察者发送的不同事件
void onSubscribe(@NonNull Disposable d); // 内部参数:Disposable 对象,可结束事件
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
- 步骤2总结:创建观察者的操作已经完成了,通过
new
了一个Observer
的匿名内部类
步骤三:通过订阅(subscribe)连接观察者和被观察者
- 源码分析
// 3. 通过订阅(subscribe)连接观察者和被观察者
observable.subscribe(observer);
/**
* 源码分析:Observable.subscribe(observer)
* 说明:该方法属于 Observable 类的方法(注:传入1个 Observer 对象)
**/
public abstract class Observable<T> implements ObservableSource<T> {
...
// 仅贴出关键源码
@Override
public final void subscribe(Observer<? super T> observer) {
...
// 仅贴出关键源码
//可以看到调用的是本类的下面抽象方法
subscribeActual(observer);
}
//定义了一个抽象方法当调用subscribe时会跟这个调用Observable子类的实现方法(就是调用者)
protected abstract void subscribeActual(Observer<? super T> observer);
}
/**
* 现在我们回到先前创建的被观察者中 ObservableCreate类
**/
public final class ObservableCreate<T> extends Observable<T> {
// ObservableCreate 是Observable的子类
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
//构造函数
//传入source对象,并且赋值全局 = 手动创建的ObservableOnSubscribe匿名内部类对象(Observable.create(new ObservableOnSubscribe<Integer>())
this.source = source;
}
/**
* 重点关注:复写了subscribeActual()
* 作用:订阅时,通过接口回调 调用被观察者(Observerable) 与 观察者(Observer)的方法
**/
@Override
protected void subscribeActual(Observer<? super T> observer) {
//1. 创建1个CreateEmitter对象(封装成一个Disposable对象)
//作用:发射事件
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//2. 调用观察者(Observer)的onSubscribe()
// onSubscribe()的实现 = 使用步骤2(创建观察者(Observer))时复写的onSubscribe()
//将Disposable(CreateEmitter) 传到观察者onSubscribe(Disposable d) 参数中,使之可以解除订阅
observer.onSubscribe(parent);
try {
//3.调用source对象的subscribe()方法
// source对象 = 使用步骤1(创建被观察者(Observable))中创建的ObservableOnSubscribe对象
//subscribe()的实现 = 使用步骤1(创建被观察者(Observable))中复写的subscribe()
//将CreateEmitter对象传递给被观察者进行对象方法的调用(onNext(),onComplete()...)
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
/**
* 分析2:emitter.onNext("1");
* 此处仅讲解subscribe()实现中的onNext()
* onError()、onComplete()类似,此处不作过多描述
**/
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
//初始化讲观察者赋值到全局变量observer
this.observer = observer;
}
@Override
public void onNext(T t) {
//当被观察者调用onNext()方法时,回调此方法(步骤一中创建Observable.create()匿名内部类中的onNext())
//发送的事件不能为null
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//判断是否断开连接(调用Disposable.dispose())
//没有断开的话,则调用观察者中的onNext()方法
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
}
步骤3总结:当被观察者订阅观察者的时候,会调用被观察者Observable
的subscribeActual()
抽象方法,回调其子类重新的subscribeActual()
方法。这方法里面有三个步骤:
- 创建1个
CreateEmitter
对象(封装成一个Disposable
对象) - 调用观察者(
Observer
)的onSubscribe(CreateEmitter parent )
使其可以取消订阅 - 调用
source
对象的subscribe(CreateEmitter parent)
方法,通过parent
发送事件回调
4. 源码总结
- 在步骤1(创建被观察者(Observable))、步骤2(创建观察者(Observer))时,仅仅只是定义了发送的事件 & 响应事件的行为;
- 只有在步骤3(订阅时),才开始发送事件 & 响应事件,真正连接了被观察者 & 观察者
-
具体源码总结如下
总结
网友评论