美文网首页
Android RxJava使用分析(1)

Android RxJava使用分析(1)

作者: Bfmall | 来源:发表于2023-04-09 15:17 被阅读0次

源码地址:

RxJava和RxAndroid都是结合使用,地址如下:
RxJava的github地址:https://github.com/ReactiveX/RxJava
RxAndroid的github地址:https://github.com/ReactiveX/RxAndroid

一、RxJava是什么?

1.Rx(Reactive Extensions)是一个库,用来处理【事件】和【异步】任务,在很多语言上都有实现,RxJava是Rx在Java上的实现。简单来说,RxJava就是处理异步的一个库,最基本是基于观察者模式来实现的。通过Obserable和Observer的机制,实现所谓响应式的编程体验。它扩展了观察者模式以支持数据/事件序列,并添加了运算符,使您可以声明性地将序列组合在一起,同时抽象化了对低级线程,同步,线程安全和并发数据结构等问题的关注。
2、RxAndroid是基于RxJava适用于Android的封装;官方给的这句大概会让你明白适配封装了啥:More specifically, it provides a Scheduler that schedules on the main thread or any given Looper.

二、概念

1.Observable 被观察者 ,事件源。是一个抽象类。
------ 1.1 ObservableEmitter 发射器;
2.Observer 观察者,事件接收处理。是一个接口。
3.subscribe 订阅,把被观察者和观察者关联起来。

先看下面RxJavaAndroid .java就好理解了这几个概念了。

三、基础使用

1.集成依赖
build.gradle中dependencies 下依赖;代码如下(示例):

dependencies {
    implementation "io.reactivex.rxjava3:rxjava:3.0.12"
    implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
}

2.用法及分析

public class RxJavaAndroid {

    public static void main(String[] args) {
        doRxJava();
    }
    private static void doRxJava() {

        //通过 Observable.create 创建被观察者,Observable是一个抽象类
        Observable<String> observable = Observable.create(
                new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                        //发射器发送消息
                        emitter.onNext("hello world");
                        //通过发射器发射异常
                        //emitter.onError(new Throwable("模拟一个异常"));
                        //发射完成
                        emitter.onComplete();
                    }
                }
        );
        
        //通过 new Observer 创建观察者;Observer是一个 interface
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                //第一个执行的方法
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println("onNext>>>" + s);
            }
                @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("onError>>>" + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        };
        
        //通过被观察者 Observable 的 subscribe (订阅) 绑定观察者 Observer
        observable.subscribe(observer);
    }

}

代码执行结果如下:


image.png

查看Observable源码

Observable 是数据的上游,事件源,即事件生产者。

    //一:接看代码 Observable.create()方法
    public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        //ObservableOnSubscribe是一个只有subscribe方法的接口
        Objects.requireNonNull(source, "source is null");//判空
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
    }

    //二:继续向下看,进入RxJavaPlugins.onAssembly方法
    /**
     * Calls the associated hook function.
     * @param <T> the value type
     * @param source the hook's input value
     * @return the value returned by the hook
     */
     @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
//源码注释有介绍,Calls the associated hook function. hook java反射、钩子。底层管用黑科技。
//apply(f, source)的返回值依旧是Observable,里面不过多探究。

综上我们能发现
Observable.create(new ObservableOnSubscribe() { })
相当于
new ObservableCreate(new ObservableOnSubscribe() { }))
总结:事件的源就是 new ObservableCreate()对象,将 ObservableOnSubscribe 作为参数传递给 ObservableCreate 的构造函数。

查看Observer 源码

Observer 是数据的下游,即事件消费者。Observer 是个 interface。
查看源码,只有如下几个方法

  void onSubscribe(@NonNull Disposable d);
  void onNext(@NonNull T t);
  void onError(@NonNull Throwable e);
  void onComplete();

查看observable.subscribe 源码:

  public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null"); //判空
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);//通过hook返回observer

             Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);//核心、真正实现订阅的方法,仔细看subscribeActual是一个抽象方法,所以我们需要去Observable的实现类ObservableCreate中去查看
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }

