介绍
subscribeOn()方法是将Observable内的数据处理器Observable.OnSubscribe放置在一个新的线程内执行。
执行代码
//初始化被观察者Observable,并给其加上数据处理器Observable.OnSubscribe
Observable Aobservable = Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber) {
LogShowUtil.addLog("RxJava","发送线程: "+Thread.currentThread().getName(),true);
subscriber.onNext("杨");
subscriber.onCompleted();
}
});
//做subscribeOn线程切换处理
Observable Bobservable = Aobservable.subscribeOn(Schedulers.newThread());
//初始化观察者Observer,视作结果接收器
Observer observer = new Observer<String>() {
@Override
public void onCompleted() {
LogShowUtil.addLog("RxJava","结束",true);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String string) {
LogShowUtil.addLog("RxJava","接受线程: "+Thread.currentThread().getName(),true);
LogShowUtil.addLog("RxJava","结果: "+string,true);
}
};
//订阅
Bobservable.subscribe(observer);
源码分析
1. 初始化被观察者AObservable
Observable Aobservable = Observable.create(原始数据处理器);
由此可知被观察者AObservable持有原始数据处理器对象Observable.OnSubscribe。
2. 执行subscribeOn线程切换操作
Observable Bobservable = Aobservable.subscribeOn(Schedulers.newThread());
Observable#subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
接着我们看其中的new OperatorSubscribeOn(Aobservable,线程切换工具)操作
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
由代码可知代理线程切换器OperatorSubscribeOn持有
Aobservable
和线程切换工具Scheduler
回到subscribeOn()方法内继续执行create(代理线程切换器)
return create(new OperatorSubscribeOn<T>(this, scheduler));
create方法之前已经分析过,由此可知Bobservable持有代理线程切换器OperatorSubscribeOn。
3. 初始化结果接受器观察者Observer
Observer observer = new Observer<String>() {
...
}
4. 订阅
Bobservable.subscribe(observer);
由之前分析可知会使用 Bobservable内的代理线程切换器OperatorSubscribeOn做call()方法。
其中observer为结果接受器
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
//步骤一 切换线程执行以下操作
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
//步骤二
//新建一个代理结果接受器,其内持有结果接收器Observer
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
//步骤三 Aobservable.unsafeSubscribe(代理结果接受器)
source.unsafeSubscribe(s);
}
});
}
步骤一
inner.schedule
会将下面将要执行的步骤二和步骤三会在一个新的线程内执行
步骤二会生成一个新的代理接收器Subscribers
,代理接收器内持有外部实现的结果接受器Observer
接着会执行步骤三unsafeSubscribe()
方法
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
subscriber.onStart();
//获取数据处理器Observable.OnSubscribe,并做数据处理工作
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
RxJavaHooks.onObservableError(r);
throw r;
}
return Subscriptions.unsubscribed();
}
}
由此可知AObservable的原始数据处理器先执行call(代理接收器Subscriber
s
)方法
接下来会进入外部实现的外部数据处理器
原始数据处理器内的call()方法
@Override
public void call(Subscriber<? super String> subscriber) {
LogShowUtil.addLog("RxJava","发送线程: "+Thread.currentThread().getName(),true);
subscriber.onNext("杨");
subscriber.onCompleted();
}
然后会进入代理接收器Subscriber s
的onNext()方法
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
接着会执行subscriber.onNext(t);
进入结果接收器Observer内方法体内
Observer observer = new Observer<String>() {
@Override
public void onCompleted() {
LogShowUtil.addLog("RxJava","结束",true);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String string) {
LogShowUtil.addLog("RxJava","接受线程: "+Thread.currentThread().getName(),true);
LogShowUtil.addLog("RxJava","结果: "+string,true);
}
};
此过程中只有在代理线程切换器OperatorSubscribeOn做call()方法的时候做过一次线程切换,所以那之后的所有操作都是在新的线程内执行的。
最终输出结果
发送线程: RxNewThreadScheduler-1
接受线程: RxNewThreadScheduler-1
结果: 杨
结束
网友评论