美文网首页
RxJava(十四)--subscribeOn()解析

RxJava(十四)--subscribeOn()解析

作者: azu_test | 来源:发表于2019-03-08 23:09 被阅读0次

介绍

subscribeOn()方法是将Observable内的数据处理器Observable.OnSubscribe放置在一个新的线程内执行。

执行代码

        //初始化被观察者Observable,并给其加上数据处理器Observable.OnSubscribe
        Observable Aobservable = Observable.create(new Observable.OnSubscribe<String>(){
            @Override
            public void call(Subscriber<? super String> subscriber) {
                LogShowUtil.addLog("RxJava","发送线程: "+Thread.currentThread().getName(),true);
                subscriber.onNext("杨");
                subscriber.onCompleted();
            }
        });
        //做subscribeOn线程切换处理
        Observable Bobservable = Aobservable.subscribeOn(Schedulers.newThread());
         //初始化观察者Observer,视作结果接收器
        Observer observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                LogShowUtil.addLog("RxJava","结束",true);
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String string) {
                LogShowUtil.addLog("RxJava","接受线程: "+Thread.currentThread().getName(),true);
                LogShowUtil.addLog("RxJava","结果: "+string,true);
            }
        };
        //订阅
        Bobservable.subscribe(observer);

源码分析

1. 初始化被观察者AObservable
  Observable  Aobservable = Observable.create(原始数据处理器);

由此可知被观察者AObservable持有原始数据处理器对象Observable.OnSubscribe。

2. 执行subscribeOn线程切换操作
        Observable Bobservable = Aobservable.subscribeOn(Schedulers.newThread());
    Observable#subscribeOn
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        if (this instanceof ScalarSynchronousObservable) {
            return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
        }
        return create(new OperatorSubscribeOn<T>(this, scheduler));
    }

接着我们看其中的new OperatorSubscribeOn(Aobservable,线程切换工具)操作

    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }

由代码可知代理线程切换器OperatorSubscribeOn持有Aobservable和线程切换工具Scheduler

回到subscribeOn()方法内继续执行create(代理线程切换器)

        return create(new OperatorSubscribeOn<T>(this, scheduler));

create方法之前已经分析过,由此可知Bobservable持有代理线程切换器OperatorSubscribeOn。

3. 初始化结果接受器观察者Observer
        Observer observer = new Observer<String>() {
           ...
        }
4. 订阅
        Bobservable.subscribe(observer);

由之前分析可知会使用 Bobservable内的代理线程切换器OperatorSubscribeOn做call()方法。
其中observer为结果接受器

    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();
        subscriber.add(inner);
        //步骤一 切换线程执行以下操作
        inner.schedule(new Action0() {
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                //步骤二
                //新建一个代理结果接受器,其内持有结果接收器Observer
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }
                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };
                //步骤三 Aobservable.unsafeSubscribe(代理结果接受器)
                source.unsafeSubscribe(s);
            }
        });
    }

步骤一 inner.schedule会将下面将要执行的步骤二和步骤三会在一个新的线程内执行
步骤二会生成一个新的代理接收器Subscriber s,代理接收器内持有外部实现的结果接受器Observer

接着会执行步骤三unsafeSubscribe()方法

    public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            subscriber.onStart();
            //获取数据处理器Observable.OnSubscribe,并做数据处理工作
            RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            try {
                subscriber.onError(RxJavaHooks.onObservableError(e));
            } catch (Throwable e2) {
                Exceptions.throwIfFatal(e2);
                RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                RxJavaHooks.onObservableError(r);
                throw r;
            }
            return Subscriptions.unsubscribed();
        }
    }

由此可知AObservable的原始数据处理器先执行call(代理接收器Subscriber s)方法

接下来会进入外部实现的外部数据处理器

            原始数据处理器内的call()方法
            @Override
            public void call(Subscriber<? super String> subscriber) {
                LogShowUtil.addLog("RxJava","发送线程: "+Thread.currentThread().getName(),true);
                subscriber.onNext("杨");
                subscriber.onCompleted();
            }

然后会进入代理接收器Subscriber s的onNext()方法

                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void onCompleted() {
                        try {
                            subscriber.onCompleted();
                        } finally {
                            inner.unsubscribe();
                        }
                    }

                    @Override
                    public void setProducer(final Producer p) {
                        subscriber.setProducer(new Producer() {
                            @Override
                            public void request(final long n) {
                                if (t == Thread.currentThread()) {
                                    p.request(n);
                                } else {
                                    inner.schedule(new Action0() {
                                        @Override
                                        public void call() {
                                            p.request(n);
                                        }
                                    });
                                }
                            }
                        });
                    }
                };

接着会执行subscriber.onNext(t);进入结果接收器Observer内方法体内

        Observer observer = new Observer<String>() {
            @Override
            public void onCompleted() {
                LogShowUtil.addLog("RxJava","结束",true);
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(String string) {
                LogShowUtil.addLog("RxJava","接受线程: "+Thread.currentThread().getName(),true);
                LogShowUtil.addLog("RxJava","结果: "+string,true);
            }
        };

此过程中只有在代理线程切换器OperatorSubscribeOn做call()方法的时候做过一次线程切换,所以那之后的所有操作都是在新的线程内执行的。

最终输出结果

发送线程: RxNewThreadScheduler-1
接受线程: RxNewThreadScheduler-1
结果: 杨
结束

相关文章

网友评论

      本文标题:RxJava(十四)--subscribeOn()解析

      本文链接:https://www.haomeiwen.com/subject/duyrpqtx.html