RxJava入门

作者: softSnow | 来源:发表于2018-10-18 13:22 被阅读13次

前言

  • 什么是RxJava? 简单来说,RxJava是基于观察者模式,提供便捷的异步操作的一套API。
  • RxJava好在哪?它提供了一系列丰富的操作符,支持链式调用,可以便捷的进行线程的切换。
  • 本文基于RxJava 2.2.2,是自己学习过程中的笔记,方便以后查阅使用

简介

RxJava最基本的两个元素:
1 Observable(被观察者)
2 Observer(观察者)
我们通过subscribe(订阅)便可使它们形成订阅关系。下面就看一下它们最基本的实现:

创建一个Observable:

Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
                           @Override
                           public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                               
                           }
                       });

创建一个Observer:

Observer observer =new Observer<String>() {
                          @Override
                          public void onSubscribe(Disposable disposable) {
                              
                          }

                          @Override
                          public void onNext(String s) {

                          }

                          @Override
                          public void onError(Throwable throwable) {

                          }

                          @Override
                          public void onComplete() {

                          }
                      };

订阅:

observable.subscribe(observer);

链式调用:

Observable.create(new ObservableOnSubscribe<String>() {
                           @Override
                           public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                              
                           }
                       }).subscribe(new Observer<String>() {
                           @Override
                           public void onSubscribe(Disposable disposable) {
                               
                           }

                           @Override
                           public void onNext(String s) {

                           }

                           @Override
                           public void onError(Throwable throwable) {

                           }

                           @Override
                           public void onComplete() {

                           }
                       });

以上,就可以简单的使用RxJava了。。。what? 辣鸡,写了点啥?
1 observable.subscribe(observer); 这个方法干了啥?

observable.subscribe(observer); //这个方法干了啥?

2 ObservableOnSubscribe. subscribe(ObservableEmitter<String> observableEmitter)这个方法啥时候调用,observableEmitter怎么来的?,干什么的?

              new ObservableOnSubscribe<String>() {
                           @Override
                           public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                              //这个方法啥时候调用,observableEmitter怎么来的?,有什么用?
                           }
                       }

首先,我们从头开始看,Observable是个抽象类,其静态方法,Observable.create(ObservableOnSubscribe<T> source ):

 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");//对传进来的source参数判空,如果是null,则抛出异常。
        return RxJavaPlugins.onAssembly(new ObservableCreate(source));
    }

看看onAssembly这个方法

 public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        return f != null ? (Observable)apply(f, source) : source;//f 初始值是null的,也就是说这个方法返回值便是传进来的source参数,至此Observable创建完成
    }

至此,Observable创建流程便清楚了,大体就是通过调用 Observable.create(ObservableOnSubscribe<T> source )方法。new一个ObservableCreate对象并返回,接下来再看看observable.subscribe(observer) 这个方法

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");//判空

        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            this.subscribeActual(observer);//通过上面我们知道这个实际上调用了ObservableCreate.subscribeActual(observer)
        } catch (NullPointerException var4) {
            throw var4;
        } catch (Throwable var5) {
            Exceptions.throwIfFatal(var5);
            RxJavaPlugins.onError(var5);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(var5);
            throw npe;
        }
    }

通过上面可以看出,实际上调用了ObservableCreate.subscribeActual(observer),再看看 ObservableCreate这个类

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;//就是create中的ObservableOnSubscribe对象
    }

/**
     *实现了subscribeActual这个抽象方法
     */

    protected void subscribeActual(Observer<? super T> observer) {
        ObservableCreate.CreateEmitter<T> parent = new ObservableCreate.CreateEmitter(observer);
        observer.onSubscribe(parent);//observer便是我们创建的observer对象

        try {
            this.source.subscribe(parent);//也就是create中的ObservableOnSubscribe对象的suscribe方法
        } catch (Throwable var4) {
            Exceptions.throwIfFatal(var4);
            parent.onError(var4);
        }

    }
...
}

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
        private static final long serialVersionUID = -3434801548987643227L;
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;//我们创建的Observer
        }

        public void onNext(T t) {
            if (t == null) {
                this.onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            } else {
                if (!this.isDisposed()) {
                    this.observer.onNext(t);
                }

            }
        }

        public void onError(Throwable t) {
            if (!this.tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }

        }

        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }

            if (!this.isDisposed()) {
                try {
                    this.observer.onError((Throwable)t);
                } finally {
                    this.dispose();
                }

                return true;
            } else {
                return false;
            }
        }

        public void onComplete() {
            if (!this.isDisposed()) {
                try {
                    this.observer.onComplete();
                } finally {
                    this.dispose();
                }
            }

        }

        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        public void setCancellable(Cancellable c) {
            this.setDisposable(new CancellableDisposable(c));
        }

        public ObservableEmitter<T> serialize() {
            return new ObservableCreate.SerializedEmitter(this);
        }

        public void dispose() {
            DisposableHelper.dispose(this);
        }

        public boolean isDisposed() {
            return DisposableHelper.isDisposed((Disposable)this.get());
        }

        public String toString() {
            return String.format("%s{%s}", this.getClass().getSimpleName(), super.toString());
        }
    }
}


以上,便可清楚,当调用observable.subscribe(observer) ,observer的onSubscribe(ObservableEmitter<T> observableEmitter)方法便会调用,同时create中的ObservableOnSubscribe对象的suscribe方法也会调用

总结

Observable observable=Observable.create(new ObservableOnSubscribe<String>() {
                           @Override
                           public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                                 //在observable.subscribe(observer);执行后执行,
                                //observableEmitter里面的Observer便是我们创建的Observer,其onNext,onError,
                               // onComplete均会交给我们创建的Observer执行
                           }
                       });

Observer observer =new Observer<String>() {
                          @Override
                          public void onSubscribe(Disposable disposable) {
                              //在observable.subscribe(observer);执行后执行
                          }

                          @Override
                          public void onNext(String s) {

                          }

                          @Override
                          public void onError(Throwable throwable) {

                          }

                          @Override
                          public void onComplete() {

                          }
                      };

observable.subscribe(observer);

observable.subscribe(observer);订阅方法执行后,observer的onSubscribe(Disposable disposable)方法会调用,ObservableOnSubscribe的subscribe(ObservableEmitter<String> observableEmitter)方法会调用,其中observableEmitter便是由我们创建的observer包装而成的,其发送的事件会被我们创建的Observer收到(disposable.dispose()之后的收不到)。以上便是个人学习RxJava的一点理解。

相关文章

网友评论

    本文标题:RxJava入门

    本文链接:https://www.haomeiwen.com/subject/clsjzftx.html