美文网首页
Rx线程切换

Rx线程切换

作者: gczxbb | 来源:发表于2019-07-21 14:34 被阅读0次

    线程切换

    Rx数据发射器和观察者在同一个线程,未发生线程切换,串行工作,Rx是一个异步框架,主要功能是提供异步操作,让数据源和观察者工作在不同线程。
    线程调度器Scheduler,它指定每一段代码应该运行的线程,线程切换两个重要方法。

    subscribeOn方法,指定发射数据工作线程。
    observeOn方法,指定观察者工作线程。

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(2);   //发送
            e.onComplete();
        }
    })
    .subscribeOn(Schedulers.newThread())
    //插入一个被观察者
    .doOnNext(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            //新线程执行
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Integer>() {
        @Override
            public void accept(Integer integer) throws Exception {
                //主线程
            }
    });
    

    创建ObservableCreate被观察者,调用subscribeOn方法,数据发射器切换Scheduler新建线程工作。

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    

    创建ObservableSubscribeOn被观察者,Observable子类,使用委托者设计模式,内部封装ObservableCreate,同时,封装Scheduler,发射线程。
    doNext方法,创建一个Consumer,返回一个ObservableDoOnEach被观察者,封装ObservableSubscribeOn,内部Consumer,在订阅观察者时将封装到观察者对象,作为观察者onNext方法的消费对象。
    observeOn方法,设置线程在主线程。

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    

    创建ObservableObserveOn被观察者,封装ObservableDoOnEach被观察者和Scheduler线程。

    被观察者链式关系

    ObservableSubscribeOn和ObservableObserveOn都持有线程Scheduler切换器。
    订阅时,调用链条中第一个被观察者ObservableObserveOn的subscribe方法,**被观察者基类Observable都实现ObservableSource接口#subscribe方法,订阅方法都来自这个接口,每个被观察者的subscribeActual方法自己实现。

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            //ObservableDoOnEach被观察者
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

    调用内部source的subscribe方法,即ObservableSource类型,其实是链条中下一个被观察者ObservableDoOnEach。
    创建一个观察者ObserveOnObserver,封装用户自己的观察者,创建一个Worker,一起带到观察者中。
    ObservableDoOnEach的#subscribeActual方法,创建一个DoOnEachObserver观察者,封装ObserveOnObserver,同时封装外部Consumer。
    继续沿着链条走,ObservableSubscribeOn的#subscribeActual方法。

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    

    创建一个SubscribeOnObserver观察者,创建一个SubscribeTask任务,scheduler是发射器绑定的工作线程,它的scheduleDirect方法在指定的线程中执行SubscribeTask。

    观察者链条关系

    subscribeOn方法,设置线程是新线程,因此,将在新线程执行任务。

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            source.subscribe(parent);//线程执行,source是ObservableCreate。
        }
    }
    

    线程run方法,此时,被观察者是ObservableSubscribeOn,在新线程中,调用内部source的subscribe方法,ObservableCreate,订阅时,创建发射器,数据源外部调用发射器发射流程和Rx异步文章的一致。
    区别是在subscribeOn方法指定新线程执行发射方法。

    新线程发射数据时,将找到观察者,观察者也是链式节点,第一个节点是最后创建的SubscribeOnObserver,当执行DoOnEachObserver的onNext方法时,内部Consumer的accept方法。

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        try {
            onNext.accept(t);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            s.dispose();
            onError(e);
            return;
        }
        actual.onNext(t);//下一个观察者onNext方法
    }
    

    Consumer类型包括onNext,onError,onNext变量即doOnNext方法设置的Consumer,在新线程执行,上图黄色部分都在新线程执行。
    到达ObserveOnObserver观察者时,它内部封装工作线程Work。

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }
    

    观察者实现了Runnable,通过Worker,schedule方法,将观察者本身作为任务,在observeOn方法指定主线程运行。后面的观察者,onNext方法,都会切换线程,在主线程中运行。

    每调用一次observeOn方法,都会创建一个ObserveOnObserver绑定执行线程,它后面的观察者都会在指定线程运行,因此,线程便会切换一次。

    如果调用两次subscribeOn方法。

    .subscribeOn(AndroidSchedulers.mainThread())
    .subscribeOn(Schedulers.newThread())
    

    创建两个ObservableSubscribeOn被观察者节点,根据链表,首先调用的节点在后面,第二次调用的节点在前面,从头部节点开始subscribe订阅时,即使前面(第二次)的切换了线程,在后面(第一次)的节点subscribe订阅由会转换到另外的线程,最终发射数据将以后面的节点指定线程运行,即第一次。
    因此,subscribeOn方法设置多次时,以第一次设置的发射数据线程有效。

    数据流

    任重而道远

    相关文章

      网友评论

          本文标题:Rx线程切换

          本文链接:https://www.haomeiwen.com/subject/vigmlctx.html