Rxjava2-线程切换解析

作者: Colaman丶 | 来源:发表于2019-02-28 05:11 被阅读57次

    ObservableOn()

    直接查看实现,会发现onSubscribe()中做了一些判断,比如82 104等几行都是做了一些同步 异步 等的判断,然后初始化DisposableonSubscribe()是上游Observable完成了整条订阅链之后调用的,所以这些操作是在开始订阅之后才初始化操作,然后106行可以看出把一个包装处理过的Disposable传递给下游
    image.png
    和之前的一样,subscribeActual方法里会将observer进行包装,然后传递给source也就是上游进行订阅
    * `40`行进行了判断所传进来的`scheduler`是否跟原本的线程一致,如果是一样的就直接传递不用进行处理
    *  `43`行创建了一个对应`scheduler`的`worker`,`worker`在后续负责把数据在对应的线程进行发射操作
    
    image.png
    发射数据onNext处理
            @Override
            public void onNext(T t) {
              ...
              // 前面的都先忽略掉,会发现最后会调用这个方法
                schedule();
            }
            
          void schedule() {
             if (getAndIncrement() == 0) {
             // 在这个可以看到,上面根据schedule的worker执行了schedule(),并且把自身传进去,this其实实现了runnable,所以可以理解为传了一个runnable进去
                worker.schedule(this);
            }
        }
    
    接着上面的以AndroidSchedulers.mainThread()这个scheduler为例,这里实际上是将主线程的looper传进去了
    image.png

    查看一下这个scheduler的worker,会发现worker的基类schedule()方法是相同的互相调用的,所以可以直接看多个参数的schedule(),可以看到73行创建了一个ScheduledRunnable对象,并且把主线程的handler以及外面的Observer传递过去,接着82行用主线程的handler发送消息,119ScheduledRunnable里的run被调用,接着Observer也就是runnable也调用run方法

    image.png image.png
    到这里可以看出,实际上当切换线程的时候,observer(也实现了Runnable)的onNext往scheduler里发送自身,让scheduler来决定自身应该在什么线程执行run方法,接下来看回observer的run方法,就是判断了一下要执行哪个方法
    image.png
    可以看到最后是调用了onNext方法,到这里就完成了指定线程发射数据的功能
            void drainNormal() {
                int missed = 1;
    
                final SimpleQueue<T> q = queue;
                final Observer<? super T> a = downstream;
    
                for (;;) {
                    if (checkTerminated(done, q.isEmpty(), a)) {
                        return;
                    }
    
                    for (;;) {
                        boolean d = done;
                        T v;
    
                        try {
                            v = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            disposed = true;
                            upstream.dispose();
                            q.clear();
                            a.onError(ex);
                            worker.dispose();
                            return;
                        }
                        boolean empty = v == null;
    
                        if (checkTerminated(d, empty, a)) {
                            return;
                        }
    
                        if (empty) {
                            break;
                        }
    
                        a.onNext(v);
                    }
    
                    missed = addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                }
            }
    
    
    值得注意的是可以看到的是v也就是我们要发射的数据,是通过poll方法获取的,查看代码可以发现

    queue实际上就是一个Disposable也就是说是上游Observable,通过上游的poll方法去获取要onNext的数据

    image.png
    查看Observable其中一个实现ObservableMap的poll方法,可以看到这里实际上也是调用上游的poll方法,并且对数据的格式也就是不允许为null做了一层判断
            public U poll() throws Exception {
                T t = qd.poll();
                return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
            }
    
    poll方法操作的对象实际上是下图104行的时候new出来的,具体查看其实就是缓存数据,类似一个容量池的作用

    [图片上传失败...(image-a0f00c-1551301878533)]

    image.png
    用一段伪代码来展示切换线程之后的observer,其实相当于onNext等方法都被放在指定的线程里去发射数据
    public class Observer {
        Observer oldObserver;
    
        public Observer(Observer observer) {
            oldObserver = observer;
        }
    
        public void onNext(T t) {
            // 一些其他操作
            new Thread("Android mainThread") {
                @Override
                public void run() {
                    oldObserver.onNext(t);
                }
            } .start();
        }
    
        public void onError(Throwable e) {
            // 一些其他操作
            new Thread("Android mainThread") {
                @Override
                public void run() {
                    oldObserver.onError(e);
                }
            } .start();
        }
    
        public void onComplete() {
            // 一些其他操作
            new Thread("Android mainThread") {
                @Override
                public void run() {
                    oldObserver.onComplete();
                }
            } .start();
        }
    }
    

    相关文章

      网友评论

        本文标题:Rxjava2-线程切换解析

        本文链接:https://www.haomeiwen.com/subject/irscuqtx.html