ObservableCreate类中发现

  @Override
    protected void subscribeActual(Observer<? super T> observer) {
   
        CreateEmitter<T> parent = new CreateEmitter<>(observer); //创建发射器,并把观察者当参数传递给发射器
        observer.onSubscribe(parent);//直接回调了observer的onSubscribe方法

        try {
            source.subscribe(parent);//source为Observable,事件源
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

由上述源码可以看到,在ObservableCreate类的subscribeActual方法下,调用了Observer(观察者即事件接收者)的onSubscribe,并把observer对象当做参数专递给了发射器CreateEmitter;同时将发射器CreateEmitter传递给了事件源。因此可以得出结论:
只有当 observable.subscribe(observer);时,发射器才会被创建,Observer才会被绑定onSubscribe; Observable的subscribe方法才会执行发射事件和数据;此时Observable和Observer的方法和回调都已经准备就绪,只待发送与接收。换言之,事件流是在订阅后才产生的。而 observable 被创建出来时并不生产事件,同时也不发射事件。

ObservableEmitter发射器是如何发生的呢?

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public String toString() {
            return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
        }
    }

CreateEmitter<>(observer);
CreateEmitter实现了ObservableEmitter,同时ObservableEmitter继承自Emitter,CreateEmitter 还实现了 Disposable 接口,这个 disposable 接口是用来判断是否中断事件发射的。
CreateEmitter的主要方法如下:

  public void onNext(T t) {}
  public void onError(Throwable t) {}
  public boolean tryOnError(Throwable t) {}
  public void onComplete() {}

前面我们说了 observer是个接口,与 observer的方法神似,几乎一一对应。
然后我们来分析一下CreateEmitter主要方法内都做了什么。

   @Override
        public void onNext(T t) {
            if (t == null) {
                onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
                return;
            }
            if (!isDisposed()) {    //判断是否丢弃
                observer.onNext(t); //调用Emitter的onNext,它会直接调用observer的 onNext
            }
        }
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);  //调用 Emitter 的 onError,它会直接调用 observer 的 onError
            }
        }
         @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose(); //执行完中断发射
                }
                return true;
            }
            return false;
        }
        @Override
        public void onComplete() {
            if (!isDisposed()) {        /判断是否丢弃
                try {
                    observer.onComplete();//调用Emitter的onComplete,它会直接调用observer的 onComplete
                } finally {
                    dispose(); //执行完中断发射
                }
            }
        }

CreateEmitter 的 onError 和 onComplete 方法任何一个执行完都会执行 dispose()中断事件
发射,所以 observer 中的 onError 和 onComplete 也只能有一个被执行。

结合以上,我们知道了,当订阅成功后,数据 源ObservableOnSubscribe 开始生产事件 , 调用 Emitter 的 的 onNext ,onComplete 向下游发射事件,Emitter 包含了 observer 的引用,又调用了 observer onNext ,onComplete ,这样下游observer 就接收到了上游发射的数据。

四、调用流程分解

4.1 :将调用流程代码进行分解

public void test() {
        //事件源source
        ObservableOnSubscribe source = new ObservableOnSubscribe<Object>() {

            /**
             * 回调方法subcribe
             * @param emitter 发射器参数
             * @throws Exception
             */
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                Log.i(TAG, "test==>create...subscribe...");
                // TODO 第二步 发射器emitter调用onNext发送消息
                emitter.onNext("hello world");
                // TODO 通过发射器发射异常(如果有异常执行onError方法,就不会再执行onComplete,否则不执行onError而是执行onComplete)
//                emitter.onError(new Throwable("模拟一个异常"));
                // TODO 发射完成
                emitter.onComplete();
            }
        };

        Observable.create(source)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object>() {//匿名内部类Observer,后面分析
                    @Override
                    public void onSubscribe(Disposable d) {
                        //TODO 第一步,发起订阅
                        Log.i(TAG, "test==>onSubscribe");
                    }

                    @Override
                    public void onNext(Object o) {
                        // TODO 第三步,
                        Log.i(TAG, "test==>onNext");
                    }

                    @Override
                    public void onError(Throwable e) {
                        //TODO 第四步(有异常执行onError不执行onComplete)
                        Log.i(TAG, "test==>onError");
                    }

                    @Override
                    public void onComplete() {
                        // TODO 第四步(无异常执行onComplete不执行onError)
                        Log.i(TAG, "test==>onComplete");
                    }
                });
    }

