简述:简单分析Rxjava2 常见操作符的源码
range操作符原理
range是一个生产操作符,例如下面的示例代码,发送0~5给下游:
Disposable disposable1 = Observable.range(0, 5).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: t" + integer);
}
});
生产操作符主要是在range
实现,具体看一下:
//规定了类型为Integer
public static Observable<Integer> range(final int start, final int count) {
//一些判断操作
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}
if (count == 0) {
return empty();
}
if (count == 1) {
return just(start);
}
if ((long)start + (count - 1) > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Integer overflow");
}
//主要的方法
return RxJavaPlugins.onAssembly(new ObservableRange(start, count));
}
- 返回值类型为:Observable<Integer>
- 主要的方法是:new ObservableRange(start, count)
继续看一下ObservableRange,类不是太长主要包含RangeDisposable
静态内部类和subscribeActual();
方法:
public final class ObservableRange extends Observable<Integer> {
private final int start;
private final long end;
public ObservableRange(int start, int count) {
this.start = start;
this.end = (long)start + count;
}
//将观察者传入
@Override
protected void subscribeActual(Observer<? super Integer> o) {
RangeDisposable parent = new RangeDisposable(o, start, end);
o.onSubscribe(parent); // 观察者跟Disposable 关联,可以截断从上游到下游的数据
parent.run();
}
static final class RangeDisposable
extends BasicIntQueueDisposable<Integer> {
private static final long serialVersionUID = 396518478098735504L;
final Observer<? super Integer> downstream;
final long end;
long index;
boolean fused;
RangeDisposable(Observer<? super Integer> actual, long start, long end) {
this.downstream = actual;
this.index = start;
this.end = end;
}
void run() {
if (fused) {
return;
}
Observer<? super Integer> actual = this.downstream;
long e = end;
for (long i = index; i != e && get() == 0; i++) {
actual.onNext((int)i);
}
if (get() == 0) {
lazySet(1);
actual.onComplete();
}
}
@Nullable
@Override
public Integer poll() throws Exception {
long i = index;
if (i != end) {
index = i + 1;
return (int)i;
}
lazySet(1);
return null;
}
@Override
public boolean isEmpty() {
return index == end;
}
@Override
public void clear() {
index = end;
lazySet(1);
}
@Override
public void dispose() {
set(1);
}
@Override
public boolean isDisposed() {
return get() != 0;
}
@Override
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) {
fused = true;
return SYNC;
}
return NONE;
}
}
}
-
subscribeActual
方法中,将参数跟需要发送的内容关联起来,最后调用了RangeDisposable.run();
方法。 -
上面代码可以看出,将
o、start、end
o为Observer
作为参数传到了RangeDisposable
,这里需要注意,RangeDisposable.run()方法,用了一个for循环,依次调用onNext,将int发送给下游
void run() {
if (fused) {
return;
}
Observer<? super Integer> actual = this.downstream;
long e = end;
// 这里for循环,调用观察者的onNext,依次发送 int,
for (long i = index; i != e && get() == 0; i++) {
actual.onNext((int)i);
}
if (get() == 0) {
lazySet(1);
actual.onComplete();
}
}
Map操作符原理
map是转换操作符,例如下面的代码,将String类型转换为Integer类型:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//发送 String 类型
emitter.onNext("1");
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
//String 类型 转换为 Integer
return Integer.valueOf(s);
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
转换操作,是在map
这里实现,跟一下代码:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//判空操作
ObjectHelper.requireNonNull(mapper, "mapper is null");
// 重要方法
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
注意:
- 这里两个泛型的类型为:String 和 Integer
- 这个方法返回值是
Observable
,实际上返回的是new ObservableMap<T, R>(this, mapper),主要的实现还是在ObservableMap这个类中
ObservableMap 分析
ObservableMap
类中包含一个静态内部类:MapObserver
具体如下:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qd.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
-
ObservableMap
构造方法,将ObservableSource
,Function
传入,其中Function
泛型分别为T : String
和U: Integer
-
subscribeActual
上一篇中分析过,被观察者订阅观察者的时候,调用的是subscribeActual
这个方法。 -
MapObserver
具体的转换类,具体分析一下onNext(T t);
@Override
public void onNext(T t) { // 这个T = String ,代表转换前的类型
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v; // U: Integer ,指的是需要转换的类型
/**
*具体的转换方法是:mapper.apply(t);,将t转换为v类型
*/
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//将转换好的v ,调用onNext()发送给下游,从而实现了类型转换
downstream.onNext(v);
}
注意
- 上面的Function是一个接口
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
R apply(@NonNull T t) throws Exception;
}
所以,上面调用mapper.apply(t)
其实是找具体实现的apply
方法(这里apply
返回值类型R其实就是Integer
)
其实最开始代码已经具体实现了Function
方法:
到这里,也就完成了一次类型转换,将String 转换成了 Integer。
总结:
通过上面的2个操作符的分析,其实都是对Observable
进行包装/变换,例如下面的两个操作符(可以猜想其它操作符应该也是差不多的流程):
1,range使用new ObservableRange(start, count);
对Observable
进行包装处理。
2,map 使用new ObservableMap<T, R>(this, mapper)
对Observable
进行包装处理。
网友评论