简述:本篇主要分析 Observable、Observer 产生(create)、关联(subscribe)、数据发送(emitter)的过程!
下面这段代码,是常规的RxJava的操作,从这段代码入手分析:
//1- create 一个 Observable
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("This is Create!");
}
}).subscribe(new Observer<String>() { //订阅一个 Observer
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
一、Create 被观察者 Observable 过程
1,被观察者Observable
//抽象类,具体实现由子类来做
public abstract class Observable<T> implements ObservableSource<T> {
.....
}
2,接着看 Observable . Create 方法
//1- create 一个 Observable
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("This is Create!");
}
})//....省略部分代码
//*****************************分割线*********************************
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
这里需要注意:
-
create
方法的返回值是Observable -
create
方法传入的参数为:ObservableOnSubscribe<T>
-
create
方法return
,返回的是RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
,其实就是返回的new ObservableCreate<T>(source),并且将source
作为参数,传了进去(重要) ,ObservableCreate
这个类,稍后分析.
3,上面create
方法提到了3个重要的地方,ObservableOnSubscribe、ObservableCreate、RxJavaPlugins.onAssembly
继续往下看
RxJavaPlugins.onAssembly 分析:;
这个方法需要一个
Observable
作为参数,上面示例代码中,直接new ObservableCreate<T>(source)
作为参数,其中这个source
也就是上面的ObservableOnSubscribe
。这里又提到ObservableCreate这个类,顾名思义就是:被观察者生产类,继续往下看:
ObservableOnSubscribe
,需要注意里面的ObservableEmitter
后面会分析,源码如下:
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param emitter the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
ObservableCreate 分析:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
//这里的 ObservableOnSubscribe 作为参数传递进来
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//注意,这个方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 将观察者作为参数,传给 CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//将parent 传到了onSubscribe()
observer.onSubscribe(parent);
try {
// 将ObservableOnSubscribe 和 CreateEmitter(也就是Observer) 关联
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//...........省略部分代码
}
注意
-
observer.onSubscribe(parent);
这里调用的是Observer-->void onSubscribe(@NonNull Disposable d);
这个方法需要的参数类型为:Disposable
, 这也就说明了,CreateEmitter
可以将Observer
阻断。 - 上面的source是被观察者
ObservableOnSubscribe<T> source
,执行source.subscribe(parent);
,parent是CreateEmitter
,CreateEmitter
中又有observer
,也就是将被观察者与观察者关联起来。
接着具体看下CreateEmitter这个类:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
//******************1************************
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//********************2***********************
//如果没被阻断,调用观察者的onNext()方法
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
//.... 省略部分代码
}
注意,上面标识的地方:
- 1,
CreateEmitter
(生成发射器)这个类的构造器的参数是:Observer
,下面代码也可以看到,调用 发射器的onNext()
等方法最终都是调用的Observer
中对应的方法 - 2,
onNext();
等方法的逻辑,大致都是,先进行判空操作,如果没有被阻断(!isDisposed()) 就会调用Observer
中对应的方法。
CreateEmitter
实现了 ObservableEmitter<T>, Disposable
接口
public interface ObservableEmitter<T> extends Emitter<T> {
//.... 省略代码....
}
这里ObservableEmitter
又继承自Emitter
,接下来看一下Emitter
public interface Emitter<T> {
/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(@NonNull T value);
/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(@NonNull Throwable error);
/**
* Signal a completion.
*/
void onComplete();
}
Emitter
: 这里面就是我们经常看到的三个方法!
到这里也就说明了,为什么上面代码中emitter.onNext("This is Create!");
会走到下面观察者的onNext()
方法了。
Create方法总结:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Observable.create()
方法需要一个ObservableOnSubscribe
作为参数,然后又将这个ObservableOnSubscribe
作为参数传给了new ObservableCreate<T>(source)
, 并返回,这也就是生成的被观察者。
二、接着看 Observable . subscribe() 订阅方法
{@link Observable#subscribe()}
这个方法主要是:被观察者订阅观察者:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//可以忽略
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//注意这个方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
subscribeActual 分析:
protected abstract void subscribeActual(Observer<? super T> observer);
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
//这里的 ObservableOnSubscribe 作为参数传递进来
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//注意,这个方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 将观察者作为参数,传给 CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//将parent 传到了onSubscribe()
observer.onSubscribe(parent);
try {
// 将ObservableOnSubscribe 和 CreateEmitter(也就是Observer) 关联
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//...........省略部分代码
}
注意:
-
subscribeActual(observer);
,这个方法是一个抽象方法,由具体的类去实现,这里的具体类指的是上面的ObservableCreate
(这是返回的Observable) , 所以这里的subscribeActual(observer);
其实是调用的ObservableCreate - >subscribeActual ();
方法
三、观察者:Consumer 分析
在被观察者订阅观察者的时候,可以发现有好几个重载方法,上面分析了Observer
:
但是Observer
需要实现所有的方法,如果只需要onNext()、onError();
就需要使用到Consumer
这个接口类:
public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/
void accept(T t) throws Exception;
}
如果只需要关注onNext()
方法,可以调用Observable
的这个调用方法,参数为Consumber
:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
继续跟到subscribe(Consumer<? super T> onNext) ;
方法:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
//一系列的判空操作
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
//重要!!!
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
//这里就是调用具体的实现类的方法,和上面的流程一样
subscribe(ls);
return ls;
}
这里主要是LambdaObserver这个类,可以看出,这个类也是一个观察者:
public final class LambdaObserver<T> extends AtomicReference<Disposable>
implements Observer<T>, Disposable, LambdaConsumerIntrospection {
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
//构造方法接收三个参数,对应 next,error,complete
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
try {
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
}
}
}
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
@Override
public void onError(Throwable t) {
if (!isDisposed()) {
lazySet(DisposableHelper.DISPOSED);
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
lazySet(DisposableHelper.DISPOSED);
try {
onComplete.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
@Override
public boolean hasCustomOnError() {
return onError != Functions.ON_ERROR_MISSING;
}
}
LambdaObserver这个类也是一个Observer
和之前的观察者一样,都包含next、error、complete这几个方法,也是将LambdaObserver传入到ObservableCreate 与被观察者产生关联。
这里可以只传入我们关心的回调,例如上面只传入了onNext()
的回调,原因如下:
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
这里的Functions.ON_ERROR_MISSING其实也是一个Consumer
,可以理解为一个占位,例如当error回调的时候,我们并没有实现,他会回调到下面这个默认实现中去:
public static final Consumer<Throwable> ON_ERROR_MISSING = new OnErrorMissingConsumer();
//---------------------------------------------------------
static final class OnErrorMissingConsumer implements Consumer<Throwable> {
@Override
public void accept(Throwable error) {
RxJavaPlugins.onError(new OnErrorNotImplementedException(error));
}
}
网友评论