美文网首页RxJavaAndroid知识Android开发
Rxjava源码解读笔记:线程、map数据操作

Rxjava源码解读笔记:线程、map数据操作

作者: wenld_ | 来源:发表于2017-06-08 18:01 被阅读393次

    一点牢骚:
    前段时间,接到需求,旧项目要增添许多功能;旧项目是这样的:功能以及代码量就非常庞大,加上各种代码不规范、可读性很差、代码耦合度有点小高;
    听到这个消息真的让我脑袋大了一圈,
    如果真的要在原有架构上做开发,肯定会导致小组成员开发冲突以及众多的冗余代码,浪费时间和精力在非必要的事情上,之前自身也知道旧项目有这个问题 但由于新项目开发呀嫌弃旧项目一直没有决心去改动,这下好了完全推不了 那就改架构吧,新的模式是 组件化+Rxjava.Retrofit+MVP模式,最近一直在忙着项目代码架构调整,相对应的代码模板编写等等,虽然说改架构是被逼的,但改着改着还是有成长以及很有成就感的一件事情; 再接再厉。


    说实话,rxjava的源码太难了,一直没有去时间(懒癌)去学习; 包括现在项目比较紧张,每天下班后更是不太想去学习,那么现在我就和大家一起看一下rxjava的源码吧;

    1、正常简易流程;
    2、带线程切换流程;
    3、map之后;
    4、一些总结

    1、正常简易流程

    基于以下这段代码查看源码

       Observable.just("11")
                .subscribe(observer);
    

    大家应该都知道或者听过,Rxjava采用的是 增强版的观察者模式,在订阅的那一瞬间开始执行整个流程,那么现在看一下订阅方法subscribe(Observer<? super T> observer)

    Observable.class
        @Override
        public final void subscribe(Observer<? super T> observer) {
            //..
            // 实际订阅
                subscribeActual(observer);
            //...
        }
        
        
    RxJavaPlugins.class
        public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
            BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
            
            if (f != null) {
                return apply(f, source, observer);
            }
            return observer;
        }
        
        static <T, U, R> R apply(@NonNull BiFunction<T, U, R> f, @NonNull T t, @NonNull U u) {
            try {
                return f.apply(t, u);
            } catch (Throwable ex) {
                throw ExceptionHelper.wrapOrThrow(ex);
            }
        }
    

    看到这里实际订阅是发生在 observable 的 subscribeActual 中 而 subscribeActual是个抽象方法; 那么我们又要去找它的实现;
    这边通过Observable.just开始看

    Observable.calss
    
      public static <T> Observable<T> just(T item) {
            //...
            return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
        }
    
    ObservableJust.class
        
        protected void subscribeActual(Observer<? super T> s) {
            ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
            //调用 observer的 onSubsribe方法
            s.onSubscribe(sd);
            //执行
            sd.run();
        }
        
    ScalarDisposable.calss
       public void run() {
       // 判断什么的
        if (get() == START && compareAndSet(START, ON_NEXT)) {
        // 
            observer.onNext(value);
            if (get() == ON_NEXT) {
                lazySet(ON_COMPLETE);
                observer.onComplete();
            }
        }
    }
    

    可以看到 run是直接执行的;
    整体的一个简单正常的流程就是: observable.subscribe(Observer) -> observable.subscribeActual -> Observer.onSubscribe( Disposable ) -> ScalarDisposable.run -> observer.onNext(value) -> observer.onComplete();

    简易源码流程——01

    其中正常完整流程都会执行标红部分的方法;其中其它部分先放着,只是判断有没有完成完成所有数据流的发射

    2、线程切换流程

    基于以下这段代码查看源码

    Observable.just("11")
            .subscribeOn(Schedulers.io())//指定Observable 在哪个线程上创建执行操作
            .observeOn(AndroidSchedulers.mainThread()) //在指定下一事件发生的线程
            .subscribe(observer);
    

    2.1、 流向 Observable.subscribe 都经历了什么

    先看下 Observable.subscribeOn都做了些什么

    Observable.class
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            //
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
        
    ObservableSubscribeOn.class    本质上继承 Observable
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            //保存以及初始化
            super(source);
            this.scheduler = scheduler;
        }
    
    

    可以看就就是转换变成了 ObservableSubscribeOn

    再看下 Observable.observeOn(Scheduler scheduler) 做了些什么

    Observable.class  这边应该是: ObservableSubscribeOn extends .... Observable
        public final Observable<T> observeOn(Scheduler scheduler) {
            return observeOn(scheduler, false, bufferSize());
        }
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    
    ObservableObserveOn.class 
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    

    本质是将Observable转换成ObservableObserveOn ,在这个流程中是将 ObservableSubscribeOn 转换成ObservableObserveOn;

    我们的Observable变换是这样子的,ObservableJust->ObservableSubscribeOn->ObservableObserveOn
    一层一层被包含

    Obserable转换

    2.2、流向 -> Observer.onSubscribe 都经历了什么

    那么又到了我们的 订阅方法subscribe(Observer<? super T> observer)了,只不过我们中间多了几层转换; 我们再来看一下

    Observable.class
        @Override
        public final void subscribe(Observer<? super T> observer) {
            //...
            // 实际订阅
                subscribeActual(observer);//...
        }
    ObservableObserveOn.class
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
            //创建一个  Scheduler.Worker
                Scheduler.Worker w = scheduler.createWorker();
            //   new一个新的 ObserveOnObserver implements Observer 再次循环  Observable.subscribe
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
        
    ObserveOnObserver.class .... implements Observer<T>, Runnable
        
    Observable.class
        @Override
        public final void subscribe(Observer<? super T> observer) {
            //..
            // 实际订阅
                subscribeActual(observer);
            //...
        }
    
    ObservableSubscribeOn.class 
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            //直接执行,what? Observer.onSubscribe 不能指定线程   
            // 记录一下   Observer.onSubscribe 的入口是
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    
    

    Observer的转变是这样的 Observer->ObserveOnObserver->SubscribeOnObserver

    以上面为准,先看下 s.onSubscribe(parent)所经历的事情

    ObserveOnObserver.class 
        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;
    
                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
    
                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        return;
                    }
                }
    
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
    
    //   看这里  actual 其实是  Observer ;
                actual.onSubscribe(this);
            }
        }
        
    Observer.class 
        onSubscribe(sd){...}
    
    

    这里究竟可以看到 执行到 最初observeronSubscribe的一条完整的线路;
    ObserveOnObserver.subscribeActual -> ObservableSubscribeOn.subscribeActual -> ObserveOnObserver.onSubscribe -> Observer.onSubscribe ;
    不知道有没有细心的同学发现了没有,'onSubscribe'的执行没有SubscribeOnObserver什么事情,虽然说上面有一层转换成功了SubscribeOnObserver
    画成图应该就是下面这样:

    onSubscribe执行链

    我们发现了 从订阅开始一直到执行我们的 observer.onSubscribe() 中间没有任何切换线程的影子;
    所以我们得出了一个

    observer的 onSubscribe 运行与订阅动作发生在同一线程,不受线程指定方法(observeOn subscribeOn)影响

    2.3、流向 -> observer.next、onComplete 都经历了什么

    ObservableSubscribeOn.class 
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
            s.onSubscribe(parent);
    
    //      new 出一个  SubscribeTask
    //      scheduler.scheduleDirect 切换线程执行  SubscribeTask
    //      SubscribeOnObserver.setDisposable方法
    
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    

    可以看到上面最后一段代码做个这样事情,一件一件去看一下:

    // new 出一个 SubscribeTask
    // scheduler.scheduleDirect 切换线程执行 SubscribeTask
    // SubscribeOnObserver.setDisposable方法

    先看一下SubscribeTaskrun 里面是干嘛的

    ObservableSubscribeOn.class
        class SubscribeOnObserver
            SubscribeOnObserver(Observer<? super T> actual) {
                this.actual = actual;
                this.s = new AtomicReference<Disposable>();
            }
            
        class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                //其中  source 是  ObservableJust  
                source.subscribe(parent);
            }
        }
    

    由第一节的分析我们可以知道,这边最终会执行到 SubscribeOnObserver.onNext() -> ObserveOnObserver.onNext()->Observer.onNext() 这边一层一层调用出来;

    SubscribeTask.run 最终执行我们的 最初observer.onNext() onComplete(); 这边还没有涉及到线程切换

    再看我们的 scheduler.scheduleDirect(new SubscribeTask)
    我们上面用的是 Scheduler.IO 实际上是 IoScheduler;

    IoScheduler extends Scheduler.class
        @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
        @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();    
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            DisposeTask task = new DisposeTask(decoratedRun, w);
    
            //指定工作线程
            w.schedule(task, delay, unit);
    
            return task;
        }
        @Override
        public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
        
        EventLoopWorker extends Scheduler.Worker 
           @NonNull
            @Override
            public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
    
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
        
    

    那么这边流程就比较清晰了,拿到subscribeOn 设置的Scheduler中创建一个Worker 设定了一个 IO 线程;
    看到这里 我们就该逆向地执行我们 Observer 真正的方法了;
    执行到 SubscribeOnObserver.onNext()

    ObservableSubscribeOn : SubscribeOnObserver<T> 
            @Override
            public void onNext(T t) {
            // actual 为 ObserveOnObserver
                actual.onNext(t);
            }
    //  scheduler  这边指定为  AndroidSchedulers.mainThread()    createWorker() 这边不深究,里面转成了 handler
    Scheduler.Worker worker = scheduler.createWorker();
    ObservableObserveOn : ObserveOnObserver
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                    queue.offer(t);
                }
                schedule();
            }
            void schedule() {
                if (getAndIncrement() == 0) {
                // 这个最终 执行在handler
                    worker.schedule(this);
                }
            }
    

    最后的流程应该是这样的


    线程切换

    3、map 数据操作源码

    Observable.just(1)
            .map(new Function<Integer, Integer>() {
                @Override
                public Integer apply(@NonNull Integer integer) throws Exception {
                    return null;
                }
            }).subscribe(integer -> out("accept:" + integer));
    
    Observable.class
        public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            ObjectHelper.requireNonNull(mapper, "mapper is null");
            return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        }
    
    ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends U> function;
    
        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);
            this.function = function;
        }
        @Override
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }
        
       MapObserver<T, U> extends BasicFuseableObserver<T, U> {
            final Function<? super T, ? extends U> mapper;
    
            MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);
                this.mapper = mapper;
            }
    
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != NONE) {
                    actual.onNext(null);
                    return;
                }
    
                U v;
    
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                actual.onNext(v);
            }
    

    可以看到 它是在 执行完 function.apply在执行 onNext();
    配合上一节 ,流程图就变成这样了


    加了map以后的流程

    4、一些总结

    来个总结吧: 估计源码看得很混乱。

    1、对Observable指定线程、数据变换等等,都采用了一种代理包装模式; 比如 ObservableJust-> ObservableSubscribeOn -> ObservableMap -> ObservableObserveOn ; 进行了一层包装;
    2、在订阅完成的那一刻起,反向调用 subscribe():subscribeActual()方法;比如 :(ObservableObserveOn.subscribe->ObservableObserveOn.subscribeActual())->(ObservableMap.subscribe->ObservableMap.subscribeActual())->(ObservableSubscribeOn.subscribe->ObservableSubscribeOn.subscribeActual())->(ObservableJust.subscribe->ObservableJust.subscribeActual())
    3、Observer ,同理包装 Observer -> ObservableMap... 添加了指定 Schedulers.createWorker() ;
    4、 Observer 的执行顺序是 Observer.onSubscribe() -> ObservableXX.onNext() -> ObsevableXXX.onNext() ->...-> Obsever.onNext() -> ObservableXX.OnComplete() -> ObsevableXXX.OnComplete() ->...-> Obsever.OnComplete();
    5、 中间有些操作放入到了线程当中.

    其实有点坑的是:原本我就知道这个流程应该是这样的,类似于事件分发机制成 U 字型的流程...... 本篇只是在 众多代码 中验证我的思路.................、

    相关文章

      网友评论

        本文标题:Rxjava源码解读笔记:线程、map数据操作

        本文链接:https://www.haomeiwen.com/subject/tcujfxtx.html