一、Observable的创建
//Observable的创建, 这里的Object类可以替换为任意类型。
Observable<Object> observable =
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
e.onNext(/*Object*/value1);
e.onNext(/*Object*/value2);
...
e.onComplete();
}
});
//Observer订阅事件
observable.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
1.1 被观察者(Observable)接口及实现类:
(1) 接口ObservableSource
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
(2) 接口ObservableSource的抽象实现类Observable
public abstract class Observable<T> implements ObservableSource<T> {
/* 实现ObservableSource接口。*/
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
...
}
}
protected abstract void subscribeActual(Observer<? super T> observer);
/* 静态方法,用于创建Observable实例。*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
(3) Observable的具体实现类ObservableCreate
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) { this.observer = observer;}
/* Emitter接口的实现。*/
@Override
public void onNext(T t) {
...
observer.onNext(t);
}
@Override
public void onError(Throwable t) {
...
observer.onError(t);
}
@Override
public void onComplete() {
...
observer.onComplete();
}
/* Disposable接口的实现。*/
@Override
public void dispose() { ...}
@Override
public boolean isDisposed() {...}
}
}
1.2 被观察者(Observable)和观察者(Observer)之间的桥梁
被观察者(Observable)持有ObservableOnSubscribe实例的引用, 参数ObservableEmitter持有观察者(Observer)实例的引用,这样通过ObservableOnSubscribe的subscribe(ObservableEmitter<T> e)方法,Observable和Observer之间就建立了联系。
(1) 接口ObservableOnSubscribe
/**
* A functional interface that has a {@code subscribe()} method that receives
* an instance of an {@link ObservableEmitter} instance that allows pushing
* events in a cancellation-safe manner.
*
* @param <T> the value type pushed
*/
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
二、总结
RxJava的使用可以概括为:
- 通过Observable.create(ObservableOnSubscribe<T> source)创建一个Observable实例,实际类型为ObservableCreate, 它持有对source的引用;
- 通过已创建的Observable实例,调用subscribe(Observer<? super T> observer)方法,在该方法中完成的主要工作:
- 使用observer作参数构建CreateEmitter类的实例, 记为parent,parent持有对observer的引用;
- 调用observer.onSubscribe(parent);
- 通过ObservableCreate类的source成员, 调用source.subscribe(parent),该方法由用户自定义,一般包含:
parent.onNext(Object);
parent.onError(Throwable);
parent.onComplete();
最终分别会调用observer的onNext(Object),onComplete()和onError(Throwable)方法。
三、操作符
concatMap
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
说明:
对源Publisher发射的每一个元素应用一个转换函数(transformation function),来生成一个新的Flowable实例,然后将新的Flowable实例按原序发给订阅者。
举例:
private void timeoutWithRetry() {
Flowable
.just("red", "dark", "yellow", "green", "black", "blue")
.concatMap(new Function<String, Publisher<? extends String>>() {
@Override
public Publisher<? extends String> apply(String color) throws Exception {
Log.d(TAG, "applying " + color + ", thread: " + Thread.currentThread().getName());
return Flowable.just(color)
.delay(color.length(), TimeUnit.SECONDS)
.timeout(5, TimeUnit.SECONDS) //超时时间, 如果超时、且没有添加重试,则抛出TimeoutException.
.retry(2); //Publisher在出现onError()时重试的次数, 重试之后再决定是否调用onError().
}
})
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, String.format("Received {%s} delaying for {%d}, thread: %s", s, s.length(), Thread.currentThread().getName()));
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e(TAG, "accept exception: ", throwable);
}
});
}
四、RxJava的线程控制
- 产生事件的代码(ObservableOnSubscribe接口)和doOnSubscribe()分别在它们后面最近的一个subscribeOn() 指定的Scheduler上执行,如果后面没有找到subscribeOn(),则在subscribe()的调用者所在的线程执行;
- 普通操作(map、filter等)和消费事件的代码(Consumer、Observer接口)在它们前面最近的一个observeOn指定的Scheduler上执行;如果它们前面没有observeOn了,那么它们就在整个调用链的第一个subscribeOn指定的Scheduler上执行;如果没找到subscribeOn调用,则在subscribe()的调用者所在的线程执行。
网友评论