1. example:
Observable
    .create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onComplete();
        }
    })
    .subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            LogUtils.log(getClass(), "onSubscribe()");
        }
        @Override
        public void onNext(Integer value) {
            LogUtils.log(getClass(), "onNext()");
        }
        @Override
        public void onError(Throwable e) {
            LogUtils.log(getClass(), "onError()");
        }
        @Override
        public void onComplete() {
            LogUtils.log(getClass(), "onComplete()");
        }
    });
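- The example above uses RxJava on a single thread. The sections below walk through the source code to show what each API call actually does and how these APIs relate to one another in the single-threaded case;
Inheritance relationships: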
public abstract class Observable<T> implements ObservableSource<T>;
public interface ObservableSource<T>;
public interface Observer<T>;
public interface ObservableOnSubscribe<T>;
public interface ObservableEmitter<T> extends Emitter<T>;
public interface Emitter<T>;
create():
public abstract class Observable<T> implements ObservableSource<T> {
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
}
public final class RxJavaPlugins {
    public static <T> Observable<T> onAssembly(Observable<T> source) {
        return source;
    }
}
public interface ObservableOnSubscribe<T> {
    void subscribe(ObservableEmitter<T> emitter) throws Exception;
}
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
}
The code above does the following:
- 1. Creates an ObservableOnSubscribe object and passes its reference to ObservableCreate;
- 2. Creates an ObservableCreate object. ObservableCreate extends Observable and holds a reference to the ObservableOnSubscribe object;
public abstract class Observable<T> implements ObservableSource<T>;
public interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}
public interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}
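- As the code above shows, create() only performs initialization: it wraps the source in an ObservableCreate, and nothing is emitted until subscribe() is called;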
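The point that create() only stores a reference can be illustrated with a minimal sketch. It is a simplified model, not the RxJava classes themselves: LazyCreateSketch, OnSubscribe and Created are hypothetical stand-ins for Observable, ObservableOnSubscribe and ObservableCreate, and a plain List plays the role of the observer.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of Observable.create(); all names here are hypothetical.
final class LazyCreateSketch {

    // Stand-in for ObservableOnSubscribe<T>: the body that emits values.
    interface OnSubscribe<T> {
        void subscribe(List<T> sink);
    }

    // Stand-in for ObservableCreate<T>: the constructor only stores the source.
    static final class Created<T> {
        final OnSubscribe<T> source;

        Created(OnSubscribe<T> source) {
            this.source = source; // nothing is executed here
        }

        // Only subscribing runs the stored source.
        List<T> subscribe() {
            List<T> received = new ArrayList<>();
            source.subscribe(received);
            return received;
        }
    }

    static <T> Created<T> create(OnSubscribe<T> source) {
        return new Created<>(source);
    }

    // Returns an event log so the ordering can be inspected.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Created<Integer> observable = create(sink -> {
            log.add("source ran");
            sink.add(1);
        });
        log.add("after create");
        List<Integer> values = observable.subscribe();
        log.add("received " + values);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch "after create" is logged before "source ran", which mirrors the observation that the emitting code does not run at assembly time.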
subscribe():
.subscribe(new Observer<Integer>() {
    @Override
    public void onSubscribe(Disposable d) {}
    @Override
    public void onNext(Integer value) {}
    @Override
    public void onError(Throwable e) {}
    @Override
    public void onComplete() {}
});
public abstract class Observable<T> implements ObservableSource<T> {
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        subscribeActual(observer);
    }
}
public final class RxJavaPlugins {
    public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
        BiFunction<Observable, Observer, Observer> f = onObservableSubscribe;
        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }
}
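- 1. Testing shows that f == null here (no subscribe hook has been installed), so the observer is returned unchanged;
- 2. As seen in create(), subscribeActual() is actually implemented by ObservableCreate, the subclass of Observable;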
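The two points above (an optional hook, then delegation to a subclass) can be sketched as follows. This is a simplified model under stated assumptions: SubscribeHookSketch, Source, JustOne and the static hook field are hypothetical names, and a UnaryOperator stands in for the BiFunction used by RxJavaPlugins.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Simplified model of Observable.subscribe(); all names here are hypothetical.
final class SubscribeHookSketch {

    interface Observer<T> {
        void onNext(T value);
    }

    // Stand-in for onObservableSubscribe; null means no hook is installed.
    static UnaryOperator<Observer<Integer>> hook = null;

    abstract static class Source {
        // Same shape as Observable.subscribe(): apply the hook if present, then delegate.
        final void subscribe(Observer<Integer> observer) {
            UnaryOperator<Observer<Integer>> f = hook;
            if (f != null) {
                observer = f.apply(observer);
            }
            subscribeActual(observer);
        }

        // Implemented by the concrete subclass, as ObservableCreate does.
        abstract void subscribeActual(Observer<Integer> observer);
    }

    static final class JustOne extends Source {
        @Override
        void subscribeActual(Observer<Integer> observer) {
            observer.onNext(1);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observer<Integer> plain = v -> log.add("onNext(" + v + ")");

        // Case 1: no hook, the observer is used unchanged.
        hook = null;
        new JustOne().subscribe(plain);

        // Case 2: a hook wraps the observer before subscribeActual() sees it.
        hook = downstream -> v -> {
            log.add("hook saw " + v);
            downstream.onNext(v);
        };
        new JustOne().subscribe(plain);

        hook = null; // restore the default
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With no hook the call goes straight to subscribeActual(), which is the path the example in this article takes.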
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }
}
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable;
public interface ObservableOnSubscribe<T> {
    void subscribe(ObservableEmitter<T> emitter) throws Exception;
}
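The link between the Observable (the observed) and the Observer (the observer) is actually established inside subscribeActual():
- 1. CreateEmitter implements Disposable;
- 2. The observer receives a reference to the CreateEmitter through onSubscribe(), where it is typed as Disposable;
- 3. The ObservableOnSubscribe receives a reference to the same CreateEmitter through subscribe(), where it is typed as ObservableEmitter;
- 4. onSubscribe(Disposable d) is called first, before the source emits anything;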
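The wiring described in the list above can be traced with a small self-contained sketch. It is a simplified single-threaded model, not the library code: SubscribeActualSketch and MiniEmitter are hypothetical names, error handling is omitted, and a plain boolean replaces the AtomicReference used by CreateEmitter.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of ObservableCreate.subscribeActual(); all names here are hypothetical.
final class SubscribeActualSketch {

    interface Disposable { void dispose(); boolean isDisposed(); }
    interface Emitter<T> { void onNext(T value); void onComplete(); }
    interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T value);
        void onComplete();
    }
    interface OnSubscribe<T> { void subscribe(Emitter<T> emitter); }

    // One object, two views: Emitter for the source, Disposable for the observer.
    static final class MiniEmitter<T> implements Emitter<T>, Disposable {
        final Observer<T> observer;
        boolean disposed;

        MiniEmitter(Observer<T> observer) { this.observer = observer; }

        public void onNext(T value) {
            if (!disposed) observer.onNext(value);
        }
        public void onComplete() {
            if (!disposed) {
                observer.onComplete();
                disposed = true;
            }
        }
        public void dispose() { disposed = true; }
        public boolean isDisposed() { return disposed; }
    }

    // Same three steps as the real method: create emitter, onSubscribe, run source.
    static <T> void subscribeActual(OnSubscribe<T> source, Observer<T> observer) {
        MiniEmitter<T> parent = new MiniEmitter<>(observer);
        observer.onSubscribe(parent);
        source.subscribe(parent);
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        OnSubscribe<Integer> source = emitter -> {
            log.add("source.subscribe()");
            emitter.onNext(1);
            emitter.onComplete();
        };
        subscribeActual(source, new Observer<Integer>() {
            public void onSubscribe(Disposable d) { log.add("onSubscribe()"); }
            public void onNext(Integer value) { log.add("onNext(" + value + ")"); }
            public void onComplete() { log.add("onComplete()"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The log order in this sketch is onSubscribe(), source.subscribe(), onNext(1), onComplete(), matching the order of the statements in subscribeActual().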
CreateEmitter:
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    private static final long serialVersionUID = -3434801548987643227L;
    final Observer<? super T> observer;
    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }
    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
    @Override
    public void onError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                dispose();
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }
    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }
    @Override
    public void setDisposable(Disposable d) {
        DisposableHelper.set(this, d);
    }
    @Override
    public void setCancellable(Cancellable c) {
        setDisposable(new CancellableDisposable(c));
    }
    @Override
    public ObservableEmitter<T> serialize() {
        return new SerializedEmitter<T>(this);
    }
    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }
    @Override
    public boolean isDisposed() {
        return DisposableHelper.isDisposed(get());
    }
}
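- If dispose() has been called, the emitter's onNext(), onError() and onComplete() calls still execute, but the Observer's onNext(), onError() and onComplete() are no longer invoked (onSubscribe() has already run by then, and an onError() arriving after disposal is handed to RxJavaPlugins.onError() instead);
- Once the emitter has called onComplete() or onError(), dispose() runs in the finally block, so any later onXXX call no longer reaches the Observer;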
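Both rules can be checked with a minimal sketch of the isDisposed() guard. This is an assumption-laden simplification: DisposeSketch and GuardedEmitter are hypothetical names, an AtomicBoolean replaces the AtomicReference<Disposable> plus DisposableHelper used by CreateEmitter, onError() is left out, and a List stands in for the Observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of the disposal guard in CreateEmitter; all names here are hypothetical.
final class DisposeSketch {

    static final class GuardedEmitter {
        private final AtomicBoolean disposed = new AtomicBoolean();
        private final List<String> delivered; // stands in for the Observer

        GuardedEmitter(List<String> delivered) {
            this.delivered = delivered;
        }

        void onNext(int value) {
            if (!isDisposed()) {
                delivered.add("onNext(" + value + ")");
            }
        }

        void onComplete() {
            if (!isDisposed()) {
                try {
                    delivered.add("onComplete()");
                } finally {
                    dispose(); // a terminal event disposes the emitter
                }
            }
        }

        void dispose() { disposed.set(true); }
        boolean isDisposed() { return disposed.get(); }
    }

    // dispose() in the middle: later emitter calls run but deliver nothing.
    static List<String> afterDispose() {
        List<String> delivered = new ArrayList<>();
        GuardedEmitter emitter = new GuardedEmitter(delivered);
        emitter.onNext(1);
        emitter.dispose();
        emitter.onNext(2);
        emitter.onComplete();
        return delivered;
    }

    // onComplete() first: the finally block disposes, so later calls are dropped.
    static List<String> afterComplete() {
        List<String> delivered = new ArrayList<>();
        GuardedEmitter emitter = new GuardedEmitter(delivered);
        emitter.onNext(1);
        emitter.onComplete();
        emitter.onNext(2);
        emitter.onComplete();
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(afterDispose());
        System.out.println(afterComplete());
    }
}
```

In this sketch afterDispose() delivers only onNext(1), and afterComplete() delivers onNext(1) followed by a single onComplete().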