美文网首页
Rxjava源码解析--observeOn指定线程

Rxjava源码解析--observeOn指定线程

作者: Rogge666 | 来源:发表于2017-11-19 17:13 被阅读19次

    基于rxjava1.1.0 rxandroid 1.0.1

    用例代码↓
            Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("1");
                    subscriber.onCompleted();
                }
            });
    
            Subscriber<String> subscriber1 = new Subscriber<String>() {
                @Override
                public void onCompleted() {
                }
    
                @Override
                public void onError(Throwable e) {
                }
    
                @Override
                public void onNext(String s) {
                    Log.e("haha",s);
                }
            };
     observable1.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber1);
    
    observeOn源码精简↓
    public final Observable<T> observeOn(Scheduler scheduler) {
            return lift(new OperatorObserveOn<T>(scheduler));
        }
    
    AndroidSchedulers 源码↓
    public final class AndroidSchedulers {
        private AndroidSchedulers() {
            throw new AssertionError("No instances");
        }
    
        private static final Scheduler MAIN_THREAD_SCHEDULER =
                new HandlerScheduler(new Handler(Looper.getMainLooper()));
    
        ①
        public static Scheduler mainThread() {
            Scheduler scheduler =
                    RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
            return scheduler != null ? scheduler : MAIN_THREAD_SCHEDULER;
        }
    }
    
    lift精简源码↓
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            ②
            //create Observable2  OnSubscribe2
            return new Observable<R>(new OnSubscribe<R>() {
                ③
                @Override
                public void call(Subscriber<? super R> o) {
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    st.onStart();
                    ⑤
                    onSubscribe.call(st);//onSubscribe1.call(subscriber2)
                }
            });
        }
    
    OperatorObserveOn源码片段↓
        ④
        @Override
        public Subscriber<? super T> call(Subscriber<? super T> child) {//child = subscriber1
            if (scheduler instanceof ImmediateScheduler) {
                // avoid overhead, execute directly
                return child;
            } else if (scheduler instanceof TrampolineScheduler) {
                // avoid overhead, execute directly
                return child;
            } else {
                ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
                parent.init();
                return parent;
            }
        }
    
            ⑥
            @Override
            public void onNext(final T t) {
                if (isUnsubscribed()) {
                    return;
                }
                ⑦
                if (!queue.offer(on.next(t))) {
                    onError(new MissingBackpressureException());
                    return;
                }
                schedule();
            }
    
    final Action0 action = new Action0() {
                @Override
                public void call() {
                    pollQueue();
                }
            };
    
            protected void schedule() {
                if (counter.getAndIncrement() == 0) {
                    recursiveScheduler.schedule(action);
                }
            }
    
    pollQueue精简版↓
    void pollQueue() {
            Object o = queue.poll();
            if (o != null) {
                ⑧
                child.onNext(on.getValue(o));
            } else {
                break;
            }
        }
    
    OperatorObserveOn.ObserveOnSubscriber源码片段↓
    public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
                this.child = child;
                this.recursiveScheduler = scheduler.createWorker();
                if (UnsafeAccess.isUnsafeAvailable()) {
                    queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
                } else {
                    queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
                }
                this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
            }
    

    代码调用流程由①到最后
    代码分解
    observable1.observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber1) =
    observable1.lift(operatorObserveOn(func)).subscribe(subscriber1)=
    observable2.subscribe(subscriber1)

    执行代码首先在①创建一个HandlerScheduler 其Looper为主线程的Looper
    继续执行②创建observable2 OnSubscribe2 此时订阅关系变成observable2 .subscribe(subscriber1) 执行observable2.OnSubscribe2.call(subscriber1)到达③传入subscriber1到④中作为call()的入参 此时child = subscriber1创建subscriber2

    继续执行到达⑤等价执行onSubscribe1.call(subscriber2) 即subscriber2.onNext("1")到达⑥其中subscriber2.onNext方法中在节点⑦把数据存放在队列中然后执行schedule();在节点⑧会在指定的线程从队列中取出数据重新发射出来child.onNext(on.getValue(o));其中child为subscriber1 即调用subscriber1.onNext("123"));

    至此流程完结

    相关文章

      网友评论

          本文标题:Rxjava源码解析--observeOn指定线程

          本文链接:https://www.haomeiwen.com/subject/oxhpvxtx.html