美文网首页
RxJava2源码初探-线程切换

RxJava2源码初探-线程切换

作者: 码农朱同学 | 来源:发表于2018-12-19 21:37 被阅读0次

Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer,并且回调 Observer 的相应的方法。

Observable#subscribeOn(Scheduler)

在 Android 中,我们知道默认都是执行在主线程的,那么 Rxjava 是如何实现线程切换的。

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onComplete();
            }
        })
        .subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("TAG", "onSubscribe():  ");
            }

            @Override
            public void onNext(String s) {
                Log.e("TAG", "onNext():  " + s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                Log.e("TAG", "onComplete():  ");
            }
        });

大致流程

我们先来看一下 subscribeOn 方法,可以看到

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    // scheduler 判空
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    // 用 ObservableSubscribeOn 将 scheduler 包装 起来
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

从 [图片上传失败...(image-8534da-1545226643196)] ,我们知道,当我们调用 observable.subscibe(observable) 的时候,最终会调用到具体的 observable 的实例的 subscribActual 方法。而这里具体的 observable 的实例为 ObservableSubscribeOn。
接下来,我们来看一下 ObservableSubscribeOn 这个类,可以看到继承 AbstractObservableWithUpstream ,而 AbstractObservableWithUpstream 继承 Observable,实现 HasUpstreamObservableSource 这个接口。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    ---
}


abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

public interface HasUpstreamObservableSource<T> {
    /**
     * Returns the upstream source of this Observable.
     * <p>Allows discovering the chain of observables.
     * @return the source ObservableSource
     */
    ObservableSource<T> source();
}

observableSubscribeOn 的 subscribeActual 方法,跟 ObservableCreate 的 subscribeActual 的套路差不多,它也是 Observable 的一个子类。只不过比 ObservableCreate 多实现了一个接口HasUpstreamObservableSource,这个接口很有意思,他的 source() 方法返回类型是 ObservableSource(还记得这个类的角色吗?)。也就是说 ObservableSubscribeOn 这个 Observable 是一个拥有上游的 Observable 。他有一个非常关键的属性 source,这个 source 就代表了他的上游。

接下来我们一起来看一下 ObservableSubscribeOn 的具体实现

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}

首先先来看他的构造函数 ,有两个参数 source ,scheduler。

source 代表上游的引用,是 Observable 的一个实例
scheduler 可以通过 Schedulers.newThread() 或者 Schedulers.io() 创建相应的实例
这里我们先大概了解一下 Scheduler 是个什么,Scheduler 里面封装了 Worker 和 DisposeTask,下面会详细讲到。

Schedulers.newThread()

@NonNull
public static Scheduler newThread() {
    return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}


NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
static final class NewThreadTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return NewThreadHolder.DEFAULT;
    }
}
static final class NewThreadHolder {
    static final Scheduler DEFAULT = new NewThreadScheduler();
}

相关文章

网友评论

      本文标题:RxJava2源码初探-线程切换

      本文链接:https://www.haomeiwen.com/subject/fvpskqtx.html