美文网首页
Rxjava源码分析总结

Rxjava源码分析总结

作者: gogoingmonkey | 来源:发表于2021-03-11 16:25 被阅读0次

    Rxjava优点

    个人认为优点有下面几个:
    1.使用结构、逻辑清晰;
    2.线程切换非常方便;
    3.只要流程发生错误,一定会执行onError(),不用到处处理;
    4.操作符强大;

    基础用法推荐

    https://www.jianshu.com/p/cd3557b1a474
    https://www.cnblogs.com/liushilin/p/7058302.html
    以上两篇写的很全了,下面是 版本2和版本3的区别,有兴趣的去看下
    https://blog.csdn.net/weixin_45258969/article/details/95386872?utm_medium=distribute.down_relevant_right.none-task-blog-BlogCommendFromBaidu-1.nonecase&depth_1-utm_source=distribute.down_relevant_right.none-task-blog-BlogCommendFromBaidu-1.nonecase

    本文主要分析 subcribe、 线程切换、操作符map,相信看完你就明白Rxjava的玩法,其他的操作符也是基于这个思路。

    源码分析

    1、前提:看源码还是主要先看主线,不要高的无法自拔,尤其是Rxjava这种比较绕的,很多人对他源码不熟就是因为点进去一看,不知道在干什么,怎么实现的直接放弃。
    2、Rxjava的观察者模型是基于Java的观察者模型的一个变种,最大区别就是标准观察者是一个被观察,多个观察者,比如微信公众号推文的模型。Rxjava则是只有一个观察者来消费,看源码重点关注,被观察调用 subscribe订阅方法,把二者绑在一起的。

    subscribe 订阅源码分析

    时序图如下:


    image.png

    我们看下调用方式:

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    //....
                }
            })
                    .subscribe(new Consumer<Integer>() {
                    //...      
                    });
    

    省略部分代码,核心就是 Observable.create(自定义发送事件)调用订阅(目标对象)。
    我们首先理清Observable.create是什么,点进去看下源码,用到的是 RxJavaPlugins.onAssembly方法,里面其实啥都没干,直接把传入的对象返回了,因为判断变量是空的,如果我们想监控整个Rxjava执行了哪些,我们可以自定义一个,就会先走自定义的逻辑。继续看,下面代码的注释1处,是把传入的事件自定义发生源头直接赋值给了 ObservableCreate的成员变量source。

       public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    我们在扫一眼, subscribeActual方法,是RxJava中比较重要的一个方法,定义的一个抽象方法,整个Rxjava做事就是靠这个方法实现的,

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;  // 1.这个是把目标 赋值给了对象的成员变量
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            // 对传入的观察者进行包装
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            // 调用观察者的订阅回调方法
            observer.onSubscribe(parent);
            try {
                // 真正执行订阅的地方
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    
        static final class CreateEmitter<T> extends AtomicReference<Disposable> 
            implements ObservableEmitter<T>, Disposable {
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext ..."));
                    return;
                }
                if (!isDisposed()) {
                    // 调用传入的观察者的 onNext() 方法
                    observer.onNext(t);
                }
            }
    
            @Override
            public void dispose() {
                // 取消订阅
                DisposableHelper.dispose(this);
            }
            // ...
        }
        // ...
    }
    

    再看下 subscribe(目标)方法 ,调用这个方法的是 ObservebleCreate对象。目标就是观察找对象。

    public final void subscribe(Observer<? super T> observer) {
      ...
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
                subscribeActual(observer);   //调用这个核心方法
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
       // ...
            }
        }
    

    主要调用了抽象方法:

     protected abstract void subscribeActual(Observer<? super T> observer);
    

    看到实现类很多,各种操作符的都有,我们直接看 ObservebleCreate这个类中,上面代码

     protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer); //1.对目标包装成发射器
            observer.onSubscribe(parent); //2,调用方法
    
            try {
                source.subscribe(parent); //3,  自定义被观察着调用 被包装的目标观察者
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        } 
    

    如上面的注释1:对目标包装成发射器,Rxjava的链式调用 就是各种包装再拆包装,
    注释2处,主要是调用目标观察者的方法,所以每次这个方法调用;
    注释3处,ObservebleCreate调用 订阅方法,同样,这个调用也是一个抽象方法

    void subscribe(ObservableEmitter<T> e) throws Exception;
    

    实现的就是我们自定义的 方法如下:

            Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    e.onNext("XXX");
                }
            })
    

    这下就比较清晰了,当执行订阅方法后,回调我们自定义的订阅方法,

    我们在看线程切换实现源码

    同样的还是先看下我们调用

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    //....
                }
            })
      .subscribeOn(Schedulers.io()) // 上面 异步
                    .observeOn(AndroidSchedulers.mainThread()) // 下面 主线程
                    .subscribe(
    
    

    我们重点看下面两行代码:

    被观察者(ObservableCreate)
    .observeOn(Schedulers.io()) 
              .subscribeOn()
    .subscribe(观察者
    

    明确两个东西,谁来调用的,是ObservableCreate,传入的是什么参数,观察者

    1.我们先看Schedulers.io()是什么:

    还是一样的套路,schedulers中定义变量

    public final class Schedulers {
        @NonNull
        static final Scheduler SINGLE;
    
        @NonNull
        static final Scheduler COMPUTATION;
    
        @NonNull
        static final Scheduler IO;
    
        @NonNull
        static final Scheduler TRAMPOLINE;
    
        @NonNull
        static final Scheduler NEW_THREAD;
    
    

    IO 在静态代码块中初始化了:

    IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
                @Override
                public Scheduler call() throws Exception {
                    return IoHolder.DEFAULT;
                }
            });
    

    在看下IoHolder.DEFAULT :是直接new IoScheduler() 里面是直接创建 线程工厂new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority); 着就到了他的 线程池

    总结起来就是Scheduler

    2.subscribeOn方法:

    先看ObservableSubscribeOn类,当发生订阅会执行到方法

     @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //1
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {  //3
                @Override
                public void run() {
                    source.subscribe(parent);// 2
                }
            }));
        }
    

    我们可以看到上面的1处先把观察者包装为parent ,然后调用2处的方法传入包装后的观察者,很明显就是放到 子线程

    3.再看observeOn()方法:

    同样,传入的其实就是一个主线程 的handler ,

     public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    

    看到这个方法中还是同样的套路,直接把传入的 Scheduler包装成一个 ObservableObserveOn

    忽略下 Rxjava 中的命名.之前的 ObservableCreate 和 ObservableObserveOn .都是Observable + 具体的方法

    我们看下ObservableObserveOn 的方法:

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker(); //1.这里
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    ...
    

    省略了部分代码,首先是把刚才传入的schedule 线程池相关的赋值给自己成员变量.同样的,当调用订阅方法的时候,还是会调用到抽象方法 subscribeActual 就这样实现切换到主线程

    操作符 map

     .map(new Function<String, Bitmap>() {
                @Override
                public Bitmap apply(String s) throws Exception {
                    URL url = new URL(PATH);
                    HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
                    httpURLConnection.setConnectTimeout(5000);
                    int responseCode = httpURLConnection.getResponseCode(); // 才开始 request
                    if (responseCode == HttpURLConnection.HTTP_OK) {
                        InputStream inputStream = httpURLConnection.getInputStream();
                        Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
                        return bitmap;
                    }
                    return null;
                }
            })
    

    操作符其实实现是一样的。


    image.png

    相关文章

      网友评论

          本文标题:Rxjava源码分析总结

          本文链接:https://www.haomeiwen.com/subject/orcoqltx.html