4.1.1 首先看下Observable.create()方法

/**
 * //Observable.java
 *
 * Observable.create->返回ObservableCreate(source)
 * ObservableCreate是Observable的子类
 *
 * ObservableOnSubscribe source事件源
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    //为hook函数(钩子函数),实际返回的是ObservableCreate
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

4.1.2 接着看ObservableCreate(可以看做自定义被观察者)类的实现,是被观察者Observable的子类

//被观察者Observable的子类
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    //先记住此处的ObservableCreate的静态内部类CreateEmitter,后续分析
    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
    //省略部分代码
    ......

    }
}

4.1.3 执行Observable的subscribeOn方法,传递的参数为Schedulers.io(),内部的实现是个线程调度器IoScheduler,后面再对Scheduler进行分析

/**
 * //Observable.java
 * 此处scheduler=IoScheduler
 *
 * ObservableSubscribeOn 参数包含(传递source,ioscheduler)
 *
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    //此处RxJavaPlugins.onAssembly为hook函数,实际返回ObservableSubscribeOn对象
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

4.1.4 接着看下 Observable的subscribe方法,执行订阅

/**
 * //Observable.java
 * 调用subscribe方法
 *
 */
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        //RxJavaPlugins.onSubscribe 为hook函数
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

        //实际调用的是subscribeActual
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

调用Observable的subscribeActual方法

protected abstract void subscribeActual(Observer<? super T> observer);

可以发现是个抽象类,实际调用的是Observable的子类ObservableCreate的subscribeActual方法

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //上面我们提到的CreateEmitter发射器,持有匿名内部类对象observer
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //调用observer的onSubscribe方法
        observer.onSubscribe(parent);

        try {
            //回调上面最初提到的事件源source的subscribe方法,执行订阅
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    ......

}

上面创建了CreateEmitter发射器,在这里我们调用了observer.onSubscribe(parent),也就是我们创建的匿名observer类的onSubscribe方法。

source.subscribe(parent)最重要的方法,观察者和被观察者顺利会师,事件开始执行:

            /**
             * 回调方法subcribe
             * @param emitter 发射器参数
             * @throws Exception
             */
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                Log.i(TAG, "test==>create...subscribe...");
                // TODO 第二步 发射器emitter调用onNext发送消息
                emitter.onNext("hello world");
                // TODO 通过发射器发射异常(如果有异常执行onError方法,就不会再执行onComplete,否则不执行onError而是执行onComplete)
//                emitter.onError(new Throwable("模拟一个异常"));
                // TODO 发射完成
                emitter.onComplete();
            }

看下CreateEmitter的onNext和onComplete方法,就是判断任务是否取消,没有取消则调用观察者的onNext和onComplete方法

       @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

4.1.5 执行流程总结:
1 create方法传返回了一个对象是ObservableCreate,ObservableCreate的构造方法中有一个ObservableOnSubscribe对象,也就是我们使用create时候创建的匿名内部类对象。
2 p.subscribe(o)实际上调用了ObservableCreate的subscribeActual方法
3 subscribeActual中首先调用了 observer的onSubscribe方法,紧接着调用了source.subscribe(parent)也就是ObservableOnSubscribe的subscribe方法,事件开始执行
4 subscribe方法中调用CreateEmitter的onNext方法,这个方法调用了observer的onNext方法,观察者对事件进行反应.
5 subscribe方法中调用CreateEmitter的onComplete方法,这个方法调用了observer的onComplete方法,整个流程结束。

————————————————
参考:
https://blog.csdn.net/Kern_/article/details/115701475
https://segmentfault.com/a/1190000020062497?utm_source=tag-newest
https://blog.csdn.net/qq_40270270/article/details/113726399

相关文章

网友评论

      本文标题:Android RxJava使用分析(1)

      本文链接:https://www.haomeiwen.com/subject/apprddtx.html