美文网首页
RxJava的源码理解及手动写一个简单的RxJava

RxJava的源码理解及手动写一个简单的RxJava

作者: cao苗子 | 来源:发表于2019-11-18 11:41 被阅读0次

    1.简单使用

    Observable
                .just("miaozi")// 返回 ObservableJust
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        //可进行耗时操作
                        return s;
                    }
                })// 返回 ObservableMap
                .subscribeOn(Scheduler.io())// 返回 ObservableSubscribeOn
                .observeOn(Scheduler.mainThread())// ObservableObserveOn
                .subscribe(new Consumer<String>() {//开始执行
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e("TAG",s);
                    }
                });
    

    这个是我写的一个简单的RxJava中的map使用和子线程和主线程之间的调度使用。目的主要是加深对RxJava的理解。说实话,我自己是看视频对源码进行分析的,分析了有一个星期的时间吧,到现在才弄懂其中的源码。

    RxJava是一种响应式的编码思想,可以是采用链式和递归的方式来调用的,还采用的静态代理的设计模式来代理使用,下面我在一一介绍和分析。

    2.分析

    第一步:just返回的是一个ObservableJust对象,里面存储一个 value 值 和实现了 subscribeActual方法。

    public final class ObservableJust<T> extends Observable<T>{
        private final T value;
        public ObservableJust(final T value) {
            this.value = value;//miaozi
        }
    
        @Override
        protected void subscribeActual(Observer<T> observer) {
            Log.e("TAG","ObservableJust" + " observer="+observer.toString());
            ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
            observer.onSubscribe();
            sd.run();
        }
    }
    

    第二步:第一步的基础之上 ObservableJust.map() 返回一个ObservableMap对象,里面有两个值,一个是source,source其实就是ObservableJust对象。一个是function对象。还是实现了subscribeActual方法。

    /**
     * created by panshimu
     * on 2019/11/13
     */
    public class ObservableMap<T, U> extends Observable<U>{
        final Observable<T> source;//其实就是 ObservableJust
        final Function<T,U> function;
        public ObservableMap(Observable<T> source, Function<T,U> function) {
            this.function = function;
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<U> observer) {
            Log.e("TAG","ObservableMap"+" observer="+observer.toString());
            //静态代理 调用的是ObservableJust中的subscribeActual
            source.subscribe(new MapObserver<T>(observer,function));
        }
    
        private final class MapObserver<T> implements Observer<T> {
            final Function<T,U> function;
            final Observer<U> observer;
            public MapObserver(Observer<U> observer, Function<T,U> function) {
                this.function = function;
                this.observer = observer;
            }
            @Override
            public void onSubscribe() {
                observer.onSubscribe();
            }
            @Override
            public void onNext(@NonNull T t) {
                try {
                    U apply = function.apply(t);
                    observer.onNext(apply);
                }catch (Exception e){
                    observer.onError(e);
                }
            }
            @Override
            public void onError(@NonNull Throwable e) {
                observer.onError(e);
            }
    
            @Override
            public void onComplete() {
                observer.onComplete();
            }
        }
    }
    

    第三步骤:切换子线程 subscribeOn 返回一个ObservableSubscribeOn对象,里面有两个值,一个是source,source就是上一个ObservableMap对象,还有一个值就是Scheduler,并且也实现了subscribeActual方法。

    package com.miaozi.myrxjava;
    import android.util.Log;
    /**
     * created by panshimu
     * on 2019/11/15
     */
    public class ObservableSubscribeOn<T> extends Observable<T> {
        final Observable<T> source;//上一个 ObservableMap
        final Scheduler scheduler;//IOScheduler
        public ObservableSubscribeOn(Observable<T> source, Scheduler scheduler) {
            this.source = source;
            this.scheduler = scheduler;
        }
    
        @Override
        protected void subscribeActual(Observer<T> observer) {
            Log.e("TAG","ObservableSubscribeOn" + " observer="+observer.toString());
           //执行的是 IOScheduler 中的 scheduleDirect 然后会调用 SubscribeTask 中的run方法
            scheduler.scheduleDirect(new SubscribeTask(source,observer));
        }
    }
    
    
    public class SubscribeTask<T> implements Runnable {
        Observer<T> observer;//
        final Observable<T> source;//ObservableMap
    
        public SubscribeTask(Observable<T> source, Observer<T> observer) {
            this.source = source;
            this.observer = observer;
        }
        @Override
        public void run() {
            Log.e("TAG", "SubscribeTask" + " 切换到子线程-->"+ source.toString());
            //调用ObservableMap中的 subscribeActual 
            source.subscribe(observer);
        }
    }
    

    第四步:observeOn 切换回主线程 返回一个 ObservableObserveOn 对象,这个里面也是存储两个值,一个是source,source就是上一层ObservableSubscribeOn对象,另一个就是Scheduler了。这个也实现了subscribeActual方法。

    package com.miaozi.myrxjava;
    
    import android.util.Log;
    
    import androidx.annotation.NonNull;
    
    /**
     * created by panshimu
     * on 2019/11/15
     */
    class ObservableObserveOn<T> extends Observable<T> {
        Observable<T> source;//ObservableSubscribeOn
        Scheduler scheduler;
        public ObservableObserveOn(Observable<T> source, Scheduler scheduler) {
            this.source = source;
            this.scheduler = scheduler;
        }
    
        @Override
        protected void subscribeActual(Observer<T> observer) {
            Log.e("TAG","ObservableObserveOn"+ " observer="+observer.toString());
            //调用ObservableSubscribeOn中的subscribeActual
            source.subscribe(new ObserveOnObserver(observer));
        }
    
        private class ObserveOnObserver implements Observer<T>, Runnable {
            private T item;
            final Observer<T> observer;
            public ObserveOnObserver(Observer<T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onSubscribe() {
                observer.onSubscribe();
            }
    
            @Override
            public void onNext(@NonNull T t) {
                item = t;
                scheduler.scheduleDirect(this);
            }
    
            @Override
            public void onError(@NonNull Throwable e) {
                observer.onError(e);
            }
    
            @Override
            public void onComplete() {
                observer.onComplete();
            }
    
            @Override
            public void run() {
                observer.onNext(item);
            }
        }
    }
    

    第五步:subscribe 调用的时候ObservableObserveOn父类中的subscribe,最终调用ObservableObserveOn中的subscribeActual(observer); observer就是Consumer。也就是调用上层的subscribeActual。

    可以发现他其实是一条链子在调用,到最后一个要开始执行的时候,根据它的source也就是上级,一层一层的网上递归的调用,知道最上层然后执行方法又一层一层往回调用。

    这里说明一下切换主线程和子线程,看源码不难发现切换到子线程是开启一个线程池,然后把 source.subscribe(observer);放到子线程的run方法中,就可以实现子线程的切换,然后又怎么切换到主线程呢?

    切换主线程:

    package com.miaozi.myrxjava;
    
    import android.os.Handler;
    import android.os.Message;
    
    /**
     * created by panshimu
     * on 2019/11/18
     */
    public class MainScheduler extends Scheduler {
        private Handler handler;
        public MainScheduler(Handler handler) {
            this.handler = handler;
        }
        @Override
        public void scheduleDirect(Runnable runnable) {
            Message obtain = Message.obtain(handler, runnable);
            handler.sendMessage(obtain);
        }
    }
    
    

    这段话很简单把?一开始我也猜想到是用 handler 的方式,一看就是。但是跟我们正常的切换时不一样的,并没有实现handlerMassage()方法的重写,是因为handler的源码中放我们实现了,具体去看handler源码中的callback。

    再看MapObserver中的onNext()

    public void onNext(@NonNull T t) {
                try {
                    U apply = function.apply(t);
                    observer.onNext(apply);
                }catch (Exception e){
                    observer.onError(e);
                }
            }
    

    这里的onNext就是不断的下载传递,进行value的转换,最后回执行到LambdaObserver中的onNext()

       @Override
        public void onNext(@NonNull T t) {
            try {
                this.onNext.accept(t);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

    看到这里就可以看到 accept()方法了。这是最后最后的回调回去了。

    附上源码连接:
    https://github.com/panshimu/MyRxJava2

    相关文章

      网友评论

          本文标题:RxJava的源码理解及手动写一个简单的RxJava

          本文链接:https://www.haomeiwen.com/subject/expvictx.html