RxJava Source Code Analysis: Specifying the Thread with subscribeOn

Author: Rogge666 | Published: 2017-11-19 15:32

    Based on RxJava 1.1.0

    Example code ↓
            Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
                public void call(Subscriber<? super String> subscriber) {
                    Log.e("haha",Thread.currentThread().getName());
                    subscriber.onNext("1");
                    subscriber.onCompleted();
                }
            });
            
            Subscriber<String> subscriber1 = new Subscriber<String>() {
                @Override
                public void onCompleted() {
                }
    
                @Override
                public void onError(Throwable e) {
                }
    
                @Override
                public void onNext(String s) {
                    Log.e("haha",s);
                    Log.e("haha",Thread.currentThread().getName());
                }
            };
    
            observable1.subscribeOn(Schedulers.io()).subscribe(subscriber1);
    
    Schedulers source (simplified) ↓
    public final class Schedulers {
    
        private final Scheduler ioScheduler;
    
        private static final Schedulers INSTANCE = new Schedulers();
    
        private Schedulers() {
            Scheduler io = RxJavaPlugins.getInstance().getSchedulersHook().getIOScheduler();
            if (io != null) {
                ioScheduler = io;
            } else {
                ioScheduler = new CachedThreadScheduler();
            }
        }
    
        ①
        public static Scheduler io() {
            return INSTANCE.ioScheduler;
        }
    }
    
    subscribeOn (simplified) ↓
    public final Observable<T> subscribeOn(Scheduler scheduler) {
            return nest().lift(new OperatorSubscribeOn<T>(scheduler));
        }
    
    OperatorSubscribeOn (simplified) ↓
    public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
        private final Scheduler scheduler;
    
        ②
        public OperatorSubscribeOn(Scheduler scheduler) {
            this.scheduler = scheduler;
        }
    
       
        @Override
        public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
             ⑥
            final Worker inner = scheduler.createWorker();
            //create subscriber3
            return new Subscriber<Observable<T>>(subscriber) {//subscriber = subscriber 1
                ⑧
                @Override
                public void onNext(final Observable<T> o) {//o = observable1
                    // run the code on the specified thread
                    ⑨
                    inner.schedule(new Action0() {
    
                        @Override
                        public void call() {
                            final Thread t = Thread.currentThread();
                            ⑬
                            o.unsafeSubscribe(new Subscriber<T>(subscriber) {// effectively observable1.onSubscribe1.call(subscriber1)
    
                                @Override
                                public void onCompleted() {
                                    subscriber.onCompleted();
                                }
    
                                @Override
                                public void onError(Throwable e) {
                                    subscriber.onError(e);
                                }
    
                                @Override
                                public void onNext(T t) {
                                    subscriber.onNext(t);
                                }
    
                            });
                        }
                    });
                }
            };
        }
    }
    
    lift source (simplified) ↓
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            ③
            //create Observable3  OnSubscribe3
            return new Observable<R>(new OnSubscribe<R>() {
                ④
                @Override
                public void call(Subscriber<? super R> o) {
                    ⑤
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    st.onStart();
                    ⑦
                    onSubscribe.call(st);//onSubscribe2.call(subscriber3)
                }
            });
        }
    
    CachedThreadScheduler snippet ↓
     ⑩
    @Override
            public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
                if (innerSubscription.isUnsubscribed()) {
                    // don't schedule, we are unsubscribed
                    return Subscriptions.unsubscribed();
                }
                ⑪
                ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
                innerSubscription.add(s);
                s.addParent(innerSubscription);
                return s;
            }
    
    NewThreadWorker snippet ↓
    public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
            Action0 decoratedAction = schedulersHook.onSchedule(action);
            ScheduledAction run = new ScheduledAction(decoratedAction);
            Future<?> f;
            if (delayTime <= 0) {
                ⑫
                f = executor.submit(run);
            } else {
                f = executor.schedule(run, delayTime, unit);
            }
            run.add(f);
    
            return run;
        }
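
The thread switch itself comes down to handing a task to an executor, as scheduleActual does when delayTime <= 0. The following is a minimal sketch of that single step using only java.util.concurrent; the class SubmitThreadDemo, the method runOnWorker and the thread name "demo-worker" are made up for illustration and are not part of RxJava.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; it only uses java.util.concurrent, not RxJava.
final class SubmitThreadDemo {

    // Submits a task to a one-thread executor and returns the name of the thread that ran it.
    static String runOnWorker(String workerName) {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, workerName));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            Future<String> f = executor.submit(task); // same kind of call as executor.submit(run)
            return f.get();                           // wait until the worker thread has run the task
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("task thread:   " + runOnWorker("demo-worker"));
    }
}
```

Whatever code sits inside the submitted task runs on the executor's thread, which is why wrapping the subscription in an Action0 and scheduling it is enough to move it off the calling thread.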
    

    The code is executed in the order of the circled numbers, starting from ①.
    Breaking down the call:
    observable1.subscribeOn(Schedulers.io()).subscribe(subscriber1) =
    observable1.nest().lift(operatorSubscribeOn(Schedulers.io())).subscribe(subscriber1) =
    observable2<observable1>.lift(operatorSubscribeOn(Schedulers.io())).subscribe(subscriber1)

    nest() calls just(this), i.e. just(observable1), so onSubscribe2.call(subscriber) directly calls subscriber.onNext(observable1).
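
    The behavior of nest() can be illustrated with a heavily reduced model. This is only a sketch under stated assumptions: Source is a hypothetical stand-in for rx.Observable that supports onNext only, and it is not the library's implementation. It shows that the nested source emits the original source object itself, synchronously, on the thread that subscribes.

```java
import java.util.function.Consumer;

final class NestSketch {

    // Hypothetical, heavily reduced stand-in for rx.Observable (onNext only).
    static final class Source<T> {
        private final Consumer<Consumer<? super T>> onSubscribe;

        Source(Consumer<Consumer<? super T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // Emits the given value synchronously, on whichever thread subscribes.
        static <T> Source<T> just(T value) {
            return new Source<T>(subscriber -> subscriber.accept(value));
        }

        // Wraps this source so that the only emitted item is the source itself.
        Source<Source<T>> nest() {
            return just(this);
        }

        void subscribe(Consumer<? super T> subscriber) {
            onSubscribe.accept(subscriber);
        }
    }

    public static void main(String[] args) {
        Source<String> observable1 = Source.just("1");
        observable1.nest().subscribe(inner ->
                System.out.println("received observable1 itself: " + (inner == observable1)));
    }
}
```

    Because the outer emission is synchronous, the thread switch has to happen later, when the received source is subscribed to.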

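    When the line above runs, the argument Schedulers.io() is evaluated first at ①, which returns ioScheduler (the instance is created in the private constructor of Schedulers). Inside subscribeOn, nest() produces observable2 with onSubscribe2; no subscription has happened yet. Execution then reaches ②, which creates the OperatorSubscribeOn and sets its scheduler to ioScheduler, and then ③, which creates observable3 with onSubscribe3. The subscription has now become observable3.subscribe(subscriber1). As explained in the article http://www.jianshu.com/p/394debafe192, this executes observable3.onSubscribe3.call(subscriber1), i.e. ④, with subscriber1 passed in.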

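    ⑤ calls the operator's call method, which creates a Worker at ⑥ (the concrete Worker is created by the Scheduler subclass, here CachedThreadScheduler) and returns subscriber3. Execution continues to ⑦, which calls onSubscribe2.call(subscriber3). Because onSubscribe2 was created by just, it directly calls subscriber3.onNext(observable1), reaching ⑧ with the onNext parameter o set to observable1.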

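    At ⑨, Worker.schedule() is called (the one-argument overload delegates to the three-argument one with a zero delay). This leads to ⑩, then ⑪, and finally ⑫, where the action is submitted to the executor; this is the step that moves execution onto the specified thread. The Action0 then runs on that thread and reaches ⑬:
    o.unsafeSubscribe(new Subscriber<T>(subscriber) {...}) =
    observable1.unsafeSubscribe(new Subscriber<T>(subscriber1) {...}) =
    observable1.onSubscribe1.call(subscriber1) (effectively: the wrapper only forwards every event to subscriber1)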

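    At this point observable1.onSubscribe1.call(subscriber1) is running on the specified thread. Since the example uses no observeOn, subscriber1.onNext is invoked on that same thread.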
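
    The whole chain (nest, lift, the operator, and the scheduled subscription) can be modeled in a few dozen lines of plain Java. This is a sketch under stated assumptions, not RxJava's code: the types in SubscribeOnSketch are hypothetical reductions of the real classes, an ExecutorService stands in for Scheduler.Worker, and error handling, unsubscription and the plugin hook are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical reduced model of the subscribeOn call chain; not RxJava itself.
final class SubscribeOnSketch {

    interface Subscriber<T> { void onNext(T t); void onCompleted(); }
    interface OnSubscribe<T> { void call(Subscriber<? super T> s); }
    // Turns the downstream subscriber (of R) into the subscriber handed upstream (of T).
    interface Operator<R, T> { Subscriber<? super T> call(Subscriber<? super R> s); }

    static final class Observable<T> {
        final OnSubscribe<T> onSubscribe;
        Observable(OnSubscribe<T> onSubscribe) { this.onSubscribe = onSubscribe; }

        static <T> Observable<T> just(T value) {
            return new Observable<T>(s -> { s.onNext(value); s.onCompleted(); });
        }

        Observable<Observable<T>> nest() { return just(this); }

        // New observable whose OnSubscribe feeds the operator's subscriber to this one.
        <R> Observable<R> lift(Operator<R, T> operator) {
            return new Observable<R>(o -> onSubscribe.call(operator.call(o)));
        }

        Observable<T> subscribeOn(ExecutorService worker) {
            return nest().lift(new SubscribeOnOperator<T>(worker));
        }

        void subscribe(Subscriber<? super T> s) { onSubscribe.call(s); }
    }

    static final class SubscribeOnOperator<T> implements Operator<T, Observable<T>> {
        final ExecutorService worker;
        SubscribeOnOperator(ExecutorService worker) { this.worker = worker; }

        @Override
        public Subscriber<? super Observable<T>> call(Subscriber<? super T> downstream) {
            return new Subscriber<Observable<T>>() {
                @Override public void onNext(Observable<T> source) {
                    // The thread hop: subscribe to the real source on the worker thread.
                    worker.execute(() -> source.subscribe(downstream));
                }
                @Override public void onCompleted() {
                    // Ignored: the inner source is the one that completes downstream.
                }
            };
        }
    }

    // Runs the pipeline and returns the name of the thread on which the source's call() ran.
    static String sourceThreadName(String workerName) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, workerName));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        Observable<String> observable1 = new Observable<String>(s -> {
            seen.set(Thread.currentThread().getName());
            s.onNext("1");
            s.onCompleted();
        });
        observable1.subscribeOn(worker).subscribe(new Subscriber<String>() {
            @Override public void onNext(String v) { }
            @Override public void onCompleted() { done.countDown(); }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("source ran on: " + sourceThreadName("io-sketch"));
    }
}
```

    In this model, as in the walkthrough, everything up to the operator's onNext runs on the subscribing thread; only the subscription to observable1 is handed to the worker.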
