美文网首页
Rxjava2.0流式响应的核心思想及其架构源码浅析

Rxjava2.0流式响应的核心思想及其架构源码浅析

作者: Android程序员老鸦 | 来源:发表于2021-07-06 16:31 被阅读0次

这次扒一扒rxjava的源码,在此声明一下我写博客的目的不是为了什么给新人教导,完全是自己做做笔记而已,当然为了让自己的笔记更有意义和便于自己复习,也会尽量写的通俗易懂一些,期间也会到处借鉴别人写的好的一些博客,不是为了功利性目的,如果万幸你看到这篇文章,还请多多包容。
此次用到的版本是2.1.7,导入依赖:

    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    implementation 'io.reactivex.rxjava2:rxjava:2.1.7'

想想自己刚接触到rxjava的时候也是很懵逼的,因为没有事先了解到这个框架的核心思想,所以总带着自己以前的思维去看待,结果学起来就觉得很费劲,后来看到darren大佬的视频,讲到的响应式编程思想,瞬间就悟了。

其实编程最核心也最有魅力的地方,个人觉得不是去实现那些复杂的业务,而是发现世界万物的本质,提取其核心思想和提炼各个流程的关键步骤,然后用代码去实现他们封装他们表达他们。比如面向对象编程,光一句--万物皆是对象,其实就是一个非常具有哲理的话,而把这个思想具化成编程语言的主框架,这就是面向对象编程的牛逼之处。

个人是这样理解rxjava的核心思想的:
但凡做一件稍微复杂点的事情,日常生活中我们都是会把它分为几个步骤去完成,从第二个步骤开始之后的每一个步骤都是基于上一个步骤的结果来,通过层层加工作业最终做完了这件事情,最符合这种特征的事件就是工厂流水线作业了,假设要组装一个字符串abc,如果用rxjava来组装是这么写:

      Observable.just("a")
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        return s + "b";
                    }
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(@NonNull String s) throws Exception {
                        return s + "c";
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {}
                    @Override
                    public void onNext(@NonNull String str) {
                        Log.d("TAG", str);
                    }
                    @Override
                    public void onError(@NonNull Throwable e) {}
                    @Override
                    public void onComplete() {}
                });

打印日志:

2021-07-06 14:15:03.564 30737-30737/com.trendlab.rxjavaex D/TAG: abc

可以看到从开始传入的a,经过2个map的分别加工,最终到最后一个步骤变成了abc,你可能觉得费这么老大劲实现这种功能实在没什么可夸奖的,这样的写法又有什么优越性呢?
这个例子我只是想说明rxjava的结构性是如流水线一样的工作原理,那我再举一个例子,涉及到异步线程和线程切换的事件组合。
比如从网上下载一张图片并放到我们的imageView上显示,如果不用rxjava你应该是这么写的:开一个线程下载图片,下载完后用handler切换到主线程给imageView设置一下图片,对吧?这样写不是不行,但是写起来非常麻烦。

开发者要自己管理线程和手动做线程切换调度,这些都不是业务代码,开发者在工作中应该尽量去专注业务而少去造轮子,而rxjava就是这种轮子,它已经帮我们把这些轮子帮我们造好了,看看它是怎么写的:
      Observable.just("https://img1.sycdn.imooc.com/5b8ab353000181b204000284.jpg")
                .map(new Function<String, Bitmap>() {
                    @Override
                    public Bitmap apply(@NonNull String s) throws Exception {
                        //1.下载图片
                        URL url = new URL(s);
                        URLConnection urlConnection = url.openConnection();
                        InputStream inputStream = urlConnection.getInputStream();
                        return BitmapFactory.decodeStream(inputStream);
                    }
                })
                .subscribeOn(Schedulers.io())//io线程下载
                .observeOn(AndroidSchedulers.mainThread())//android主线程接收图片bitmap
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Exception {
                        //2.设置图片
                        ivImage.setImageBitmap(bitmap);
                    }
                });

传入图片地址,下载图片,指定下载的线程,指定更新UI的线程,设置图片,简单吧?清晰明朗吧?一条线下来,一步一步走,这就是rxjava的核心思想,流式事件响应。
下面来研究它是怎么做到的。
先不管那些复杂的变换和线程调度,先从简单的一个观察者和被观察者的案例来看看,所谓的流式响应是怎样发生的:

      Observable.just("hello rxjava")//just方法构建了一个observable可观察对象
                 //new了一个Observer观察者 
                //subscribe()方法,观察者订阅了一个可观察对象
                .subscribe(new Observer<String>() {  
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                          Log.d("TAG", "onSubscribe");
                     }
                    @Override
                    public void onNext(@NonNull String str) {
                        Log.d("TAG", str);
                    }
                    @Override
                    public void onError(@NonNull Throwable e) {
                         Log.d("TAG", "onError");
                     }
                    @Override
                    public void onComplete() {
                        Log.d("TAG", "onComplete");
                    }
                });

