今天想到这个问题,又回来看了一遍,虽然是rxjava1 讲的,rxjava2 也是一样的啦,之前也是看了好多次,就是那种似懂非懂的感觉,这次明白了。
就按map 理解一下吧,下边这个不是源码,但是关键代码。
public Observable lift(Operator operator)
4. return Observable.create(new OnSubscribe({
@Override
public void call(Subscriber subscriber{
//这一句将老的转成新的了
1. Subscriber newSubscriber=operator.call(subscriber);
2. newSubscriber.onStart();
3. onSubscribe.call(newSubscriber);
}
});
}
1.讲解:
OperatorMap 的源码
public final class OperatorMap implements Operator {
private final Func1 transformer;
public OperatorMap(Func1 transformer) {
this.transformer= transformer;
}
@Override
public Subscriber call(final Subscriber o) {
return newSubscriber(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try{
o.onNext(transformer.call(t));
}catch(Throwable e) {
Exceptions.throwOrReport(e, this,t);
}
}
};
}
}
看到这一行的时候先去看下OperatorMap的源码,OperatorMap .call 是创建一个新的Subscriber 返回目标Subscriber,第一行就是返回一个新的目标Subscriber。
2.
his method is invoked when the Subscriber and Observable have been connected but the Observable has
not yet begun to emit items or send notifications to the Subscriber. Override this method to add any
useful initialization to your subscription, for instance to initiate backpressure.
3.onSubscribe 是新的Observable 中的,然后call 的新的目标Subscriber。
4.返回一个新的Observable赋值给老的Observable
网友评论