引言
前面的文章我们走完了订阅方法线程切换的实现,今天我们来看观察方法的线程切换。
线程调度observeOn
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {...}
接着看observeOn方法:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
....
//返回ObservableObserveOn对象
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
observeOn操作符返回了ObservableObserveOn对象,这个类比较长,我们仍然只梳理核心部分:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
//调用层传入的线程调度器
final Scheduler scheduler;
//默认false
final boolean delayError;
//默认128
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// false
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//1 创建出一个对应的Worker
//连续切换观察者线程时候,scheduler发生变化,observer的方法就会交给新设定的Scheduler.Worker执行.
Scheduler.Worker w = scheduler.createWorker();
//2 订阅上游数据源, 封装了传入的observer,构造ObserveOnObserver对象并订阅它
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
这段代码做了两件事情:
1.创建一个Scheduler对应的Worker;
2.根据传入的Observer对象构造代理类对象ObserveOnObserver,在这个类中通过Worker实现观察方法的线程切换;
接下来我们看核心类ObserveOnObserver。
观察方法线程切换核心实现ObserveOnObserver
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//调用层传入的观察者,也是观察方法的最终调用者
final Observer<? super T> actual;
//对应Scheduler里的Worker
final Scheduler.Worker worker;
//上游被观察者发送过来的数据都存在这里
SimpleQueue<T> queue;
Disposable s;
Throwable error;
//是否完成
volatile boolean done;
//是否取消
volatile boolean cancelled;
// 代表同步发送 异步发送
int sourceMode;
....
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
...
//创建一个queue 用于保存上游 onNext()发送的数据,SpscLinkedArrayQueue根据注释解释,是一个实现了单生产-单消费模型的线程安全队列。
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//回调下游观察者onSubscribe方法
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
//1 执行过error / complete 会是true
if (done) {
return;
}
//2 如果数据源类型不是异步的, 默认不是
if (sourceMode != QueueDisposable.ASYNC) {
//3 将上游push过来的数据 加入 queue里
queue.offer(t);
}
//4 开始进入对应Workder线程,在线程里 将queue里的t 取出 发送给下游Observer
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
//给error存个值
error = t;
done = true;
//开始调度
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
//开始调度
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
//交给worker调度run()方法
worker.schedule(this);
}
}
//从这里开始,这个方法已经是在Workder对应的线程里执行的了
@Override
public void run() {
//默认是false
if (outputFused) {
drainFused();
} else {
//取出queue里的数据 发送
drainNormal();
}
}
//循环从队列取数据,交给观察者
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
// 1 如果已经 终止 或者queue空,则跳出函数,
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
//2 从queue里取出一个值
v = q.poll();
} catch (Throwable ex) {
//3 异常处理 并跳出函数
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
return;
}
boolean empty = v == null;
//4 再次检查 是否 终止 如果满足条件 跳出函数
if (checkTerminated(d, empty, a)) {
return;
}
//5 上游还没结束数据发送,但是这边处理的队列已经是空的,不会push给下游 Observer
if (empty) {
//仅仅是结束这次循环,不发送这个数据而已,并不会跳出函数
break;
}
//6 发送给观察者
a.onNext(v);
}
...
}
}
//检查 是否 已经 结束(error complete), 是否没数据要发送了(empty 空),
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
//如果已经disposed
if (cancelled) {
queue.clear();
return true;
}
// 如果已经结束
if (d) {
Throwable e = error;
//如果是延迟发送错误
if (delayError) {
//如果空
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
//停止worker(线程)
worker.dispose();
return true;
}
} else {
//发送错误
if (e != null) {
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
//发送complete
if (empty) {
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
}
主要部分都加了注释,这里总结一下:
1.ObserveOnObserver实现了Observer和Runnable接口,封装调用层传入的观察者的对象;
2.在onNext()里,先不切换线程,将数据加入队列queue。然后开始切换线程,在另一线程中,从queue里取出数据,push给下游Observer;
3.observeOn()影响的是其下游的代码,subscribeActual方法就会给Observer方法分配新的调度器,所以多次调用仍然生效。
网友评论