美文网首页
Rx线程切换

Rx线程切换

作者: gczxbb | 来源:发表于2019-07-21 14:34 被阅读0次

线程切换

Rx数据发射器和观察者在同一个线程,未发生线程切换,串行工作,Rx是一个异步框架,主要功能是提供异步操作,让数据源和观察者工作在不同线程。
线程调度器Scheduler,它指定每一段代码应该运行的线程,线程切换两个重要方法。

subscribeOn方法,指定发射数据工作线程。
observeOn方法,指定观察者工作线程。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
        e.onNext(2);   //发送
        e.onComplete();
    }
})
.subscribeOn(Schedulers.newThread())
//插入一个被观察者
.doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
        //新线程执行
    }
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
    @Override
        public void accept(Integer integer) throws Exception {
            //主线程
        }
});

创建ObservableCreate被观察者,调用subscribeOn方法,数据发射器切换Scheduler新建线程工作。

public final Observable<T> subscribeOn(Scheduler scheduler) {
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

创建ObservableSubscribeOn被观察者,Observable子类,使用委托者设计模式,内部封装ObservableCreate,同时,封装Scheduler,发射线程。
doNext方法,创建一个Consumer,返回一个ObservableDoOnEach被观察者,封装ObservableSubscribeOn,内部Consumer,在订阅观察者时将封装到观察者对象,作为观察者onNext方法的消费对象。
observeOn方法,设置线程在主线程。

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

创建ObservableObserveOn被观察者,封装ObservableDoOnEach被观察者和Scheduler线程。

被观察者链式关系

ObservableSubscribeOn和ObservableObserveOn都持有线程Scheduler切换器。
订阅时,调用链条中第一个被观察者ObservableObserveOn的subscribe方法,**被观察者基类Observable都实现ObservableSource接口#subscribe方法,订阅方法都来自这个接口,每个被观察者的subscribeActual方法自己实现。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        //ObservableDoOnEach被观察者
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

调用内部source的subscribe方法,即ObservableSource类型,其实是链条中下一个被观察者ObservableDoOnEach。
创建一个观察者ObserveOnObserver,封装用户自己的观察者,创建一个Worker,一起带到观察者中。
ObservableDoOnEach的#subscribeActual方法,创建一个DoOnEachObserver观察者,封装ObserveOnObserver,同时封装外部Consumer。
继续沿着链条走,ObservableSubscribeOn的#subscribeActual方法。

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

创建一个SubscribeOnObserver观察者,创建一个SubscribeTask任务,scheduler是发射器绑定的工作线程,它的scheduleDirect方法在指定的线程中执行SubscribeTask。

观察者链条关系

subscribeOn方法,设置线程是新线程,因此,将在新线程执行任务。

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;
    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }
    @Override
    public void run() {
        source.subscribe(parent);//线程执行,source是ObservableCreate。
    }
}

线程run方法,此时,被观察者是ObservableSubscribeOn,在新线程中,调用内部source的subscribe方法,ObservableCreate,订阅时,创建发射器,数据源外部调用发射器发射流程和Rx异步文章的一致。
区别是在subscribeOn方法指定新线程执行发射方法。

新线程发射数据时,将找到观察者,观察者也是链式节点,第一个节点是最后创建的SubscribeOnObserver,当执行DoOnEachObserver的onNext方法时,内部Consumer的accept方法。

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    try {
        onNext.accept(t);
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        s.dispose();
        onError(e);
        return;
    }
    actual.onNext(t);//下一个观察者onNext方法
}

Consumer类型包括onNext,onError,onNext变量即doOnNext方法设置的Consumer,在新线程执行,上图黄色部分都在新线程执行。
到达ObserveOnObserver观察者时,它内部封装工作线程Work。

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

观察者实现了Runnable,通过Worker,schedule方法,将观察者本身作为任务,在observeOn方法指定主线程运行。后面的观察者,onNext方法,都会切换线程,在主线程中运行。

每调用一次observeOn方法,都会创建一个ObserveOnObserver绑定执行线程,它后面的观察者都会在指定线程运行,因此,线程便会切换一次。

如果调用两次subscribeOn方法。

.subscribeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.newThread())

创建两个ObservableSubscribeOn被观察者节点,根据链表,首先调用的节点在后面,第二次调用的节点在前面,从头部节点开始subscribe订阅时,即使前面(第二次)的切换了线程,在后面(第一次)的节点subscribe订阅由会转换到另外的线程,最终发射数据将以后面的节点指定线程运行,即第一次。
因此,subscribeOn方法设置多次时,以第一次设置的发射数据线程有效。

数据流

任重而道远

相关文章

  • Rx线程切换

    线程切换 Rx数据发射器和观察者在同一个线程,未发生线程切换,串行工作,Rx是一个异步框架,主要功能是提供异步操作...

  • Android-Retrofit2+Rxjava2之网络通用请求

    一直也是用MVP模式,也就结合Rx做网络请求,Rx子线程和UI线程的切换是相当的方便(小白后面准备看下相关的切换的...

  • Rx中的线程切换

    初学者在使用RxJava的过程中,经常搞不清Observable的事件序列和每次操作应该怎样切换线程,切换哪个线程...

  • RxJava——线程控制切换/调度

    本篇代码见:RxJava_Demo_Translater 这里开始学习RxJava线程控制(切换/调度)。一、Rx...

  • 实现简单的 RxKotlin (中)

    线程切换的操作在 Rx 里面非常常用,主要有 subscribeOn observeOn 他们都需要一个 Sche...

  • RxJava的常见使用场景

    demo地址 rx的优势: 线程切换,不需要像handler那样 请求与结果在不同地方 链式编程, 复杂的逻辑形成...

  • 12.RxSwift 调度者(上)

    创建子线程 - self.actionBtn.rx.tap .subscribe 猜想,推测:在子线程中进行 线程...

  • Rxjava2 Note

    线程切换 subscribeOn - 切换上游线程,但仅调用的第一次生效 observeOn - 切换下游线程,调...

  • RxJava系列(三)--切换线程

    1.主线程是怎么切换到子线程2.为什么只有第一次切换有效3.子线程是怎么切换到主线程1>问题1,主线程是怎么切换到...

  • subscribeOn 谐音上 就是上面的

    subscribeOn的调用切换之前的线程。 observeOn的调用切换之后的线程。 observeOn之后,不...

网友评论

      本文标题:Rx线程切换

      本文链接:https://www.haomeiwen.com/subject/vigmlctx.html