这里不能以以前的观察者和被观察者逻辑去理解,rxjava的思想是响应式编程,所以它要做的事情都是即时的,而且它的订阅者只有一个,并且在调用subscribe()方法的时候被观察者就会开始行动执行数据流转了。
先来看看just()方法,它的任务就是创建了一个Observable,可观察对象:

    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }

    //这个方法可理解成直接返回了入参source,因为我们没有给onObservableAssembly赋值
     public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
      }

//看看这个ObservableJust类


   // 继承自Observable,实现了ScalarCallable接口
   public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
      //这个value是我们传的那个字符串
      private final T value;
      public ObservableJust(final T value) {
        this.value = value;
      }
      //这个方法很重要,到时候会用到
      @Override
      protected void subscribeActual(Observer<? super T> s) {
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        s.onSubscribe(sd);
        sd.run();
      }
      @Override
      public T call() {
          return value;
      }
    }

由此我们可以知道just()方法的主要工作就是new了一个ObservableJust对象,这个对象是Observable的子类,实际上Observable有很多子类,这里我们只分析其中一种,其他的原理类似。
再来看看订阅者,这里直接new了一个匿名类Observer,看看Observer的代码:

public interface Observer<T> {
    void onSubscribe(@NonNull Disposable d);
    void onNext(@NonNull T t);
    void onError(@NonNull Throwable e);
    void onComplete();
}

很简单的一个接口,看来关键代码就是那个subscribe()了:

  public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //把observer作为入参,类似onAssembly方法,其实就是返回了入参observer
            observer = RxJavaPlugins.onSubscribe(this, observer);
            //判空操作
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            //熟悉的方法,这个方法就是之前提到要注意的的observable里的方法
            subscribeActual(observer);
        } catch (NullPointerException e) { 
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

所以这里的关键代码其实就是调用了observable的subscribeActual()方法,我们再来回顾一下这个方法做了哪些动作,从而调动了观察者接收被观察者的行为:

 @Override
      protected void subscribeActual(Observer<? super T> s) {
        //new了一个ScalarDisposable对象,注意入参是观察者和被观察者的value
        ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
        //首先调用了Observer的onSubscribe()方法,
        s.onSubscribe(sd);
        //sd的run() 方法
        sd.run();
      }

看看ScalarDisposable类:

  /**
     * Represents a Disposable that signals one onNext followed by an onComplete.
     *
     * @param <T> the value type
     */
    public static final class ScalarDisposable<T>
    extends AtomicInteger
    implements QueueDisposable<T>, Runnable {

        private static final long serialVersionUID = 3880992722410194083L;

        final Observer<? super T> observer;

        final T value;

        static final int START = 0;
        static final int FUSED = 1;
        static final int ON_NEXT = 2;
        static final int ON_COMPLETE = 3;
        //注意入参是观察者和被观察者的value
        public ScalarDisposable(Observer<? super T> observer, T value) {
            this.observer = observer;
            this.value = value;
        }

        @Override
        public boolean offer(T value) {
            throw new UnsupportedOperationException("Should not be called!");
        }

        @Override
        public boolean offer(T v1, T v2) {
            throw new UnsupportedOperationException("Should not be called!");
        }

        @Nullable
        @Override
        public T poll() throws Exception {
            if (get() == FUSED) {
                lazySet(ON_COMPLETE);
                return value;
            }
            return null;
        }

        @Override
        public boolean isEmpty() {
            return get() != FUSED;
        }

        @Override
        public void clear() {
            lazySet(ON_COMPLETE);
        }

        @Override
        public void dispose() {
            set(ON_COMPLETE);
        }

        @Override
        public boolean isDisposed() {
            return get() == ON_COMPLETE;
        }

        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) {
                lazySet(FUSED);
                return SYNC;
            }
            return NONE;
        }
        //run()方法被调用
        @Override
        public void run() {
            //判断一下当前状态,如果是START 则将其设置为ON_NEXT
            if (get() == START && compareAndSet(START, ON_NEXT)) {
                //执行了observer的onNext()方法
                observer.onNext(value);
                if (get() == ON_NEXT) {
                    lazySet(ON_COMPLETE);
                    //执行了observer的onComplete()方法
                    observer.onComplete();
                }
            }
        }
    }

由此可知,一旦订阅了,则被订阅者就会开始执行任务把结果传给被订阅者,而且会通知被订阅者执行相关的方法,这就是它基本的工作流程,下篇我们看看事件变换map()是怎么包装一层层转换事件的。

相关文章

网友评论

      本文标题:Rxjava2.0流式响应的核心思想及其架构源码浅析

      本文链接:https://www.haomeiwen.com/subject/ocajultx.html