美文网首页week.io第三方库源码分析RX系列知识
迷之RxJava —— subscribeOn 和 observ

迷之RxJava —— subscribeOn 和 observ

作者: Gemini | 来源:发表于2016-04-03 13:51 被阅读1588次

    开头

    之前我们分析过subscribeOn这个函数,
    现在我们来看下subscribeOnobserveOn这两个函数到底有什么异同。

    用过rxjava的旁友都知道,subscribeOnobserveOn都是用来切换线程用的,可是我什么时候用subscribeOn,什么时候用observeOn呢,我们很少知道这两个区别是啥。

    友情提示,如果不想看分析过程的,可以直接跳到下面的总结部分。

    subscribeOn

    先看下OperatorSubscribeOn的核心代码:

    public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
    
        final Scheduler scheduler;
        final Observable<T> source;
    
        public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
            this.scheduler = scheduler;
            this.source = source;
        }
    
        @Override
        public void call(final Subscriber<? super T> subscriber) {
            final Worker inner = scheduler.createWorker();
            subscriber.add(inner);
            
            inner.schedule(new Action0() {
            
                @Override
                public void call() {
                   
                    Subscriber<T> s = new Subscriber<T>(subscriber) {
                        @Override
                        public void onNext(T t) {
                            subscriber.onNext(t);
                        }
                        
                        @Override
                        public void onError(Throwable e) {
                            try {
                                subscriber.onError(e);
                            } finally {
                                inner.unsubscribe();
                            }
                        }
                        
                        @Override
                        public void onCompleted() {
                            try {
                                subscriber.onCompleted();
                            } finally {
                                inner.unsubscribe();
                            }
                        }
                        
                        ....
                    };
                    
                    source.unsafeSubscribe(s);
                }
            });
        }
    }
    

    这里注意两点:

    1. 因为OperatorSubscribeOn是个OnSubscribe对象,所以在call参数中传入的subscriber就是我们在外面使用Observable.subscribe(a)传入的对象a
    1. 这里source对象指向的是调用subscribeOn之前的那个Observable序列。

    明确了这两点,我们就很好的知道了subscribeOn是如何工作,产生神奇的效果了。
    其实最最主要的就是一行函数

    source.unsafeSubscribe(s);

    并且要注意它所在的位置,是在worker的call里面,说白了,就是把source.subscribe这一行调用放在指定的线程里,那么总结起来的结论就是:

    subscribeOn的调用,改变了调用前序列所运行的线程。

    observeOn

    同样看下OperatorObserveOn这个类的主要代码:

    public final class OperatorObserveOn<T> implements Operator<T, T> {
    
        private final Scheduler scheduler;
        private final boolean delayError;
    
        /**
         * @param scheduler the scheduler to use
         * @param delayError delay errors until all normal events are emitted in the other thread?
         */
        public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
            this.scheduler = scheduler;
            this.delayError = delayError;
        }
    
        @Override
        public Subscriber<? super T> call(Subscriber<? super T> child) {
            ....
            ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);
            parent.init();
            return parent;
        }
    
        /** Observe through individual queue per observer. */
        private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 {
            final Subscriber<? super T> child;
            final Scheduler.Worker recursiveScheduler;
            final NotificationLite<T> on;
            final boolean delayError;
            final Queue<Object> queue;
            
            // the status of the current stream
            volatile boolean finished;
    
            final AtomicLong requested = new AtomicLong();
            
            final AtomicLong counter = new AtomicLong();
            
            /** 
             * The single exception if not null, should be written before setting finished (release) and read after
             * reading finished (acquire).
             */
            Throwable error;
    
            // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
            // not prevent anything downstream from consuming, which will happen if the Subscription is chained
            public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) {
                this.child = child;
                this.recursiveScheduler = scheduler.createWorker();
                this.delayError = delayError;
                this.on = NotificationLite.instance();
                if (UnsafeAccess.isUnsafeAvailable()) {
                    queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
                } else {
                    queue = new SpscAtomicArrayQueue<Object>(RxRingBuffer.SIZE);
                }
            }
            
            void init() {
                // don't want this code in the constructor because `this` can escape through the 
                // setProducer call
                Subscriber<? super T> localChild = child;
                
                localChild.setProducer(new Producer() {
    
                    @Override
                    public void request(long n) {
                        if (n > 0L) {
                            BackpressureUtils.getAndAddRequest(requested, n);
                            schedule();
                        }
                    }
    
                });
                localChild.add(recursiveScheduler);
                localChild.add(this);
            }
    
            @Override
            public void onStart() {
                // signal that this is an async operator capable of receiving this many
                request(RxRingBuffer.SIZE);
            }
    
            @Override
            public void onNext(final T t) {
                if (isUnsubscribed() || finished) {
                    return;
                }
                if (!queue.offer(on.next(t))) {
                    onError(new MissingBackpressureException());
                    return;
                }
                schedule();
            }
    
            @Override
            public void onCompleted() {
                if (isUnsubscribed() || finished) {
                    return;
                }
                finished = true;
                schedule();
            }
    
            @Override
            public void onError(final Throwable e) {
                if (isUnsubscribed() || finished) {
                    RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
                    return;
                }
                error = e;
                finished = true;
                schedule();
            }
    
            protected void schedule() {
                if (counter.getAndIncrement() == 0) {
                    recursiveScheduler.schedule(this);
                }
            }
    
            // only execute this from schedule()
            @Override
            public void call() {
                long emitted = 0L;
    
                long missed = 1L;
    
                // these are accessed in a tight loop around atomics so
                // loading them into local variables avoids the mandatory re-reading
                // of the constant fields
                final Queue<Object> q = this.queue;
                final Subscriber<? super T> localChild = this.child;
                final NotificationLite<T> localOn = this.on;
                
                // requested and counter are not included to avoid JIT issues with register spilling
                // and their access is is amortized because they are part of the outer loop which runs
                // less frequently (usually after each RxRingBuffer.SIZE elements)
                
                for (;;) {
                    long requestAmount = requested.get();
                    long currentEmission = 0L;
                    
                    while (requestAmount != currentEmission) {
                        boolean done = finished;
                        Object v = q.poll();
                        boolean empty = v == null;
                        
                        if (checkTerminated(done, empty, localChild, q)) {
                            return;
                        }
                        
                        if (empty) {
                            break;
                        }
                        
                        localChild.onNext(localOn.getValue(v));
    
                        currentEmission++;
                        emitted++;
                    }
                    
                    if (requestAmount == currentEmission) {
                        if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
                            return;
                        }
                    }
                    
                    if (currentEmission != 0L) {
                        BackpressureUtils.produced(requested, currentEmission);
                    }
                    
                    missed = counter.addAndGet(-missed);
                    if (missed == 0L) {
                        break;
                    }
                }
                
                if (emitted != 0L) {
                    request(emitted);
                }
            }
            
            boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
                if (a.isUnsubscribed()) {
                    q.clear();
                    return true;
                }
                
                if (done) {
                    if (delayError) {
                        if (isEmpty) {
                            Throwable e = error;
                            try {
                                if (e != null) {
                                    a.onError(e);
                                } else {
                                    a.onCompleted();
                                }
                            } finally {
                                recursiveScheduler.unsubscribe();
                            }
                        }
                    } else {
                        Throwable e = error;
                        if (e != null) {
                            q.clear();
                            try {
                                a.onError(e);
                            } finally {
                                recursiveScheduler.unsubscribe();
                            }
                            return true;
                        } else
                        if (isEmpty) {
                            try {
                                a.onCompleted();
                            } finally {
                                recursiveScheduler.unsubscribe();
                            }
                            return true;
                        }
                    }
                        
                }
                
                return false;
            }
        }
    }
    

    这里的代码有点长,我们先注意到它是一个Operator,它没有对上层Observable做任何的控制或者包装。

    既然是Operator,那么它的职责就是把一个Subscriber转换成另外一个Subscriber, 我们来关注下转换后的Subscriber对转换前的Subscriber做了些什么事。

    首先它是一个ObserveOnSubscriber类, 既然是Subscriber那么肯定有onNext, onCompleteonError 看最主要的onNext

    @Override
    public void onNext(final T t) {
        if (isUnsubscribed() || finished) {
            return;
        }
        if (!queue.offer(on.next(t))) {
            onError(new MissingBackpressureException());
            return;
        }
        schedule();
    }
    

    好了,这里做了两件事,首先把结果缓存到一个队列里,然后调用schedule启动传入的worker

    我们这里需要注意下:

    在调用observeOn前的序列,把结果传入到onNext就是它的工作,它并不关心后续的流程,所以工作就到这里就结束了,剩下的交给ObserveOnSubscriber继续。

    protected void schedule() {
        if (counter.getAndIncrement() == 0) {
            recursiveScheduler.schedule(this);
        }
    }
    

    recursiveScheduler 就是之前我们传入的Scheduler,我们一般会在observeOn传入AndroidScheluders.mainThread()对吧、

    接下去,我们看下在scheduler中调用的call方法,这里只列出主要带代码

    @Override
    public void call() {
        ...
        final Subscriber<? super T> localChild = this.child;
        for (;;) {
            ...
            boolean done = finished;
            Object v = q.poll();
            boolean empty = v == null;
            
            if (checkTerminated(done, empty, localChild, q)) {
                return;
            }
            
            if (empty) {
                break;
            }
            
            localChild.onNext(localOn.getValue(v));
    
            ...
        }
        
        if (emitted != 0L) {
            request(emitted);
        }
    }
    

    OK,在Scheduler启动后, 我们在Observable.subscribe(a)传入的a就是这里的child我们看到,在call中终于调用了它的onNext方法,把真正的结果传了出去,但是在这里,我们是工作在observeOn的线程上的。

    那么总结起来的结论就是:

    1. observeOn 对调用之前的序列默不关心,也不会要求之前的序列运行在指定的线程上
    2. observeOn 对之前的序列产生的结果先缓存起来,然后再在指定的线程上,推送给最终的subscriber

    复杂情况

    我们经常多次使用subscribeOn切换线程,那么以后是否可以组合observeOnsubscribeOn达到自由切换的目的呢?

    组合是可以的,但是他们的执行顺序是有条件的,如果仔细分析的话,可以知道observeOn调用之后,再调用subscribeOn是无效的,原因是什么?

    因为subscribeOn改变的是subscribe这句调用所在的线程,大多数情况,产生内容和消费内容是在同一线程的,所以改变了产生内容所在的线程,就改变了消费内容所在的线程。

    经过上面的阐述,我们知道,observeOn的工作原理是把消费结果先缓存,再切换到新线程上让原始消费者消费,它和生产者是没有一点关系的,就算subscribeOn调用了,也只是改变observeOn这个消费者所在的线程,和OperatorObserveOn中存储的原始消费者一点关系都没有,它还是由observeOn控制。

    总结

    如果我们有一段这样的序列

    Observable
    .map                    // 操作1
    .flatMap                // 操作2
    .subscribeOn(io)
    .map                    //操作3
    .flatMap                //操作4
    .observeOn(main)
    .map                    //操作5
    .flatMap                //操作6
    .subscribeOn(io)        //!!特别注意
    .subscribe(handleData)
    

    假设这里我们是在主线程上调用这段代码,
    那么

    1. 操作1操作2是在io线程上,因为之后subscribeOn切换了线程
    2. 操作3操作4也是在io线程上,因为在subscribeOn切换了线程之后,并没有发生改变。
    3. 操作5操作6是在main线程上,因为在他们之前的observeOn切换了线程。
    4. 特别注意那一段,对于操作5操作6是无效的
      再简单点总结就是
    1. subscribeOn的调用切换之前的线程。
    2. observeOn的调用切换之后的线程。
    3. observeOn之后,不可再调用subscribeOn 切换线程

    相关文章

      网友评论

      • longzekai:记下这段核心话就行了,哈哈。多谢
        subscribeOn的调用切换之前的线程。
        observeOn的调用切换之后的线程。
        observeOn之后,不可再调用subscribeOn 切换线程

      本文标题:迷之RxJava —— subscribeOn 和 observ

      本文链接:https://www.haomeiwen.com/subject/kwrulttx.html