Based on RxJava 1.1.0
Example code ↓
Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
public void call(Subscriber<? super String> subscriber) {
Log.e("haha",Thread.currentThread().getName());
subscriber.onNext("1");
subscriber.onCompleted();
}
});
Subscriber<String> subscriber1 = new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e("haha",s);
Log.e("haha",Thread.currentThread().getName());
}
};
observable1.subscribeOn(Schedulers.io()).subscribe(subscriber1);
Schedulers source (simplified) ↓
public final class Schedulers {
private final Scheduler ioScheduler;
private static final Schedulers INSTANCE = new Schedulers();
private Schedulers() {
Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = new CachedThreadScheduler();
}
}
①
public static Scheduler io() {
return INSTANCE.ioScheduler;
}
}
subscribeOn (simplified) ↓
public final Observable<T> subscribeOn(Scheduler scheduler) {
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}
OperatorSubscribeOn (simplified) ↓
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
private final Scheduler scheduler;
②
public OperatorSubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
}
@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
⑥
final Worker inner = scheduler.createWorker();
//create subscriber3
return new Subscriber<Observable<T>>(subscriber) {// subscriber = subscriber1
⑧
@Override
public void onNext(final Observable<T> o) {//o = observable1
// run the code on the specified thread
⑨
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
⑬
o.unsafeSubscribe(new Subscriber<T>(subscriber) {// equivalent to observable1.onSubscribe1.call(subscriber1)
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
});
}
});
}
};
}
}
lift source (simplified) ↓
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
③
//create Observable3 OnSubscribe3
return new Observable<R>(new OnSubscribe<R>() {
④
@Override
public void call(Subscriber<? super R> o) {
⑤
Subscriber<? super T> st = hook.onLift(operator).call(o);
st.onStart();
⑦
onSubscribe.call(st);//onSubscribe2.call(subscriber3)
}
});
}
CachedThreadScheduler snippet ↓
⑩
@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
⑪
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
NewThreadWorker snippet ↓
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
⑫
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);
return run;
}
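The branch at ⑫ can be tried in isolation with nothing but java.util.concurrent. The following is a minimal sketch, not RxJava's implementation: WorkerSketch, runAndReportThread and the thread name "sketch-worker" are hypothetical names made up for illustration, and the hook decoration and ScheduledAction bookkeeping are left out.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for NewThreadWorker; not an RxJava class.
final class WorkerSketch {
    private final ScheduledExecutorService executor =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "sketch-worker");
                t.setDaemon(true);
                return t;
            });

    // Same decision as at ⑫: submit right away when there is no delay, otherwise schedule.
    Future<?> scheduleActual(Runnable action, long delayTime, TimeUnit unit) {
        if (delayTime <= 0) {
            return executor.submit(action);
        }
        return executor.schedule(action, delayTime, unit);
    }

    void shutdown() {
        executor.shutdown();
    }

    // Runs one action without delay and returns the name of the thread that executed it.
    static String runAndReportThread() {
        WorkerSketch worker = new WorkerSketch();
        String[] seen = new String[1];
        try {
            worker.scheduleActual(
                    () -> seen[0] = Thread.currentThread().getName(),
                    0, TimeUnit.MILLISECONDS).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(runAndReportThread());
    }
}
```

The action runs on the executor's thread rather than on the caller's, which is the whole point of handing it to the executor.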
The call order follows the circled markers, starting from ①.
Code breakdown
observable1.subscribeOn(Schedulers.io()).subscribe(subscriber1) =
observable1.nest().lift(operatorSubscribeOn(Schedulers.io())).subscribe(subscriber1) =
observable2<observable1>.lift(operatorSubscribeOn(Schedulers.io())).subscribe(subscriber1)
nest() calls just(this), i.e. just(observable1), so onSubscribe2.call simply calls onNext(observable1) on the subscriber it is given.
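What nest() does can be modelled in a few lines of plain Java. This is only a sketch under simplifying assumptions: NestSketch and Obs are hypothetical names, a subscriber is reduced to a java.util.function.Consumer, and completion and error signals are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of just()/nest(); Obs is not an RxJava type.
final class NestSketch {
    static final class Obs<T> {
        private final Consumer<Consumer<T>> onSubscribe;

        Obs(Consumer<Consumer<T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // just(value): on subscription, immediately hand the value to the subscriber.
        static <T> Obs<T> just(T value) {
            return new Obs<>(subscriber -> subscriber.accept(value));
        }

        // nest(): wrap this observable so that it is emitted as the single item.
        Obs<Obs<T>> nest() {
            return just(this);
        }

        void subscribe(Consumer<T> onNext) {
            onSubscribe.accept(onNext);
        }
    }

    // Returns true when the nested observable emits exactly the original instance.
    static boolean nestEmitsSameInstance() {
        Obs<String> observable1 = Obs.just("1");
        List<Obs<String>> received = new ArrayList<>();
        observable1.nest().subscribe(received::add);
        return received.size() == 1 && received.get(0) == observable1;
    }

    public static void main(String[] args) {
        System.out.println(nestEmitsSameInstance());
    }
}
```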
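None of the code above subscribes to anything yet. The argument Schedulers.io() is evaluated first, reaching ①, which returns the ioScheduler built in the Schedulers constructor (a CachedThreadScheduler unless the hook supplies one). Inside subscribeOn, nest() then produces observable2 and onSubscribe2, and ② creates operatorSubscribeOn with ioScheduler as its scheduler. ③ creates observable3 and onSubscribe3, so the subscription becomes observable3.subscribe(subscriber1). As the article http://www.jianshu.com/p/394debafe192 explains, this executes observable3.onSubscribe3.call(subscriber1), which is ④, with subscriber1 passed in.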
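⑤ invokes the operator to create subscriber3. Inside that call, ⑥ creates the Worker (the concrete Scheduler subclass does the creation, here CachedThreadScheduler). Execution continues to ⑦, which calls onSubscribe2.call(subscriber3). Because onSubscribe2 was created by just, that call directly invokes subscriber3.onNext(observable1), reaching ⑧ with the onNext parameter o bound to observable1.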
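The order of steps ⑤ and ⑦ is easier to see in a stripped-down model of lift. The sketch below is an assumption-laden miniature, not RxJava's code: LiftSketch, Obs, Sub, OnSub and Op are hypothetical names, the plugin hook and onStart() are omitted, and a subscriber only has onNext.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of Observable.lift; none of these types are RxJava types.
final class LiftSketch {
    interface Sub<T> { void onNext(T value); }
    interface OnSub<T> { void call(Sub<? super T> subscriber); }
    interface Op<R, T> { Sub<? super T> call(Sub<? super R> downstream); }

    static final class Obs<T> {
        private final OnSub<T> onSubscribe;

        Obs(OnSub<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        void subscribe(Sub<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }

        // Returns a new observable whose onSubscribe first asks the operator for an
        // upstream subscriber (step 5) and then passes it to the original onSubscribe (step 7).
        <R> Obs<R> lift(Op<R, T> operator) {
            return new Obs<R>(downstream -> {
                Sub<? super T> st = operator.call(downstream);
                onSubscribe.call(st);
            });
        }
    }

    // Records the order in which the pieces are invoked during one subscription.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Obs<Integer> upstream = new Obs<>(subscriber -> {
            log.add("upstream onSubscribe.call");
            subscriber.onNext(7);
        });
        Op<String, Integer> operator = downstream -> {
            log.add("operator.call");
            Sub<Integer> wrapped = value -> downstream.onNext("n=" + value);
            return wrapped;
        };
        upstream.lift(operator).subscribe(text -> log.add("downstream onNext " + text));
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

Nothing runs until subscribe is called on the lifted observable; the operator is then invoked before the upstream onSubscribe, matching the marker order in the listing.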
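At ⑨ the code calls Worker.schedule(), which leads to ⑩, then ⑪, and finally ⑫ (the delayTime <= 0 branch), where the action is submitted to the executor. This is the step that moves the work onto the specified thread. The executor then runs the Action0 and reaches ⑬:
o.unsafeSubscribe(new Subscriber<T>(subscriber) {...}) =
observable1.unsafeSubscribe(new Subscriber<T>(subscriber1) {...}) =
observable1.onSubscribe1.call(subscriber1)
Strictly speaking, onSubscribe1 receives a wrapper Subscriber that forwards every event to subscriber1, so the effect is the same. At this point observable1.onSubscribe1.call(subscriber1) is running on the specified thread.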
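The net effect of the whole chain can be reproduced with a plain Executor. The following sketch deliberately skips nest(), lift() and the Worker and schedules the upstream call directly, so it shows only the outcome, not RxJava's structure. SubscribeOnSketch, Obs, Sub, OnSub and the thread name "sketch-io" are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of subscribeOn; none of these types are RxJava types.
final class SubscribeOnSketch {
    interface Sub<T> { void onNext(T value); }
    interface OnSub<T> { void call(Sub<? super T> subscriber); }

    static final class Obs<T> {
        private final OnSub<T> onSubscribe;

        Obs(OnSub<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        void subscribe(Sub<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }

        // Simplification: run the original onSubscribe.call on the executor's thread.
        Obs<T> subscribeOn(Executor executor) {
            return new Obs<>(downstream ->
                    executor.execute(() -> onSubscribe.call(downstream)));
        }
    }

    // Returns {thread that ran onSubscribe.call, thread that ran onNext}.
    static String[] threadNames() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-io");
            t.setDaemon(true);
            return t;
        });
        String[] names = new String[2];
        CountDownLatch done = new CountDownLatch(1);
        Obs<String> observable1 = new Obs<>(subscriber -> {
            names[0] = Thread.currentThread().getName();
            subscriber.onNext("1");
        });
        observable1.subscribeOn(io).subscribe(value -> {
            names[1] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for onNext");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println(names[0] + " / " + names[1]);
    }
}
```

Both the emitting code and onNext report the executor's thread here. Since the sketch has no observeOn equivalent, the subscriber is simply called on whatever thread the source emits on.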