所有代码的演示都在RxJava2.1.13版本上进行的
Observable.just(1,2,3,4).subscribe(integer -> System.out.println(integer));
Observable.fromArray(1,2,3,4,5).subscribe(integer -> System.out.println(integer));
just也是间接代用fromArray(),我们简单看看just的分析过程
看看这段代码是如何运行的
@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item1, T item2, T item3, T item4) {
...
return fromArray(item1, item2, item3, item4);
}
重要的是调用fromArray()创建ObservableFromArray对象
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
ObservableFromArray继承自Observable,而Observable里有一个系列函数subscribe(),一旦被观察者订阅观察者(调用subscribe())就会触发subscribeActual(Observer<? super T> observer),
每一个Observable的子类都有各自的实现
简单看看ObservableFromArray的源码
public final class ObservableFromArray<T> extends Observable<T> {
final T[] array;
public ObservableFromArray(T[] array) {
this.array = array;
}
@Override
public void subscribeActual(Observer<? super T> s) {
FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
s.onSubscribe(d);
if (d.fusionMode) {
return;
}
d.run();
}
static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
final Observer<? super T> actual;
final T[] array;
int index;
boolean fusionMode;
volatile boolean disposed;
FromArrayDisposable(Observer<? super T> actual, T[] array) {
this.actual = actual;
this.array = array;
}
@Override
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) {
fusionMode = true;
return SYNC;
}
return NONE;
}
@Nullable
@Override
public T poll() {
int i = index;
T[] a = array;
if (i != a.length) {
index = i + 1;
return ObjectHelper.requireNonNull(a[i], "The array element is null");
}
return null;
}
@Override
public boolean isEmpty() {
return index == array.length;
}
@Override
public void clear() {
index = array.length;
}
@Override
public void dispose() {
disposed = true;
}
@Override
public boolean isDisposed() {
return disposed;
}
void run() {
T[] a = array;
int n = a.length;
for (int i = 0; i < n && !isDisposed(); i++) {
T value = a[i];
if (value == null) {
actual.onError(new NullPointerException("The " + i + "th element is null"));
return;
}
actual.onNext(value);
}
if (!isDisposed()) {
actual.onComplete();
}
}
}
}
一旦订阅就会执行subscribeActual(),进而执行run(),而run()方法中只是将数组遍历一遍
网友评论