分析目的
- Observable发出数据和Observer接收数据
- 如何实现线程调度和操作符原理
文章仅分析Observable不分析带背压的Flowable
Observable创建和订阅
一个常见例子
Observable.create((ObservableOnSubscribe<String>) emitter -> {
emitter.onNext("test");
emitter.onComplete();
}).map(s -> s + "111")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
LPLogger.e("onNext:" + s);
}
@Override
public void onError(@NonNull Throwable e) {
e.printStackTrace();
LPLogger.e("onError:" + e.getMessage());
}
@Override
public void onComplete() {
LPLogger.e("onComplete");
}
});
}
以上常见例子展示了Observable
创建,订阅和调度以及转换操作符的整个过程下面分析各个过程
Observable创建
Observable.create()
方法参数ObservableOnSubscribe
public interface ObservableOnSubscribe<T> {
void subscribe(ObservableEmitter<T> e) throws Exception;
}
该接口方法subscribe(ObservableEmitter)
参数ObservableEmitter
实现了onNext()
,onError()
和onComplete()
即我们使用发送数据的地方
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
其中RxJavaPlugins.onAssembly()
方法是hook方法,默认返回原值即ObservableCreate(source)
是一个Observable
,实现了subscribeActual()
此方法是订阅真正执行的方法,先不用关注
只需要注意Observable.create()
方法传入参数ObservableOnSubscribe
返回ObservableCreate
即可
Observable订阅
先不看线程调度和操作符转换处理仅看最简单的部分subscribe()
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
//..
subscribeActual(observer);
}
实际是调用的subscribeActual(observer)
,即前面创建的ObservableCreate
里面的subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
//实现自ObservableEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//这里是外界的回调
observer.onSubscribe(parent);
try {
//这里source即create传入的ObservableOnSubscribe
//parent即前面传入的ObservableEmitter,即emitter.onNext("test")中的emitter
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
看下CreateEmitter
源码
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("xxx"));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("xxx");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
- 以上
onNext(T)
调用了observer.onNext(t)
即让订阅者接收到发送端数据
我们可以看到subscribeActual(observer)
中的方法已经将Observable
和Observer
联系起来 - 同时注意到
CreateEmitter
是继承自Disposable
即我们可以使用回调中的onSubscribe(Disposable d)
中的Disposable
去结束Observable发送,当我们subscribe(Consumer)
获取的返回值同理 - 从上面代码可以得出我们之前记住的一些结论如
onError()
和onComplete()
只能调用一次,onSubscribe
在onNext()
之前执行等
线程调度
subcribeOn
subscribeOn(Schedulers.io())
返回值类似于create()
,返回的是ObservableSubscribeOn<T>(this, scheduler)
比ObservableCreate<T>(source)
多一个线程处理,典型的装饰器模式应用
源码也和ObservableCreate
类似
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
//和CreateEmitter一样也是包装Observer,最终调用Observer.onNext之类方法
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
//线程切换SubscribeTask是Runnable最终执行的还是source.subscribe(parent);
//source.subscribe(parent)执行后会执行到ObservableCreate的subscribeActual()
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
继续看
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//实际是一个Dispose并设置到ObservableSubscribeOn方便管理任务
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
执行调用如下不一一展开:
w.schedule(task, delay, unit)
->IoScheduler.EventLoopWorker.schedule
->NewThreadWorker.scheduleActual()
->ScheduledRunnable.call()
->ScheduledRunnable.run()
->DisposeTask.run()
->new SubscribeTask(parent)
即在使用线程池执行了source.subscribe(parent)
observeOn
observeOn(AndroidSchedulers.mainThread())
中创建的是ObservableObserveOn
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//创建worker
Scheduler.Worker w = scheduler.createWorker();
//还是和source.subscribe(parent)一致
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
查看ObserveOnObserver
是继承Runnable
public void onNext(T t) {
if (done) {
return;
}
//先把值存储到队列中,然后切换线程处理
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
//run方法运行到这里
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//切到线程后再执行onNext
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
-
ObserveOnObserver
的onNext()
中把发送值存到队列,然后调用schedule()
- 调用的是
worker.schedule(this)
;和前面分析subcribeOn
一样直接查看run()
,此时已经完成线程切换 -
run()
中调用的是drainNormal()
,从1中存储的队列中取出值发送
切换线程分析
AndroidSchedulers.mainThread()
实现
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
继续看HandlerScheduler
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
private volatile boolean disposed;
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//设置了runnable()后续发送到主线程会执行run()
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
ObservableObserveOn
中的subscribeActual()
创建的worker就是HandlerWorker
-
AndroidSchedulers.mainThread()
创建了一个带主线程Handler
的HandlerScheduler
-
schedule()
中通过handler.sendMessageDelayed(msg,delay)
发送消息到主线程,因为message
设置了Runnable()
,消息发送到主线程后会调用message.callback.run()
从而调用schedule()
中的参数run()
,即实际完成切换线程回调到ObserveOnObserver
的run()
操作符原理
仅分析下map操作符,其它操作符类似
直接看ObservableMap
源码
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qd.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
- 可以看到
subscribeActual()
直接是source.subscribe(parent)
类型,我们直接看MapObserver
的onNext()
- 非聚合模式sourceMode的值是NONE,相当于
Observer.onNext(mapper.apply(t))
而mapper.apply(t)
则是我们写的lambda表达式的返回值即s + "111"
,由此可以看出map是直接把值返回
总结
-
subscribeActual()
方法中实际完成订阅,subscribe
订阅后各个操作符才完成订阅,即订阅是自下而上进行的 - 线程操作是通过线程池和
Handler
完成切换
网友评论